1
+ package nl .gideondk .sentinel
2
+
3
+ import scala .collection .immutable .Queue
4
+ import scala .concurrent .{ Future , Promise }
5
+ import scala .util .{ Failure , Success }
6
+ import akka .actor ._
7
+ import akka .io .BackpressureBuffer
8
+ import akka .io .TcpPipelineHandler .{ Init , WithinActorContext }
9
+ import scalaz .stream ._
10
+ import scalaz .stream .Process ._
11
+ import scala .util .Try
12
+ import scala .concurrent .duration ._
13
+ import akka .pattern .ask
14
+ import akka .util .Timeout
15
+ import nl .gideondk .sentinel ._
16
+ import scala .concurrent .ExecutionContext
17
+ import akka .dispatch ._
18
+ import scalaz ._
19
+ import Scalaz ._
20
+ import com .typesafe .config .Config
21
+ import akka .actor .ActorSystem .Settings
22
+
23
+ import scala .concurrent .Future
24
+ import scalaz .contrib .std .scalaFuture ._
25
+ import nl .gideondk .sentinel .CatchableFuture ._
26
+
27
+ class Answerer [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ]) extends Actor with ActorLogging with Stash {
28
+ import context .dispatcher
29
+
30
+ trait HandleResult
31
+ case class HandleAsyncResult (response : Cmd ) extends HandleResult
32
+ case class HandleStreamResult (response : Process [Future , Cmd ]) extends HandleResult
33
+
34
+ trait StreamChunk
35
+ case class StreamData [Cmd ](c : Cmd ) extends StreamChunk
36
+ case object StreamEnd extends StreamChunk
37
+ case object StreamChunkReceived
38
+
39
+ case object DequeueResponse
40
+
41
+ var responseQueue = Queue .empty[Promise [HandleResult ]]
42
+
43
+ def handleRequest : Receive = {
44
+ case x : Answer [Evt , Cmd ] ⇒
45
+ val serverWorker = self
46
+ val promise = Promise [HandleResult ]()
47
+ responseQueue :+= promise
48
+
49
+ val fut = for {
50
+ response ← x.future map (result ⇒ HandleAsyncResult (result))
51
+ } yield {
52
+ promise.success(response)
53
+ serverWorker ! DequeueResponse
54
+ }
55
+
56
+ fut.onFailure {
57
+ case e ⇒
58
+ log.error(e, e.getMessage)
59
+ context.stop(self)
60
+ }
61
+ case x : ProduceStream [Evt , Cmd ] ⇒
62
+ val serverWorker = self
63
+ val promise = Promise [HandleResult ]()
64
+ responseQueue :+= promise
65
+
66
+ val fut = for {
67
+ response ← x.futureProcess map (result ⇒ HandleStreamResult (result))
68
+ } yield {
69
+ promise.success(response)
70
+ serverWorker ! DequeueResponse
71
+ }
72
+
73
+ fut.onFailure {
74
+ case e ⇒
75
+ log.error(e, e.getMessage)
76
+ context.stop(self)
77
+ }
78
+ }
79
+
80
+ def handleDequeue : Receive = {
81
+ case DequeueResponse ⇒ {
82
+ def dequeueAndSend : Unit = {
83
+ if (! responseQueue.isEmpty && responseQueue.front.isCompleted) {
84
+ // TODO: Should be handled a lot safer!
85
+ val promise = responseQueue.head
86
+ responseQueue = responseQueue.tail
87
+ promise.future.value match {
88
+ case Some (Success (v)) ⇒
89
+ self ! v
90
+ dequeueAndSend
91
+ case Some (Failure (e)) ⇒ // Would normally not occur...
92
+ log.error(e, e.getMessage)
93
+ context.stop(self)
94
+ }
95
+ }
96
+
97
+ }
98
+ dequeueAndSend
99
+ }
100
+ }
101
+
102
+ def handleRequestAndResponse : Receive = handleRequest orElse handleDequeue orElse {
103
+ case x : HandleAsyncResult ⇒ context.parent ! Command .Reply (x.response)
104
+ case x : HandleStreamResult ⇒
105
+ x.stream
106
+ context.become(handleRequestAndStreamResponse)
107
+ case x : StreamChunk ⇒
108
+ log.error(" Internal leakage in stream: received stream unexpected stream chunk" )
109
+ context.stop(self)
110
+ }
111
+
112
+ def handleRequestAndStreamResponse : Receive = handleRequest orElse handleDequeue orElse {
113
+ case StreamData (c) ⇒
114
+ sender ! StreamChunkReceived
115
+ context.parent ! Command .StreamReply (x.response)
116
+ case StreamEnd ⇒
117
+ sender ! StreamChunkReceived
118
+ context.become(handleRequestAndResponse)
119
+ case _ ⇒ stash()
120
+ }
121
+
122
+ def receive = handleRequestAndResponse
123
+ }
0 commit comments