1
- package nl .gideondk .sentinel . rx
1
+ package nl .gideondk .sentinel
2
2
3
3
import scala .collection .immutable .Queue
4
4
import scala .concurrent .{ Future , Promise }
@@ -58,29 +58,49 @@ object RxProcessors {
58
58
}
59
59
60
60
class Consumer [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ]) extends Actor with ActorLogging {
61
+ import Registration ._
62
+
61
63
import context .dispatcher
62
64
63
65
var hooks = Queue [Promise [Evt ]]()
64
- var queue = Queue [Promise [Evt ]]()
66
+ var buffer = Queue [Promise [Evt ]]()
67
+
68
+ var registrations = Queue [Registration [Evt ]]()
69
+
70
+ // var registrations = Queue[Management.RegisterReply[Evt]]()
71
+
72
+ var currentPromise : Option [Promise [Evt ]] = None
65
73
66
74
var runningSource : Option [Process [Future , Evt ]] = None
67
75
68
76
case object AskNextChunk extends InternalConsumerMessage
69
77
case object RegisterSource extends InternalConsumerMessage
70
78
case object ReleaseSource extends InternalConsumerMessage
71
79
72
- def popAndSetSource = {
80
+ implicit val timeout = Timeout (5 seconds)
81
+
82
+ def popAndSetHook = {
73
83
val me = self
74
84
val registration = registrations.head
75
85
registrations = registrations.tail
76
86
77
- implicit val timeout = Timeout (5 seconds)
87
+ registration match {
88
+ case x : ReplyRegistration [Evt ] ⇒ x.promise.completeWith((self ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))
89
+ case x : StreamReplyRegistration [Evt ] ⇒
90
+ val resource = consumerResource((me ? RegisterSource ).map(x ⇒ self))((x : ActorRef ) ⇒ (x ? ReleaseSource ).mapTo[Unit ])((x : ActorRef ) ⇒
91
+ (x ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))(x.terminator, x.includeTerminator)
78
92
79
- val resource = consumerResource((me ? RegisterSource ).map(x ⇒ self))((x : ActorRef ) ⇒ (x ? ReleaseSource ).mapTo[Unit ])((x : ActorRef ) ⇒
80
- (x ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))(registration.terminator, registration.includeTerminator)
93
+ runningSource = Some (resource)
94
+ x.promise success resource
95
+ }
96
+
97
+ // implicit val timeout = Timeout(5 seconds)
81
98
82
- runningSource = Some (resource)
83
- registration.promise success resource
99
+ // val resource = consumerResource((me ? RegisterSource).map(x ⇒ self))((x: ActorRef) ⇒ (x ? ReleaseSource).mapTo[Unit])((x: ActorRef) ⇒
100
+ // (x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(registration.terminator, registration.includeTerminator)
101
+
102
+ // runningSource = Some(resource)
103
+ // registration.promise success resource
84
104
}
85
105
86
106
var behavior : Receive = {
@@ -89,13 +109,13 @@ object RxProcessors {
89
109
90
110
case ReleaseSource ⇒
91
111
runningSource = None
92
- if (registrations .headOption.isDefined) popAndSetSource
112
+ if (hooks .headOption.isDefined) popAndSetHook
93
113
sender ! ()
94
114
95
115
case AskNextChunk ⇒
96
- val promise = queue .headOption match {
116
+ val promise = buffer .headOption match {
97
117
case Some (p) ⇒
98
- queue = queue .tail
118
+ buffer = buffer .tail
99
119
p
100
120
case None ⇒
101
121
val p = Promise [Evt ]()
@@ -104,26 +124,23 @@ object RxProcessors {
104
124
}
105
125
sender ! promise
106
126
107
- case rc : Management . RegisterReply [Evt ] ⇒
127
+ case rc : Registration [Evt ] ⇒
108
128
registrations :+= rc
109
- if (runningSource.isEmpty && registrations.headOption.isDefined) popAndSetSource
129
+ if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
110
130
111
131
case init.Event (data) ⇒
112
132
hooks.headOption match {
113
133
case Some (x) ⇒
114
134
x.success(data)
115
135
hooks = hooks.tail
116
136
case None ⇒
117
- queue :+= Promise .successful(data)
137
+ buffer :+= Promise .successful(data)
118
138
}
119
139
120
140
}
121
141
122
- var registrations = Queue [Management .RegisterReply [Evt ]]()
123
- var eventBuffer = Queue [Evt ]()
124
-
125
142
override def postStop () = {
126
- ( hooks ++ queue) .foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
143
+ hooks.foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
127
144
}
128
145
129
146
def receive = behavior
0 commit comments