Skip to content

Commit 503cb62

Browse files
committed
Split processors into two separate files, further refactor...
1 parent 99f2d20 commit 503cb62

File tree

5 files changed

+138
-90
lines changed

5 files changed

+138
-90
lines changed

src/main/scala/nl/gideondk/sentinel/Action.scala

+8-8
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,22 @@ object Action {
2828
trait Reaction[Evt, Cmd] extends Action
2929

3030
trait StreamReaction[Evt, Cmd] extends Reaction[Evt, Cmd] {
31-
def core: Process[Future, Cmd]
31+
def futureProcess: Future[Process[Future, Cmd]]
3232
}
3333

34-
case class Answer[Evt, Cmd](f: Future[Cmd]) extends Reaction[Evt, Cmd]
35-
case class ConsumeStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
36-
case class ProduceStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
37-
case class ReactToStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
34+
case class Answer[Evt, Cmd](future: Future[Cmd]) extends Reaction[Evt, Cmd]
35+
case class ConsumeStream[Evt, Cmd](val futureProcess: Future[Process[Future, Cmd]]) extends StreamReaction[Evt, Cmd]
36+
case class ProduceStream[Evt, Cmd](val futureProcess: Future[Process[Future, Cmd]]) extends StreamReaction[Evt, Cmd]
37+
case class ReactToStream[Evt, Cmd](val futureProcess: Future[Process[Future, Cmd]]) extends StreamReaction[Evt, Cmd]
3838

3939
trait Decider[Evt, Cmd] {
4040
def answer(f: Future[Cmd]): Answer[Evt, Cmd] = Answer(f)
4141

42-
def produce(p: Process[Future, Cmd]): ProduceStream[Evt, Cmd] = ProduceStream(p)
42+
def produce(p: Future[Process[Future, Cmd]]): ProduceStream[Evt, Cmd] = ProduceStream(p)
4343

44-
def react(p: Process[Future, Cmd]): ReactToStream[Evt, Cmd] = ReactToStream(p)
44+
def react(p: Future[Process[Future, Cmd]]): ReactToStream[Evt, Cmd] = ReactToStream(p)
4545

46-
def consumeStream(p: Process[Future, Cmd]): ConsumeStream[Evt, Cmd] = ConsumeStream(p)
46+
def consumeStream(p: Future[Process[Future, Cmd]]): ConsumeStream[Evt, Cmd] = ConsumeStream(p)
4747

4848
def consume = Consume
4949

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
import scala.util.Try
12+
import scala.concurrent.duration._
13+
import akka.pattern.ask
14+
import akka.util.Timeout
15+
import nl.gideondk.sentinel._
16+
import scala.concurrent.ExecutionContext
17+
import akka.dispatch._
18+
import scalaz._
19+
import Scalaz._
20+
import com.typesafe.config.Config
21+
import akka.actor.ActorSystem.Settings
22+
23+
import scala.concurrent.Future
24+
import scalaz.contrib.std.scalaFuture._
25+
import nl.gideondk.sentinel.CatchableFuture._
26+
27+
class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor with ActorLogging with Stash {
28+
import context.dispatcher
29+
30+
trait HandleResult
31+
case class HandleAsyncResult(response: Cmd) extends HandleResult
32+
case class HandleStreamResult(response: Process[Future, Cmd]) extends HandleResult
33+
34+
trait StreamChunk
35+
case class StreamData[Cmd](c: Cmd) extends StreamChunk
36+
case object StreamEnd extends StreamChunk
37+
case object StreamChunkReceived
38+
39+
case object DequeueResponse
40+
41+
var responseQueue = Queue.empty[Promise[HandleResult]]
42+
43+
def handleRequest: Receive = {
44+
case x: Answer[Evt, Cmd]
45+
val serverWorker = self
46+
val promise = Promise[HandleResult]()
47+
responseQueue :+= promise
48+
49+
val fut = for {
50+
response x.future map (result HandleAsyncResult(result))
51+
} yield {
52+
promise.success(response)
53+
serverWorker ! DequeueResponse
54+
}
55+
56+
fut.onFailure {
57+
case e
58+
log.error(e, e.getMessage)
59+
context.stop(self)
60+
}
61+
case x: ProduceStream[Evt, Cmd]
62+
val serverWorker = self
63+
val promise = Promise[HandleResult]()
64+
responseQueue :+= promise
65+
66+
val fut = for {
67+
response x.futureProcess map (result HandleStreamResult(result))
68+
} yield {
69+
promise.success(response)
70+
serverWorker ! DequeueResponse
71+
}
72+
73+
fut.onFailure {
74+
case e
75+
log.error(e, e.getMessage)
76+
context.stop(self)
77+
}
78+
}
79+
80+
def handleDequeue: Receive = {
81+
case DequeueResponse {
82+
def dequeueAndSend: Unit = {
83+
if (!responseQueue.isEmpty && responseQueue.front.isCompleted) {
84+
// TODO: Should be handled a lot safer!
85+
val promise = responseQueue.head
86+
responseQueue = responseQueue.tail
87+
promise.future.value match {
88+
case Some(Success(v))
89+
self ! v
90+
dequeueAndSend
91+
case Some(Failure(e)) // Would normally not occur...
92+
log.error(e, e.getMessage)
93+
context.stop(self)
94+
}
95+
}
96+
97+
}
98+
dequeueAndSend
99+
}
100+
}
101+
102+
def handleRequestAndResponse: Receive = handleRequest orElse handleDequeue orElse {
103+
case x: HandleAsyncResult context.parent ! Command.Reply(x.response)
104+
case x: HandleStreamResult
105+
x.stream
106+
context.become(handleRequestAndStreamResponse)
107+
case x: StreamChunk
108+
log.error("Internal leakage in stream: received stream unexpected stream chunk")
109+
context.stop(self)
110+
}
111+
112+
def handleRequestAndStreamResponse: Receive = handleRequest orElse handleDequeue orElse {
113+
case StreamData(c)
114+
sender ! StreamChunkReceived
115+
context.parent ! Command.StreamReply(x.response)
116+
case StreamEnd
117+
sender ! StreamChunkReceived
118+
context.become(handleRequestAndResponse)
119+
case _ stash()
120+
}
121+
122+
def receive = handleRequestAndResponse
123+
}

src/main/scala/nl/gideondk/sentinel/Antenna.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,14 @@ class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], decider: Actio
6767
case x: Command.Reply[Cmd]
6868
tcpHandler ! init.Command(x.payload)
6969

70+
case x: Command.StreamReply[Cmd]
71+
tcpHandler ! init.Command(x.payload)
72+
7073
case init.Event(data) {
7174
decider.process(data) match {
72-
case x: Action.Answer[Evt, Cmd] answerer ! x
73-
case Action.Consume consumer ! init.Event(data) // Pass through
75+
case x: Action.Reaction[Evt, Cmd] answerer ! x
76+
case Action.Consume consumer ! init.Event(data) // Pass through
77+
case Action.Ignore ()
7478
}
7579
}
7680

src/main/scala/nl/gideondk/sentinel/Processors.scala renamed to src/main/scala/nl/gideondk/sentinel/Consumer.scala

-76
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class ConsumerMailbox(settings: Settings, cfg: Config) extends UnboundedPriority
3333
case _ 10
3434
})
3535

36-
object RxProcessors {
3736
import Action._
3837

3938
def consumerResource[O](acquire: Future[ActorRef])(release: ActorRef Future[Unit])(step: ActorRef Future[O])(terminator: O Boolean, includeTerminator: Boolean)(implicit context: ExecutionContext): Process[Future, O] = {
@@ -66,9 +65,6 @@ object RxProcessors {
6665
var buffer = Queue[Promise[Evt]]()
6766

6867
var registrations = Queue[Registration[Evt]]()
69-
70-
//var registrations = Queue[Management.RegisterReply[Evt]]()
71-
7268
var currentPromise: Option[Promise[Evt]] = None
7369

7470
var runningSource: Option[Process[Future, Evt]] = None
@@ -93,14 +89,6 @@ object RxProcessors {
9389
runningSource = Some(resource)
9490
x.promise success resource
9591
}
96-
97-
// implicit val timeout = Timeout(5 seconds)
98-
99-
// val resource = consumerResource((me ? RegisterSource).map(x ⇒ self))((x: ActorRef) ⇒ (x ? ReleaseSource).mapTo[Unit])((x: ActorRef) ⇒
100-
// (x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(registration.terminator, registration.includeTerminator)
101-
102-
// runningSource = Some(resource)
103-
// registration.promise success resource
10492
}
10593

10694
var behavior: Receive = {
@@ -144,68 +132,4 @@ object RxProcessors {
144132
}
145133

146134
def receive = behavior
147-
}
148-
149-
class Answerer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor with ActorLogging {
150-
import context.dispatcher
151-
152-
case class HandleAsyncResult(response: Cmd)
153-
case object DequeueResponse
154-
155-
var responseQueue = Queue.empty[Promise[HandleAsyncResult]]
156-
var behavior: Receive = handleRequestAndResponse
157-
158-
def handleRequest: Receive = {
159-
case x: Answer[Evt, Cmd]
160-
val serverWorker = self
161-
val promise = Promise[HandleAsyncResult]()
162-
responseQueue :+= promise
163-
164-
val fut = for {
165-
response x.f map (result HandleAsyncResult(result))
166-
} yield {
167-
promise.success(response)
168-
serverWorker ! DequeueResponse
169-
}
170-
171-
fut.onFailure {
172-
// If the future failed, message sequence isn't certain; tear down line to let client recover.
173-
case e
174-
log.error(e, e.getMessage)
175-
context.stop(self)
176-
}
177-
}
178-
179-
def handleRequestAndResponse: Receive = handleRequest orElse {
180-
case x: HandleAsyncResult context.parent ! Command.Reply(x.response)
181-
// x.response match {
182-
// case Some(r) ⇒ context.parent ! Command.Reply(r)
183-
// case None ⇒
184-
// log.error("Stream didn't result in a value")
185-
// context.stop(self)
186-
// }
187-
188-
case DequeueResponse {
189-
def dequeueAndSend: Unit = {
190-
if (!responseQueue.isEmpty && responseQueue.front.isCompleted) {
191-
// TODO: Should be handled a lot safer!
192-
val promise = responseQueue.head
193-
responseQueue = responseQueue.tail
194-
promise.future.value match {
195-
case Some(Success(v))
196-
self ! v
197-
dequeueAndSend
198-
case Some(Failure(e))
199-
log.error(e, e.getMessage)
200-
context.stop(self)
201-
}
202-
}
203-
204-
}
205-
dequeueAndSend
206-
}
207-
}
208-
209-
def receive = behavior
210-
}
211135
}

src/main/scala/nl/gideondk/sentinel/Messages.scala

+1-4
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,11 @@ object Command {
3232
case class AskStream[Cmd, Evt](payload: Cmd, registration: StreamReplyRegistration[Evt]) extends Command[Cmd]
3333

3434
case class Reply[Cmd](payload: Cmd) extends Command[Cmd]
35+
case class StreamReply[Cmd](payload: Cmd) extends Command[Cmd]
3536
}
3637

3738
object Management {
3839
trait ManagementMessage
3940
case class RegisterTcpHandler(h: ActorRef) extends ManagementMessage
40-
41-
//case class StreamReplyRegistration[Evt](promise: Promise[Process[Future, Evt]]) extends Registration[Evt]
42-
43-
case object ReplyRegistered extends ManagementMessage
4441
}
4542

0 commit comments

Comments
 (0)