@@ -4,7 +4,7 @@ import java.net.InetSocketAddress
4
4
5
5
import scala .collection .mutable .Queue
6
6
7
- import akka .actor .{ Actor , ActorLogging , ActorRef , Deploy , Stash , Terminated , actorRef2Scala }
7
+ import akka .actor ._
8
8
import akka .io .{ BackpressureBuffer , PipelineContext , PipelineStage , Tcp }
9
9
import akka .io .Tcp .{ Command , CommandFailed , Connected , Register }
10
10
import akka .io .TcpPipelineHandler
@@ -18,27 +18,30 @@ import play.api.libs.iteratee._
18
18
import akka .pattern .pipe
19
19
import scala .concurrent .Promise
20
20
21
+ import scala .util .{ Success , Failure }
22
+
23
+ object SentinelClientWorker {
24
+ case class ConnectToHost (address : InetSocketAddress )
25
+ case object TcpActorDisconnected
26
+ case object UpstreamFinished
27
+ }
28
+
21
29
class SentinelClientWorker [Cmd , Evt ](address : InetSocketAddress , stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
22
30
workerDescription : String = " Sentinel Client Worker" )(lowBytes : Long , highBytes : Long , maxBufferSize : Long ) extends Actor with ActorLogging with Stash {
23
31
import context .dispatcher
24
32
import SentinelClientWorker ._
25
33
26
34
val tcp = akka.io.IO (Tcp )(context.system)
35
+ val receiverQueue = Queue [ActorRef ]()
27
36
28
- /* Current open requests */
29
- val promises = Queue [Promise [Evt ]]()
30
-
31
- override def preStart = {
32
- tcp ! Tcp .Connect (address)
33
- }
37
+ override def preStart = tcp ! Tcp .Connect (address)
34
38
35
- def receive = {
39
+ def disconnected : Receive = {
36
40
case Connected (remoteAddr, localAddr) ⇒
37
- val init =
38
- TcpPipelineHandler .withLogger(log,
39
- stages >>
40
- new TcpReadWriteAdapter >>
41
- new BackpressureBuffer (lowBytes, highBytes, maxBufferSize))
41
+ val init = TcpPipelineHandler .withLogger(log,
42
+ stages >>
43
+ new TcpReadWriteAdapter >>
44
+ new BackpressureBuffer (lowBytes, highBytes, maxBufferSize))
42
45
43
46
val handler = context.actorOf(TcpPipelineHandler .props(init, sender, self).withDeploy(Deploy .local).withDeploy(Deploy .local))
44
47
context watch handler
@@ -53,36 +56,111 @@ class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: ⇒ Pip
53
56
case _ ⇒ stash()
54
57
}
55
58
56
- def handleResponses (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = {
59
+ def connected (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = {
60
+ val operationHandler = context.actorOf(Props (new OperationHandler (init, connection)))
61
+ val upstreamHandler = context.actorOf(Props (new UpstreamHandler (init, connection)).withDispatcher(" nl.gideondk.sentinel.sentinel-dispatcher" ))
62
+
63
+ context watch operationHandler
64
+ context watch upstreamHandler
65
+
66
+ def handleResponses : Receive = {
67
+ case x : init.Event ⇒
68
+ receiverQueue.dequeue.forward(x)
69
+ }
70
+
71
+ def handleHighWaterMark : Receive = {
72
+ case BackpressureBuffer .HighWatermarkReached ⇒
73
+ upstreamHandler ! BackpressureBuffer .HighWatermarkReached
74
+ context.become(handleResponses orElse {
75
+ case BackpressureBuffer .LowWatermarkReached ⇒
76
+ upstreamHandler ! BackpressureBuffer .LowWatermarkReached
77
+ unstashAll()
78
+ context.unbecome()
79
+ case _ : SentinelCommand ⇒ stash()
80
+ }, discardOld = false )
81
+ }
82
+
83
+ /* Upstream handler, stashes new requests until up stream is finished */
84
+ def handleUpstream : Receive = handleResponses orElse handleHighWaterMark orElse {
85
+ case UpstreamFinished ⇒
86
+ unstashAll()
87
+ context.unbecome()
88
+ case _ ⇒ stash()
89
+ }
90
+
91
+ def default : Receive = handleResponses orElse handleHighWaterMark orElse {
92
+ case o : Operation [Cmd , Evt ] ⇒
93
+ receiverQueue enqueue operationHandler
94
+ operationHandler forward o
95
+
96
+ case so : StreamedOperation [Cmd , Evt ] ⇒
97
+ context.become(handleUpstream, discardOld = false )
98
+ receiverQueue enqueue upstreamHandler
99
+ upstreamHandler forward so
100
+
101
+ case Terminated (`connection`) ⇒
102
+ log.error(workerDescription + " has been terminated due to a terminated TCP worker" )
103
+ context.stop(self)
104
+
105
+ case x : Terminated ⇒
106
+ log.error(workerDescription + " has been terminated due to a internal error" )
107
+ context.stop(self)
108
+ }
109
+
110
+ default
111
+ }
112
+
113
+ def receive = disconnected
114
+ }
115
+
116
+ private class OperationHandler [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ) extends Actor with ActorLogging {
117
+ import SentinelClientWorker ._
118
+
119
+ context watch connection
120
+
121
+ val promises = Queue [Promise [Evt ]]()
122
+
123
+ override def postStop = {
124
+ promises.foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
125
+ }
126
+
127
+ def receive : Receive = {
57
128
case init.Event (data) ⇒
58
129
val pr = promises.dequeue
59
130
pr.success(data)
60
131
61
- case Terminated (`connection`) ⇒
62
- promises.foreach(_.failure(new Exception (" TCP Actor disconnected" )))
63
- context.stop(self)
64
- }
65
-
66
- def connected (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = handleResponses(init, connection) orElse {
67
132
case o : Operation [Cmd , Evt ] ⇒
68
133
promises.enqueue(o.promise)
69
134
connection ! init.Command (o.command)
135
+ }
136
+ }
70
137
138
+ private class UpstreamHandler [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ) extends Actor with ActorLogging with Stash {
139
+ import SentinelClientWorker ._
140
+ import context .dispatcher
141
+
142
+ context watch connection
143
+
144
+ val promises = Queue [Promise [Evt ]]()
145
+
146
+ override def postStop = {
147
+ promises.foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
148
+ }
149
+
150
+ def handleResponses : Receive = {
151
+ case init.Event (data) ⇒
152
+ val pr = promises.dequeue
153
+ pr.success(data)
154
+ }
155
+
156
+ def receive : Receive = handleResponses orElse {
71
157
case o : StreamedOperation [Cmd , Evt ] ⇒
72
158
promises.enqueue(o.promise)
73
- context.become(handleOutgoingStream(init, connection, o.stream), discardOld = false )
74
-
75
- case BackpressureBuffer .HighWatermarkReached ⇒
76
- context.become(handleResponses(init, connection) orElse {
77
- case BackpressureBuffer .LowWatermarkReached ⇒
78
- unstashAll()
79
- context.unbecome()
80
- case _ : SentinelCommand ⇒ stash()
81
- }, discardOld = false )
159
+ context.become(handleOutgoingStream(o.stream), discardOld = false )
82
160
}
83
161
84
- def handleOutgoingStream (init : Init [ WithinActorContext , Cmd , Evt ], connection : ActorRef , stream : Enumerator [Cmd ]): Receive = {
85
- case class StreamFinished ()
162
+ def handleOutgoingStream (stream : Enumerator [Cmd ]): Receive = {
163
+ case object StreamFinished
86
164
case class StreamChunk (c : Cmd )
87
165
88
166
def iteratee : Iteratee [Cmd , Unit ] = {
@@ -97,23 +175,30 @@ class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: ⇒ Pip
97
175
(Cont [Cmd , Unit ](i ⇒ step(i)))
98
176
}
99
177
100
- (stream |>>> iteratee).map(x ⇒ StreamFinished ()).pipeTo(self)
178
+ (stream |>>> iteratee) onComplete {
179
+ case Success (result) ⇒ self ! StreamFinished
180
+ case Failure (failure) ⇒ self ! failure
181
+ }
101
182
102
- handleResponses(init, connection) orElse {
183
+ handleResponses orElse {
103
184
case StreamChunk (x) ⇒
104
185
connection ! init.Command (x)
105
- case x : StreamFinished ⇒
186
+
187
+ case StreamFinished ⇒
188
+ context.parent ! UpstreamFinished
106
189
unstashAll()
107
190
context.unbecome()
191
+
108
192
case scala.util.Failure (e : Throwable ) ⇒
109
193
log.error(e.getMessage)
110
194
context.stop(self)
195
+
111
196
case BackpressureBuffer .HighWatermarkReached ⇒
112
- context.become(handleResponses(init, connection) orElse {
197
+ context.become(handleResponses orElse {
113
198
case BackpressureBuffer .LowWatermarkReached ⇒
114
199
unstashAll()
115
200
context.unbecome()
116
- case _ : SentinelCommand | _ : StreamChunk ⇒ stash()
201
+ case _ : SentinelCommand | _ : StreamChunk | StreamFinished ⇒ stash()
117
202
}, discardOld = false )
118
203
case _ : SentinelCommand ⇒ stash()
119
204
}
@@ -128,41 +213,29 @@ class WaitingSentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages:
128
213
var requestRunning = false
129
214
val requests = Queue [SentinelCommand ]()
130
215
131
- override def handleResponses (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = {
132
- case init.Event (data) ⇒
133
- val pr = promises.dequeue
134
- pr.success(data)
135
- requestRunning = false
136
- if (requests.length > 0 ) connected(init, connection)(requests.dequeue)
137
-
138
- case Terminated (`connection`) ⇒
139
- promises.foreach(_.failure(new Exception (" TCP Actor disconnected" )))
140
- context.stop(self)
141
- }
142
-
143
- override def connected (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = handleResponses(init, connection) orElse {
144
- case o : Operation [Cmd , Evt ] ⇒
145
- requestRunning match {
146
- case false ⇒
147
- super .connected(init, connection)(o)
148
- requestRunning = true
149
- case true ⇒ requests.enqueue(o)
150
- }
151
-
152
- case o : StreamedOperation [Cmd , Evt ] ⇒
153
- requestRunning match {
154
- case false ⇒
155
- super .connected(init, connection)(o)
156
- requestRunning = true
157
- case true ⇒ requests.enqueue(o)
158
- }
159
-
160
- case BackpressureBuffer .HighWatermarkReached ⇒
161
- super .connected(init, connection)(BackpressureBuffer .HighWatermarkReached )
216
+ override def connected (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = {
217
+ val r : Receive = {
218
+ case x : init.Event ⇒
219
+ receiverQueue.dequeue.forward(x)
220
+ requestRunning = false
221
+ if (requests.length > 0 ) connected(init, connection)(requests.dequeue)
222
+
223
+ case o : Operation [Cmd , Evt ] ⇒
224
+ requestRunning match {
225
+ case false ⇒
226
+ super .connected(init, connection)(o)
227
+ requestRunning = true
228
+ case true ⇒ requests.enqueue(o)
229
+ }
230
+
231
+ case o : StreamedOperation [Cmd , Evt ] ⇒
232
+ requestRunning match {
233
+ case false ⇒
234
+ super .connected(init, connection)(o)
235
+ requestRunning = true
236
+ case true ⇒ requests.enqueue(o)
237
+ }
238
+ }
239
+ r orElse super .connected(init, connection)
162
240
}
163
241
}
164
-
165
- private [sentinel] object SentinelClientWorker {
166
- /* Worker commands */
167
- case class ConnectToHost (address : InetSocketAddress )
168
- }
0 commit comments