Skip to content

Commit 62b19a5

Browse files
committed
Rewrite worker structure for more flexibility
1 parent 5b468e9 commit 62b19a5

File tree

5 files changed

+150
-77
lines changed

5 files changed

+150
-77
lines changed

project/Build.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ object ApplicationBuild extends Build {
88
override lazy val settings = super.settings ++
99
Seq(
1010
name := "sentinel",
11-
version := "0.5.6",
11+
version := "0.6.0",
1212
organization := "nl.gideondk",
1313
scalaVersion := "2.10.2",
1414
parallelExecution in Test := false,

src/main/scala/nl/gideondk/sentinel/client/Worker.scala

+145-72
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import java.net.InetSocketAddress
44

55
import scala.collection.mutable.Queue
66

7-
import akka.actor.{ Actor, ActorLogging, ActorRef, Deploy, Stash, Terminated, actorRef2Scala }
7+
import akka.actor._
88
import akka.io.{ BackpressureBuffer, PipelineContext, PipelineStage, Tcp }
99
import akka.io.Tcp.{ Command, CommandFailed, Connected, Register }
1010
import akka.io.TcpPipelineHandler
@@ -18,27 +18,30 @@ import play.api.libs.iteratee._
1818
import akka.pattern.pipe
1919
import scala.concurrent.Promise
2020

21+
import scala.util.{ Success, Failure }
22+
23+
object SentinelClientWorker {
24+
case class ConnectToHost(address: InetSocketAddress)
25+
case object TcpActorDisconnected
26+
case object UpstreamFinished
27+
}
28+
2129
class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
2230
workerDescription: String = "Sentinel Client Worker")(lowBytes: Long, highBytes: Long, maxBufferSize: Long) extends Actor with ActorLogging with Stash {
2331
import context.dispatcher
2432
import SentinelClientWorker._
2533

2634
val tcp = akka.io.IO(Tcp)(context.system)
35+
val receiverQueue = Queue[ActorRef]()
2736

28-
/* Current open requests */
29-
val promises = Queue[Promise[Evt]]()
30-
31-
override def preStart = {
32-
tcp ! Tcp.Connect(address)
33-
}
37+
override def preStart = tcp ! Tcp.Connect(address)
3438

35-
def receive = {
39+
def disconnected: Receive = {
3640
case Connected(remoteAddr, localAddr)
37-
val init =
38-
TcpPipelineHandler.withLogger(log,
39-
stages >>
40-
new TcpReadWriteAdapter >>
41-
new BackpressureBuffer(lowBytes, highBytes, maxBufferSize))
41+
val init = TcpPipelineHandler.withLogger(log,
42+
stages >>
43+
new TcpReadWriteAdapter >>
44+
new BackpressureBuffer(lowBytes, highBytes, maxBufferSize))
4245

4346
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local).withDeploy(Deploy.local))
4447
context watch handler
@@ -53,36 +56,111 @@ class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: ⇒ Pip
5356
case _ stash()
5457
}
5558

56-
def handleResponses(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = {
59+
def connected(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = {
60+
val operationHandler = context.actorOf(Props(new OperationHandler(init, connection)))
61+
val upstreamHandler = context.actorOf(Props(new UpstreamHandler(init, connection)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
62+
63+
context watch operationHandler
64+
context watch upstreamHandler
65+
66+
def handleResponses: Receive = {
67+
case x: init.Event
68+
receiverQueue.dequeue.forward(x)
69+
}
70+
71+
def handleHighWaterMark: Receive = {
72+
case BackpressureBuffer.HighWatermarkReached
73+
upstreamHandler ! BackpressureBuffer.HighWatermarkReached
74+
context.become(handleResponses orElse {
75+
case BackpressureBuffer.LowWatermarkReached
76+
upstreamHandler ! BackpressureBuffer.LowWatermarkReached
77+
unstashAll()
78+
context.unbecome()
79+
case _: SentinelCommand stash()
80+
}, discardOld = false)
81+
}
82+
83+
/* Upstream handler, stashes new requests until up stream is finished */
84+
def handleUpstream: Receive = handleResponses orElse handleHighWaterMark orElse {
85+
case UpstreamFinished
86+
unstashAll()
87+
context.unbecome()
88+
case _ stash()
89+
}
90+
91+
def default: Receive = handleResponses orElse handleHighWaterMark orElse {
92+
case o: Operation[Cmd, Evt]
93+
receiverQueue enqueue operationHandler
94+
operationHandler forward o
95+
96+
case so: StreamedOperation[Cmd, Evt]
97+
context.become(handleUpstream, discardOld = false)
98+
receiverQueue enqueue upstreamHandler
99+
upstreamHandler forward so
100+
101+
case Terminated(`connection`)
102+
log.error(workerDescription + " has been terminated due to a terminated TCP worker")
103+
context.stop(self)
104+
105+
case x: Terminated
106+
log.error(workerDescription + " has been terminated due to a internal error")
107+
context.stop(self)
108+
}
109+
110+
default
111+
}
112+
113+
def receive = disconnected
114+
}
115+
116+
private class OperationHandler[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef) extends Actor with ActorLogging {
117+
import SentinelClientWorker._
118+
119+
context watch connection
120+
121+
val promises = Queue[Promise[Evt]]()
122+
123+
override def postStop = {
124+
promises.foreach(_.failure(new Exception("Actor quit unexpectedly")))
125+
}
126+
127+
def receive: Receive = {
57128
case init.Event(data)
58129
val pr = promises.dequeue
59130
pr.success(data)
60131

61-
case Terminated(`connection`)
62-
promises.foreach(_.failure(new Exception("TCP Actor disconnected")))
63-
context.stop(self)
64-
}
65-
66-
def connected(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = handleResponses(init, connection) orElse {
67132
case o: Operation[Cmd, Evt]
68133
promises.enqueue(o.promise)
69134
connection ! init.Command(o.command)
135+
}
136+
}
70137

138+
private class UpstreamHandler[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef) extends Actor with ActorLogging with Stash {
139+
import SentinelClientWorker._
140+
import context.dispatcher
141+
142+
context watch connection
143+
144+
val promises = Queue[Promise[Evt]]()
145+
146+
override def postStop = {
147+
promises.foreach(_.failure(new Exception("Actor quit unexpectedly")))
148+
}
149+
150+
def handleResponses: Receive = {
151+
case init.Event(data)
152+
val pr = promises.dequeue
153+
pr.success(data)
154+
}
155+
156+
def receive: Receive = handleResponses orElse {
71157
case o: StreamedOperation[Cmd, Evt]
72158
promises.enqueue(o.promise)
73-
context.become(handleOutgoingStream(init, connection, o.stream), discardOld = false)
74-
75-
case BackpressureBuffer.HighWatermarkReached
76-
context.become(handleResponses(init, connection) orElse {
77-
case BackpressureBuffer.LowWatermarkReached
78-
unstashAll()
79-
context.unbecome()
80-
case _: SentinelCommand stash()
81-
}, discardOld = false)
159+
context.become(handleOutgoingStream(o.stream), discardOld = false)
82160
}
83161

84-
def handleOutgoingStream(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef, stream: Enumerator[Cmd]): Receive = {
85-
case class StreamFinished()
162+
def handleOutgoingStream(stream: Enumerator[Cmd]): Receive = {
163+
case object StreamFinished
86164
case class StreamChunk(c: Cmd)
87165

88166
def iteratee: Iteratee[Cmd, Unit] = {
@@ -97,23 +175,30 @@ class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: ⇒ Pip
97175
(Cont[Cmd, Unit](i step(i)))
98176
}
99177

100-
(stream |>>> iteratee).map(x StreamFinished()).pipeTo(self)
178+
(stream |>>> iteratee) onComplete {
179+
case Success(result) self ! StreamFinished
180+
case Failure(failure) self ! failure
181+
}
101182

102-
handleResponses(init, connection) orElse {
183+
handleResponses orElse {
103184
case StreamChunk(x)
104185
connection ! init.Command(x)
105-
case x: StreamFinished
186+
187+
case StreamFinished
188+
context.parent ! UpstreamFinished
106189
unstashAll()
107190
context.unbecome()
191+
108192
case scala.util.Failure(e: Throwable)
109193
log.error(e.getMessage)
110194
context.stop(self)
195+
111196
case BackpressureBuffer.HighWatermarkReached
112-
context.become(handleResponses(init, connection) orElse {
197+
context.become(handleResponses orElse {
113198
case BackpressureBuffer.LowWatermarkReached
114199
unstashAll()
115200
context.unbecome()
116-
case _: SentinelCommand | _: StreamChunk stash()
201+
case _: SentinelCommand | _: StreamChunk | StreamFinished stash()
117202
}, discardOld = false)
118203
case _: SentinelCommand stash()
119204
}
@@ -128,41 +213,29 @@ class WaitingSentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages:
128213
var requestRunning = false
129214
val requests = Queue[SentinelCommand]()
130215

131-
override def handleResponses(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = {
132-
case init.Event(data)
133-
val pr = promises.dequeue
134-
pr.success(data)
135-
requestRunning = false
136-
if (requests.length > 0) connected(init, connection)(requests.dequeue)
137-
138-
case Terminated(`connection`)
139-
promises.foreach(_.failure(new Exception("TCP Actor disconnected")))
140-
context.stop(self)
141-
}
142-
143-
override def connected(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = handleResponses(init, connection) orElse {
144-
case o: Operation[Cmd, Evt]
145-
requestRunning match {
146-
case false
147-
super.connected(init, connection)(o)
148-
requestRunning = true
149-
case true requests.enqueue(o)
150-
}
151-
152-
case o: StreamedOperation[Cmd, Evt]
153-
requestRunning match {
154-
case false
155-
super.connected(init, connection)(o)
156-
requestRunning = true
157-
case true requests.enqueue(o)
158-
}
159-
160-
case BackpressureBuffer.HighWatermarkReached
161-
super.connected(init, connection)(BackpressureBuffer.HighWatermarkReached)
216+
override def connected(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = {
217+
val r: Receive = {
218+
case x: init.Event
219+
receiverQueue.dequeue.forward(x)
220+
requestRunning = false
221+
if (requests.length > 0) connected(init, connection)(requests.dequeue)
222+
223+
case o: Operation[Cmd, Evt]
224+
requestRunning match {
225+
case false
226+
super.connected(init, connection)(o)
227+
requestRunning = true
228+
case true requests.enqueue(o)
229+
}
230+
231+
case o: StreamedOperation[Cmd, Evt]
232+
requestRunning match {
233+
case false
234+
super.connected(init, connection)(o)
235+
requestRunning = true
236+
case true requests.enqueue(o)
237+
}
238+
}
239+
r orElse super.connected(init, connection)
162240
}
163241
}
164-
165-
private[sentinel] object SentinelClientWorker {
166-
/* Worker commands */
167-
case class ConnectToHost(address: InetSocketAddress)
168-
}

src/main/scala/nl/gideondk/sentinel/server/Handlers.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,4 @@ class SentinelServerBasicAsyncHandler[Cmd, Evt](init: Init[WithinActorContext, C
7575
}
7676

7777
def receive = handleRequestAndResponse
78-
}
78+
}

src/test/scala/nl/gideondk/sentinel/PingPongSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ trait PingPongWorkers {
5050
val pingServer = SentinelServer.async(8000, PingPongServerHandler.handle, "Ping Server")(stages)(serverSystem)
5151

5252
val clientSystem = ActorSystem("ping-client-system")
53-
val pingClient = SentinelClient.dynamic("localhost", 8000, 8, 32, "Ping Client")(stages)(clientSystem)
53+
val pingClient = SentinelClient.randomRouting("localhost", 8000, 16, "Ping Client")(stages)(clientSystem)
5454
}
5555

5656
class PingPongSpec extends Specification with PingPongWorkers {
@@ -74,7 +74,7 @@ class PingPongSpec extends Specification with PingPongWorkers {
7474
val num = 20000
7575

7676
val mulActs = for (i 1 to num) yield (pingClient <~< PingPongMessageFormat("PING"))
77-
val tasks = Task.sequenceSuccesses(mulActs.toList)
77+
val tasks = Task.sequence(mulActs.toList)
7878

7979
val fut = tasks.start
8080

src/test/scala/nl/gideondk/sentinel/SequenceSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ trait SequenceWorkers {
5757
implicit val actorSystem = ActorSystem("test-system")
5858

5959
val server = SentinelServer.async(8001, SequenceServerHandler.handle, "Ping Server")(stages)
60-
val client = SentinelClient("localhost", 8001, RandomRouter(4), "Ping Client")(stages)
60+
val client = SentinelClient.waiting("localhost", 8001, RandomRouter(4), "Ping Client")(stages)
6161
}
6262

6363
class SequenceSpec extends Specification with SequenceWorkers {

0 commit comments

Comments
 (0)