@@ -5,23 +5,47 @@ import java.net.InetSocketAddress
5
5
import akka .actor ._
6
6
import akka .io ._
7
7
import akka .io .Tcp ._
8
- import akka .util .ByteString
8
+ import akka .util .{ Timeout , ByteString }
9
9
10
- import nl .gideondk .sentinel ._
11
- import nl .gideondk .sentinel .SentinelResolver
12
10
import scala .concurrent .{ Future , Promise , ExecutionContext }
13
- import play .api .libs .iteratee .Enumerator
11
+ import scala .util .Random
12
+
13
+ import akka .pattern .ask
14
14
15
15
trait Server [Cmd , Evt ] {
16
16
def actor : ActorRef
17
17
18
- def ?* (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = askMany(command)
18
+ def ?** (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = askAll(command)
19
+
20
+ def ?* (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = askAllHosts(command)
19
21
20
- def askMany (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = Task {
22
+ def ? (command : Cmd )(implicit context : ExecutionContext ): Task [Evt ] = askAny(command)
23
+
24
+ def askAll (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = Task {
21
25
val promise = Promise [List [Evt ]]()
22
26
actor ! ServerCommand .AskAll (command, promise)
23
27
promise.future
24
28
}
29
+
30
+ def askAllHosts (command : Cmd )(implicit context : ExecutionContext ): Task [List [Evt ]] = Task {
31
+ val promise = Promise [List [Evt ]]()
32
+ actor ! ServerCommand .AskAllHosts (command, promise)
33
+ promise.future
34
+ }
35
+
36
+ def askAny (command : Cmd )(implicit context : ExecutionContext ): Task [Evt ] = Task {
37
+ val promise = Promise [Evt ]()
38
+ actor ! ServerCommand .AskAny (command, promise)
39
+ promise.future
40
+ }
41
+
42
+ def connectedSockets (implicit timeout : Timeout ): Task [Int ] = Task {
43
+ (actor ? ServerMetric .ConnectedSockets ).mapTo[Int ]
44
+ }
45
+
46
+ def connectedHosts (implicit timeout : Timeout ): Task [Int ] = Task {
47
+ (actor ? ServerMetric .ConnectedHosts ).mapTo[Int ]
48
+ }
25
49
}
26
50
27
51
class ServerCore [Cmd , Evt ](port : Int , description : String , stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
@@ -36,22 +60,52 @@ class ServerCore[Cmd, Evt](port: Int, description: String, stages: ⇒ PipelineS
36
60
val tcp = akka.io.IO (Tcp )(context.system)
37
61
val address = new InetSocketAddress (port)
38
62
39
- var connections = List [ActorRef ]()
63
+ var connections = Map [ String , List [ActorRef ] ]()
40
64
41
65
override def preStart = {
42
66
tcp ! Bind (self, address)
43
67
}
44
68
45
69
def receiveCommands : Receive = {
46
- case x : ServerCommand .AskAll [Cmd , Evt ] ⇒
47
- val futures = Task .sequence(connections.map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
70
+ case x : ServerCommand .AskAll [Cmd , Evt ] if connections.values.toList.length > 0 ⇒
71
+ val futures = Task .sequence(connections.values.toList.flatten. map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
48
72
case scala.util.Success (s) ⇒ Future .successful(s)
49
73
case scala.util.Failure (e) ⇒ Future .failed(e)
50
74
}
51
75
x.promise.completeWith(futures)
76
+
77
+ case x : ServerCommand .AskAllHosts [Cmd , Evt ] if connections.values.toList.length > 0 ⇒
78
+ val futures = Task .sequence(connections.values.toList.map(x ⇒ Random .shuffle(x.toList).head).map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
79
+ case scala.util.Success (s) ⇒ Future .successful(s)
80
+ case scala.util.Failure (e) ⇒ Future .failed(e)
81
+ }
82
+ x.promise.completeWith(futures)
83
+
84
+ case x : ServerCommand .AskAny [Cmd , Evt ] if connections.values.toList.length > 0 ⇒
85
+ val future = (wrapAtenna(Random .shuffle(connections.values.toList.flatten).head) ? x.payload).start.flatMap {
86
+ case scala.util.Success (s) ⇒ Future .successful(s)
87
+ case scala.util.Failure (e) ⇒ Future .failed(e)
88
+ }
89
+ x.promise.completeWith(future)
90
+
91
+ case ServerMetric .ConnectedSockets ⇒
92
+ sender ! connections.values.flatten.toList.length
93
+
94
+ case ServerMetric .ConnectedHosts ⇒
95
+ sender ! connections.keys.toList.length
52
96
}
53
97
54
98
def receive = receiveCommands orElse {
99
+ case x : Terminated ⇒
100
+ val antenna = x.getActor
101
+ connections = connections.foldLeft(Map [String , List [ActorRef ]]()) {
102
+ case (c, i) ⇒
103
+ i._2.contains(antenna) match {
104
+ case true ⇒ if (i._2.length == 1 ) c else c + (i._1 -> i._2.filter(_ != antenna))
105
+ case false ⇒ c + i
106
+ }
107
+ }
108
+
55
109
case Bound ⇒
56
110
log.debug(description + " bound to " + address)
57
111
@@ -71,7 +125,10 @@ class ServerCore[Cmd, Evt](port: Int, description: String, stages: ⇒ PipelineS
71
125
val connection = sender
72
126
73
127
val antenna = context.actorOf(Props (new Antenna (init, resolver)).withDispatcher(" nl.gideondk.sentinel.sentinel-dispatcher" ))
74
- connections :+= antenna
128
+ context.watch(antenna)
129
+
130
+ val currentAtennas = connections.get(remoteAddr.getHostName).getOrElse(List [ActorRef ]())
131
+ connections = connections + (remoteAddr.getHostName -> (currentAtennas ++ List (antenna)))
75
132
76
133
val tcpHandler = context.actorOf(TcpPipelineHandler .props(init, connection, antenna).withDeploy(Deploy .local))
77
134
0 commit comments