@@ -24,16 +24,23 @@ import scala.concurrent.Future
24
24
import scalaz .contrib .std .scalaFuture ._
25
25
import nl .gideondk .sentinel .CatchableFuture ._
26
26
27
- trait InternalConsumerMessage
27
+ import Action . _
28
28
29
- class ConsumerMailbox (settings : Settings , cfg : Config ) extends UnboundedPriorityMailbox (
30
- PriorityGenerator {
31
- case x : InternalConsumerMessage ⇒ 0
32
- case x : Management .ManagementMessage ⇒ 1
33
- case _ ⇒ 10
34
- })
29
+ object Consumer {
30
+ trait StreamConsumerMessage
35
31
36
- import Action ._
32
+ case object ReadyForStream extends StreamConsumerMessage
33
+ case object StartingWithStream extends StreamConsumerMessage
34
+ case object AskNextChunk extends StreamConsumerMessage
35
+ case object RegisterStreamConsumer extends StreamConsumerMessage
36
+ case object ReleaseStreamConsumer extends StreamConsumerMessage
37
+
38
+ class ConsumerMailbox (settings : Settings , cfg : Config ) extends UnboundedPriorityMailbox (
39
+ PriorityGenerator {
40
+ case x : StreamConsumerMessage ⇒ 0
41
+ case x : Management .ManagementMessage ⇒ 1
42
+ case _ ⇒ 10
43
+ })
37
44
38
45
def consumerResource [O ](acquire : Future [ActorRef ])(release : ActorRef ⇒ Future [Unit ])(step : ActorRef ⇒ Future [O ])(terminator : O ⇒ Boolean , includeTerminator : Boolean )(implicit context : ExecutionContext ): Process [Future , O ] = {
39
46
def go (step : Future [O ], onExit : Process [Future , O ]): Process [Future , O ] =
@@ -55,81 +62,81 @@ class ConsumerMailbox(settings: Settings, cfg: Config) extends UnboundedPriority
55
62
go(step(r), onExit)
56
63
}, halt, halt)
57
64
}
65
+ }
58
66
59
- class Consumer [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ]) extends Actor with ActorLogging {
60
- import Registration ._
61
-
62
- import context .dispatcher
67
+ class Consumer [Cmd , Evt ](init : Init [WithinActorContext , Cmd , Evt ], streamChunkTimeout : Timeout = Timeout (5 seconds)) extends Actor with ActorLogging {
68
+ import Registration ._
69
+ import Consumer ._
63
70
64
- var hooks = Queue [Promise [Evt ]]()
65
- var buffer = Queue [Promise [Evt ]]()
71
+ import context .dispatcher
66
72
67
- var registrations = Queue [Registration [Evt ]]()
68
- var currentPromise : Option [Promise [Evt ]] = None
73
+ var hooks = Queue [Promise [Evt ]]()
74
+ var buffer = Queue [Promise [Evt ]]()
69
75
70
- var runningSource : Option [Process [Future , Evt ]] = None
76
+ var registrations = Queue [Registration [Evt ]]()
77
+ var currentPromise : Option [Promise [Evt ]] = None
71
78
72
- case object AskNextChunk extends InternalConsumerMessage
73
- case object RegisterSource extends InternalConsumerMessage
74
- case object ReleaseSource extends InternalConsumerMessage
79
+ var runningSource : Option [Process [Future , Evt ]] = None
75
80
76
- implicit val timeout = Timeout (5 seconds)
81
+ def popAndSetHook = {
82
+ val worker = self
83
+ val registration = registrations.head
84
+ registrations = registrations.tail
77
85
78
- def popAndSetHook = {
79
- val me = self
80
- val registration = registrations.head
81
- registrations = registrations.tail
86
+ implicit val timeout = streamChunkTimeout
82
87
83
- registration match {
84
- case x : ReplyRegistration [Evt ] ⇒ x.promise.completeWith((self ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))
85
- case x : StreamReplyRegistration [Evt ] ⇒
86
- val resource = consumerResource((me ? RegisterSource ).map(x ⇒ self ))((x : ActorRef ) ⇒ (x ? ReleaseSource ).mapTo[Unit ])((x : ActorRef ) ⇒
87
- (x ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))(x.terminator, x.includeTerminator)
88
+ registration match {
89
+ case x : ReplyRegistration [Evt ] ⇒ x.promise.completeWith((self ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))
90
+ case x : StreamReplyRegistration [Evt ] ⇒
91
+ val resource = consumerResource((worker ? RegisterStreamConsumer ).map(x ⇒ worker ))((x : ActorRef ) ⇒ (x ? ReleaseStreamConsumer ).mapTo[Unit ])((x : ActorRef ) ⇒
92
+ (x ? AskNextChunk ).mapTo[Promise [Evt ]].flatMap(_.future))(x.terminator, x.includeTerminator)
88
93
89
- runningSource = Some (resource)
90
- x.promise success resource
91
- }
94
+ runningSource = Some (resource)
95
+ x.promise success resource
92
96
}
97
+ }
93
98
94
- var behavior : Receive = {
95
- case RegisterSource ⇒
96
- sender ! self
97
-
98
- case ReleaseSource ⇒
99
- runningSource = None
100
- if (hooks.headOption.isDefined) popAndSetHook
101
- sender ! ()
102
-
103
- case AskNextChunk ⇒
104
- val promise = buffer.headOption match {
105
- case Some (p) ⇒
106
- buffer = buffer.tail
107
- p
108
- case None ⇒
109
- val p = Promise [Evt ]()
110
- hooks :+= p
111
- p
112
- }
113
- sender ! promise
114
-
115
- case rc : Registration [Evt ] ⇒
116
- registrations :+= rc
117
- if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
118
-
119
- case init.Event (data) ⇒
120
- hooks.headOption match {
121
- case Some (x) ⇒
122
- x.success(data)
123
- hooks = hooks.tail
124
- case None ⇒
125
- buffer :+= Promise .successful(data)
126
- }
99
+ def handleRegistrations : Receive = {
100
+ case rc : Registration [Evt ] ⇒
101
+ registrations :+= rc
102
+ if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
103
+ }
127
104
128
- }
105
+ var behavior : Receive = handleRegistrations orElse {
106
+ case ReadyForStream ⇒
107
+ sender ! StartingWithStream
108
+
109
+ case ReleaseStreamConsumer ⇒
110
+ runningSource = None
111
+ if (hooks.headOption.isDefined) popAndSetHook
112
+ sender ! ()
113
+
114
+ case AskNextChunk ⇒
115
+ val promise = buffer.headOption match {
116
+ case Some (p) ⇒
117
+ buffer = buffer.tail
118
+ p
119
+ case None ⇒
120
+ val p = Promise [Evt ]()
121
+ hooks :+= p
122
+ p
123
+ }
124
+ sender ! promise
125
+
126
+ case init.Event (data) ⇒
127
+ hooks.headOption match {
128
+ case Some (x) ⇒
129
+ x.success(data)
130
+ hooks = hooks.tail
131
+ case None ⇒
132
+ buffer :+= Promise .successful(data)
133
+ }
129
134
130
- override def postStop () = {
131
- hooks.foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
132
- }
135
+ }
136
+
137
+ override def postStop () = {
138
+ hooks.foreach(_.failure(new Exception (" Actor quit unexpectedly" )))
139
+ }
133
140
134
- def receive = behavior
141
+ def receive = behavior
135
142
}
0 commit comments