Skip to content

Commit 5b468e9

Browse files
committed
Create more robust Enumerator "feeder" for clients
1 parent 1e254cf commit 5b468e9

File tree

2 files changed

+54
-16
lines changed

2 files changed

+54
-16
lines changed

src/main/scala/nl/gideondk/sentinel/client/Worker.scala

+52-14
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ import nl.gideondk.sentinel._
1515

1616
import play.api.libs.iteratee._
1717

18+
import akka.pattern.pipe
1819
import scala.concurrent.Promise
1920

20-
class SentinelClientWorker[Cmd, Evt](stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
21+
class SentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
2122
workerDescription: String = "Sentinel Client Worker")(lowBytes: Long, highBytes: Long, maxBufferSize: Long) extends Actor with ActorLogging with Stash {
2223
import context.dispatcher
2324
import SentinelClientWorker._
@@ -27,10 +28,11 @@ class SentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineContext,
2728
/* Current open requests */
2829
val promises = Queue[Promise[Evt]]()
2930

30-
def receive = {
31-
case h: ConnectToHost
32-
tcp ! Tcp.Connect(h.address)
31+
override def preStart = {
32+
tcp ! Tcp.Connect(address)
33+
}
3334

35+
def receive = {
3436
case Connected(remoteAddr, localAddr)
3537
val init =
3638
TcpPipelineHandler.withLogger(log,
@@ -68,22 +70,58 @@ class SentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineContext,
6870

6971
case o: StreamedOperation[Cmd, Evt]
7072
promises.enqueue(o.promise)
71-
o.stream |>>> Iteratee.foreach(x connection ! init.Command(x))
73+
context.become(handleOutgoingStream(init, connection, o.stream), discardOld = false)
7274

7375
case BackpressureBuffer.HighWatermarkReached
74-
context.become(highWaterMark(init, connection))
76+
context.become(handleResponses(init, connection) orElse {
77+
case BackpressureBuffer.LowWatermarkReached
78+
unstashAll()
79+
context.unbecome()
80+
case _: SentinelCommand stash()
81+
}, discardOld = false)
7582
}
7683

77-
def highWaterMark(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef): Receive = handleResponses(init, connection) orElse {
78-
case BackpressureBuffer.LowWatermarkReached
79-
unstashAll()
80-
context.become(connected(init, connection))
81-
case _: Operation[Cmd, Evt] stash()
84+
def handleOutgoingStream(init: Init[WithinActorContext, Cmd, Evt], connection: ActorRef, stream: Enumerator[Cmd]): Receive = {
85+
case class StreamFinished()
86+
case class StreamChunk(c: Cmd)
87+
88+
def iteratee: Iteratee[Cmd, Unit] = {
89+
def step(i: Input[Cmd]): Iteratee[Cmd, Unit] = i match {
90+
case Input.EOF
91+
Done(Unit, Input.EOF)
92+
case Input.Empty Cont[Cmd, Unit](i step(i))
93+
case Input.El(e)
94+
self ! StreamChunk(e)
95+
Cont[Cmd, Unit](i step(i))
96+
}
97+
(Cont[Cmd, Unit](i step(i)))
98+
}
99+
100+
(stream |>>> iteratee).map(x StreamFinished()).pipeTo(self)
101+
102+
handleResponses(init, connection) orElse {
103+
case StreamChunk(x)
104+
connection ! init.Command(x)
105+
case x: StreamFinished
106+
unstashAll()
107+
context.unbecome()
108+
case scala.util.Failure(e: Throwable)
109+
log.error(e.getMessage)
110+
context.stop(self)
111+
case BackpressureBuffer.HighWatermarkReached
112+
context.become(handleResponses(init, connection) orElse {
113+
case BackpressureBuffer.LowWatermarkReached
114+
unstashAll()
115+
context.unbecome()
116+
case _: SentinelCommand | _: StreamChunk stash()
117+
}, discardOld = false)
118+
case _: SentinelCommand stash()
119+
}
82120
}
83121
}
84122

85-
class WaitingSentinelClientWorker[Cmd, Evt](stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
86-
workerDescription: String = "Sentinel Client Worker")(lowBytes: Long, highBytes: Long, maxBufferSize: Long) extends SentinelClientWorker(stages, workerDescription)(lowBytes, highBytes, maxBufferSize) {
123+
class WaitingSentinelClientWorker[Cmd, Evt](address: InetSocketAddress, stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
124+
workerDescription: String = "Sentinel Client Worker")(lowBytes: Long, highBytes: Long, maxBufferSize: Long) extends SentinelClientWorker(address, stages, workerDescription)(lowBytes, highBytes, maxBufferSize) {
87125

88126
import context.dispatcher
89127

@@ -120,7 +158,7 @@ class WaitingSentinelClientWorker[Cmd, Evt](stages: ⇒ PipelineStage[PipelineCo
120158
}
121159

122160
case BackpressureBuffer.HighWatermarkReached
123-
context.become(highWaterMark(init, connection))
161+
super.connected(init, connection)(BackpressureBuffer.HighWatermarkReached)
124162
}
125163
}
126164

src/test/scala/nl/gideondk/sentinel/PingPongSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ trait PingPongWorkers {
5050
val pingServer = SentinelServer.async(8000, PingPongServerHandler.handle, "Ping Server")(stages)(serverSystem)
5151

5252
val clientSystem = ActorSystem("ping-client-system")
53-
val pingClient = SentinelClient("localhost", 8000, RandomRouter(32), "Ping Client")(stages)(clientSystem)
53+
val pingClient = SentinelClient.dynamic("localhost", 8000, 8, 32, "Ping Client")(stages)(clientSystem)
5454
}
5555

5656
class PingPongSpec extends Specification with PingPongWorkers {
@@ -71,7 +71,7 @@ class PingPongSpec extends Specification with PingPongWorkers {
7171
}
7272

7373
"server and client be able to handle multiple concurrent requests" in {
74-
val num = 200000
74+
val num = 20000
7575

7676
val mulActs = for (i 1 to num) yield (pingClient <~< PingPongMessageFormat("PING"))
7777
val tasks = Task.sequenceSuccesses(mulActs.toList)

0 commit comments

Comments
 (0)