@@ -15,9 +15,10 @@ import nl.gideondk.sentinel._
15
15
16
16
import play .api .libs .iteratee ._
17
17
18
+ import akka .pattern .pipe
18
19
import scala .concurrent .Promise
19
20
20
- class SentinelClientWorker [Cmd , Evt ](stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
21
+ class SentinelClientWorker [Cmd , Evt ](address : InetSocketAddress , stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
21
22
workerDescription : String = " Sentinel Client Worker" )(lowBytes : Long , highBytes : Long , maxBufferSize : Long ) extends Actor with ActorLogging with Stash {
22
23
import context .dispatcher
23
24
import SentinelClientWorker ._
@@ -27,10 +28,11 @@ class SentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineContext,
27
28
/* Current open requests */
28
29
val promises = Queue [Promise [Evt ]]()
29
30
30
- def receive = {
31
- case h : ConnectToHost ⇒
32
- tcp ! Tcp . Connect (h.address)
31
+ override def preStart = {
32
+ tcp ! Tcp . Connect (address)
33
+ }
33
34
35
+ def receive = {
34
36
case Connected (remoteAddr, localAddr) ⇒
35
37
val init =
36
38
TcpPipelineHandler .withLogger(log,
@@ -68,22 +70,58 @@ class SentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineContext,
68
70
69
71
case o : StreamedOperation [Cmd , Evt ] ⇒
70
72
promises.enqueue(o.promise)
71
- o.stream |>>> Iteratee .foreach(x ⇒ connection ! init. Command (x) )
73
+ context.become(handleOutgoingStream(init, connection, o.stream), discardOld = false )
72
74
73
75
case BackpressureBuffer .HighWatermarkReached ⇒
74
- context.become(highWaterMark(init, connection))
76
+ context.become(handleResponses(init, connection) orElse {
77
+ case BackpressureBuffer .LowWatermarkReached ⇒
78
+ unstashAll()
79
+ context.unbecome()
80
+ case _ : SentinelCommand ⇒ stash()
81
+ }, discardOld = false )
75
82
}
76
83
77
- def highWaterMark (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef ): Receive = handleResponses(init, connection) orElse {
78
- case BackpressureBuffer .LowWatermarkReached ⇒
79
- unstashAll()
80
- context.become(connected(init, connection))
81
- case _ : Operation [Cmd , Evt ] ⇒ stash()
84
+ def handleOutgoingStream (init : Init [WithinActorContext , Cmd , Evt ], connection : ActorRef , stream : Enumerator [Cmd ]): Receive = {
85
+ case class StreamFinished ()
86
+ case class StreamChunk (c : Cmd )
87
+
88
+ def iteratee : Iteratee [Cmd , Unit ] = {
89
+ def step (i : Input [Cmd ]): Iteratee [Cmd , Unit ] = i match {
90
+ case Input .EOF ⇒
91
+ Done (Unit , Input .EOF )
92
+ case Input .Empty ⇒ Cont [Cmd , Unit ](i ⇒ step(i))
93
+ case Input .El (e) ⇒
94
+ self ! StreamChunk (e)
95
+ Cont [Cmd , Unit ](i ⇒ step(i))
96
+ }
97
+ (Cont [Cmd , Unit ](i ⇒ step(i)))
98
+ }
99
+
100
+ (stream |>>> iteratee).map(x ⇒ StreamFinished ()).pipeTo(self)
101
+
102
+ handleResponses(init, connection) orElse {
103
+ case StreamChunk (x) ⇒
104
+ connection ! init.Command (x)
105
+ case x : StreamFinished ⇒
106
+ unstashAll()
107
+ context.unbecome()
108
+ case scala.util.Failure (e : Throwable ) ⇒
109
+ log.error(e.getMessage)
110
+ context.stop(self)
111
+ case BackpressureBuffer .HighWatermarkReached ⇒
112
+ context.become(handleResponses(init, connection) orElse {
113
+ case BackpressureBuffer .LowWatermarkReached ⇒
114
+ unstashAll()
115
+ context.unbecome()
116
+ case _ : SentinelCommand | _ : StreamChunk ⇒ stash()
117
+ }, discardOld = false )
118
+ case _ : SentinelCommand ⇒ stash()
119
+ }
82
120
}
83
121
}
84
122
85
- class WaitingSentinelClientWorker [Cmd , Evt ](stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
86
- workerDescription : String = " Sentinel Client Worker" )(lowBytes : Long , highBytes : Long , maxBufferSize : Long ) extends SentinelClientWorker (stages, workerDescription)(lowBytes, highBytes, maxBufferSize) {
123
+ class WaitingSentinelClientWorker [Cmd , Evt ](address : InetSocketAddress , stages : ⇒ PipelineStage [PipelineContext , Cmd , ByteString , Evt , ByteString ],
124
+ workerDescription : String = " Sentinel Client Worker" )(lowBytes : Long , highBytes : Long , maxBufferSize : Long ) extends SentinelClientWorker (address, stages, workerDescription)(lowBytes, highBytes, maxBufferSize) {
87
125
88
126
import context .dispatcher
89
127
@@ -120,7 +158,7 @@ class WaitingSentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineCo
120
158
}
121
159
122
160
case BackpressureBuffer .HighWatermarkReached ⇒
123
- context.become(highWaterMark( init, connection))
161
+ super .connected( init, connection)( BackpressureBuffer . HighWatermarkReached )
124
162
}
125
163
}
126
164
0 commit comments