Skip to content

Commit e678029

Browse files
committed
Pushing a bit more of a standard between the consumer and answerer...
1 parent 503cb62 commit e678029

File tree

3 files changed

+123
-94
lines changed

3 files changed

+123
-94
lines changed

src/main/scala/nl/gideondk/sentinel/Answerer.scala

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,51 @@ import scala.concurrent.Future
2424
import scalaz.contrib.std.scalaFuture._
2525
import nl.gideondk.sentinel.CatchableFuture._
2626

27-
class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor with ActorLogging with Stash {
28-
import context.dispatcher
27+
import Action._
28+
29+
object Answerer {
30+
def answererSink[O](acquire: Future[ActorRef])(release: ActorRef Future[Unit])(step: ActorRef Future[O])(implicit context: ExecutionContext): Process[Future, O] = {
31+
def go(step: Future[O], onExit: Process[Future, O]): Process[Future, O] =
32+
await[Future, O, O](step)(o emit(o) ++ go(step, onExit), onExit, onExit)
33+
34+
await(acquire)(r {
35+
val onExit = eval(release(r)).drain
36+
go(step(r), onExit)
37+
}, halt, halt)
38+
}
2939

3040
trait HandleResult
31-
case class HandleAsyncResult(response: Cmd) extends HandleResult
32-
case class HandleStreamResult(response: Process[Future, Cmd]) extends HandleResult
41+
case class HandleAsyncResult[Cmd](response: Cmd) extends HandleResult
42+
case class HandleStreamResult[Cmd](stream: Process[Future, Cmd]) extends HandleResult
3343

34-
trait StreamChunk
35-
case class StreamData[Cmd](c: Cmd) extends StreamChunk
36-
case object StreamEnd extends StreamChunk
37-
case object StreamChunkReceived
44+
trait StreamProducerMessage
45+
case class StreamProducerChunk[Cmd](c: Cmd) extends StreamProducerMessage
46+
47+
case object StartStreamHandling extends StreamProducerMessage
48+
case object ReadyForStream extends StreamProducerMessage
49+
case object StreamProducerEnded extends StreamProducerMessage
50+
case object StreamProducerChunkReceived extends StreamProducerMessage
3851

3952
case object DequeueResponse
53+
}
54+
55+
class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], streamChunkTimeout: Timeout = Timeout(5 seconds)) extends Actor with ActorLogging with Stash {
56+
import Answerer._
57+
import context.dispatcher
4058

4159
var responseQueue = Queue.empty[Promise[HandleResult]]
4260

4361
def handleRequest: Receive = {
4462
case x: Answer[Evt, Cmd]
45-
val serverWorker = self
63+
val me = self
4664
val promise = Promise[HandleResult]()
4765
responseQueue :+= promise
4866

4967
val fut = for {
5068
response x.future map (result HandleAsyncResult(result))
5169
} yield {
5270
promise.success(response)
53-
serverWorker ! DequeueResponse
71+
me ! DequeueResponse
5472
}
5573

5674
fut.onFailure {
@@ -59,15 +77,15 @@ class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor
5977
context.stop(self)
6078
}
6179
case x: ProduceStream[Evt, Cmd]
62-
val serverWorker = self
80+
val me = self
6381
val promise = Promise[HandleResult]()
6482
responseQueue :+= promise
6583

6684
val fut = for {
6785
response x.futureProcess map (result HandleStreamResult(result))
6886
} yield {
6987
promise.success(response)
70-
serverWorker ! DequeueResponse
88+
me ! DequeueResponse
7189
}
7290

7391
fut.onFailure {
@@ -100,21 +118,25 @@ class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor
100118
}
101119

102120
def handleRequestAndResponse: Receive = handleRequest orElse handleDequeue orElse {
103-
case x: HandleAsyncResult context.parent ! Command.Reply(x.response)
104-
case x: HandleStreamResult
105-
x.stream
121+
case x: HandleAsyncResult[Cmd] context.parent ! Command.Reply(x.response)
122+
case x: HandleStreamResult[Cmd]
123+
val worker = self
124+
implicit val timeout = streamChunkTimeout
125+
x.stream to answererSink((worker ? StartStreamHandling).map(x worker))((a: ActorRef) (a ? StreamProducerEnded).mapTo[Unit])((a: ActorRef) Future { (c: Cmd) (self ? StreamProducerChunk(c)).mapTo[Unit] })
106126
context.become(handleRequestAndStreamResponse)
107-
case x: StreamChunk
127+
case x: StreamProducerMessage
108128
log.error("Internal leakage in stream: received stream unexpected stream chunk")
109129
context.stop(self)
110130
}
111131

112132
def handleRequestAndStreamResponse: Receive = handleRequest orElse handleDequeue orElse {
113-
case StreamData(c)
114-
sender ! StreamChunkReceived
115-
context.parent ! Command.StreamReply(x.response)
116-
case StreamEnd
117-
sender ! StreamChunkReceived
133+
case StartStreamHandling
134+
sender ! ReadyForStream
135+
case StreamProducerChunk(c)
136+
sender ! StreamProducerChunkReceived
137+
context.parent ! Command.StreamReply(c)
138+
case StreamProducerEnded
139+
sender ! StreamProducerChunkReceived
118140
context.become(handleRequestAndResponse)
119141
case _ stash()
120142
}

src/main/scala/nl/gideondk/sentinel/Antenna.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], decider: Actio
2929
var commandQueue = Queue.empty[Cmd]
3030

3131
def active(tcpHandler: ActorRef): Receive = {
32-
val answerer = context.actorOf(Props(new RxProcessors.Answerer(init)))
33-
val consumer = context.actorOf(Props(new RxProcessors.Consumer(init)))
32+
val answerer = context.actorOf(Props(new Answerer(init)))
33+
val consumer = context.actorOf(Props(new Consumer(init)))
3434

3535
context watch answerer
3636
context watch consumer

src/main/scala/nl/gideondk/sentinel/Consumer.scala

Lines changed: 78 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,23 @@ import scala.concurrent.Future
2424
import scalaz.contrib.std.scalaFuture._
2525
import nl.gideondk.sentinel.CatchableFuture._
2626

27-
trait InternalConsumerMessage
27+
import Action._
2828

29-
class ConsumerMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
30-
PriorityGenerator {
31-
case x: InternalConsumerMessage 0
32-
case x: Management.ManagementMessage 1
33-
case _ 10
34-
})
29+
object Consumer {
30+
trait StreamConsumerMessage
3531

36-
import Action._
32+
case object ReadyForStream extends StreamConsumerMessage
33+
case object StartingWithStream extends StreamConsumerMessage
34+
case object AskNextChunk extends StreamConsumerMessage
35+
case object RegisterStreamConsumer extends StreamConsumerMessage
36+
case object ReleaseStreamConsumer extends StreamConsumerMessage
37+
38+
class ConsumerMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
39+
PriorityGenerator {
40+
case x: StreamConsumerMessage 0
41+
case x: Management.ManagementMessage 1
42+
case _ 10
43+
})
3744

3845
def consumerResource[O](acquire: Future[ActorRef])(release: ActorRef Future[Unit])(step: ActorRef Future[O])(terminator: O Boolean, includeTerminator: Boolean)(implicit context: ExecutionContext): Process[Future, O] = {
3946
def go(step: Future[O], onExit: Process[Future, O]): Process[Future, O] =
@@ -55,81 +62,81 @@ class ConsumerMailbox(settings: Settings, cfg: Config) extends UnboundedPriority
5562
go(step(r), onExit)
5663
}, halt, halt)
5764
}
65+
}
5866

59-
class Consumer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor with ActorLogging {
60-
import Registration._
61-
62-
import context.dispatcher
67+
class Consumer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], streamChunkTimeout: Timeout = Timeout(5 seconds)) extends Actor with ActorLogging {
68+
import Registration._
69+
import Consumer._
6370

64-
var hooks = Queue[Promise[Evt]]()
65-
var buffer = Queue[Promise[Evt]]()
71+
import context.dispatcher
6672

67-
var registrations = Queue[Registration[Evt]]()
68-
var currentPromise: Option[Promise[Evt]] = None
73+
var hooks = Queue[Promise[Evt]]()
74+
var buffer = Queue[Promise[Evt]]()
6975

70-
var runningSource: Option[Process[Future, Evt]] = None
76+
var registrations = Queue[Registration[Evt]]()
77+
var currentPromise: Option[Promise[Evt]] = None
7178

72-
case object AskNextChunk extends InternalConsumerMessage
73-
case object RegisterSource extends InternalConsumerMessage
74-
case object ReleaseSource extends InternalConsumerMessage
79+
var runningSource: Option[Process[Future, Evt]] = None
7580

76-
implicit val timeout = Timeout(5 seconds)
81+
def popAndSetHook = {
82+
val worker = self
83+
val registration = registrations.head
84+
registrations = registrations.tail
7785

78-
def popAndSetHook = {
79-
val me = self
80-
val registration = registrations.head
81-
registrations = registrations.tail
86+
implicit val timeout = streamChunkTimeout
8287

83-
registration match {
84-
case x: ReplyRegistration[Evt] x.promise.completeWith((self ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))
85-
case x: StreamReplyRegistration[Evt]
86-
val resource = consumerResource((me ? RegisterSource).map(x self))((x: ActorRef) (x ? ReleaseSource).mapTo[Unit])((x: ActorRef)
87-
(x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(x.terminator, x.includeTerminator)
88+
registration match {
89+
case x: ReplyRegistration[Evt] x.promise.completeWith((self ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))
90+
case x: StreamReplyRegistration[Evt]
91+
val resource = consumerResource((worker ? RegisterStreamConsumer).map(x worker))((x: ActorRef) (x ? ReleaseStreamConsumer).mapTo[Unit])((x: ActorRef)
92+
(x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(x.terminator, x.includeTerminator)
8893

89-
runningSource = Some(resource)
90-
x.promise success resource
91-
}
94+
runningSource = Some(resource)
95+
x.promise success resource
9296
}
97+
}
9398

94-
var behavior: Receive = {
95-
case RegisterSource
96-
sender ! self
97-
98-
case ReleaseSource
99-
runningSource = None
100-
if (hooks.headOption.isDefined) popAndSetHook
101-
sender ! ()
102-
103-
case AskNextChunk
104-
val promise = buffer.headOption match {
105-
case Some(p)
106-
buffer = buffer.tail
107-
p
108-
case None
109-
val p = Promise[Evt]()
110-
hooks :+= p
111-
p
112-
}
113-
sender ! promise
114-
115-
case rc: Registration[Evt]
116-
registrations :+= rc
117-
if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
118-
119-
case init.Event(data)
120-
hooks.headOption match {
121-
case Some(x)
122-
x.success(data)
123-
hooks = hooks.tail
124-
case None
125-
buffer :+= Promise.successful(data)
126-
}
99+
def handleRegistrations: Receive = {
100+
case rc: Registration[Evt]
101+
registrations :+= rc
102+
if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
103+
}
127104

128-
}
105+
var behavior: Receive = handleRegistrations orElse {
106+
case ReadyForStream
107+
sender ! StartingWithStream
108+
109+
case ReleaseStreamConsumer
110+
runningSource = None
111+
if (hooks.headOption.isDefined) popAndSetHook
112+
sender ! ()
113+
114+
case AskNextChunk
115+
val promise = buffer.headOption match {
116+
case Some(p)
117+
buffer = buffer.tail
118+
p
119+
case None
120+
val p = Promise[Evt]()
121+
hooks :+= p
122+
p
123+
}
124+
sender ! promise
125+
126+
case init.Event(data)
127+
hooks.headOption match {
128+
case Some(x)
129+
x.success(data)
130+
hooks = hooks.tail
131+
case None
132+
buffer :+= Promise.successful(data)
133+
}
129134

130-
override def postStop() = {
131-
hooks.foreach(_.failure(new Exception("Actor quit unexpectedly")))
132-
}
135+
}
136+
137+
override def postStop() = {
138+
hooks.foreach(_.failure(new Exception("Actor quit unexpectedly")))
139+
}
133140

134-
def receive = behavior
141+
def receive = behavior
135142
}

0 commit comments

Comments
 (0)