Skip to content

Commit 99f2d20

Browse files
committed
Cleanup
1 parent 53cb862 commit 99f2d20

15 files changed

+91
-786
lines changed

src/main/scala/nl/gideondk/sentinel/Ack.scala

-3
This file was deleted.

src/main/scala/nl/gideondk/sentinel/Antenna.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], decider: Actio
5757

5858
handleTermination orElse {
5959
case x: Command.Ask[Cmd, Evt]
60-
consumer ! Management.RegisterReply(x.terminator, x.includeTerminator, x.pp)
60+
consumer ! x.registration
61+
tcpHandler ! init.Command(x.payload)
62+
63+
case x: Command.AskStream[Cmd, Evt]
64+
consumer ! x.registration
6165
tcpHandler ! init.Command(x.payload)
6266

6367
case x: Command.Reply[Cmd]

src/main/scala/nl/gideondk/sentinel/Command.scala

-33
This file was deleted.

src/main/scala/nl/gideondk/sentinel/Management.scala

-27
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
12+
import scala.util.Try
13+
import scala.concurrent.duration._
14+
import akka.pattern.ask
15+
import akka.util.Timeout
16+
17+
import scala.concurrent.Future
18+
import scalaz.contrib.std.scalaFuture._
19+
20+
trait Registration[Evt]
21+
22+
object Registration {
23+
case class ReplyRegistration[Evt](promise: Promise[Evt]) extends Registration[Evt]
24+
case class StreamReplyRegistration[Evt](terminator: Evt Boolean, includeTerminator: Boolean, promise: Promise[Process[Future, Evt]]) extends Registration[Evt]
25+
}
26+
27+
trait Command[Cmd]
28+
29+
object Command {
30+
import Registration._
31+
case class Ask[Cmd, Evt](payload: Cmd, registration: ReplyRegistration[Evt]) extends Command[Cmd]
32+
case class AskStream[Cmd, Evt](payload: Cmd, registration: StreamReplyRegistration[Evt]) extends Command[Cmd]
33+
34+
case class Reply[Cmd](payload: Cmd) extends Command[Cmd]
35+
}
36+
37+
object Management {
38+
trait ManagementMessage
39+
case class RegisterTcpHandler(h: ActorRef) extends ManagementMessage
40+
41+
//case class StreamReplyRegistration[Evt](promise: Promise[Process[Future, Evt]]) extends Registration[Evt]
42+
43+
case object ReplyRegistered extends ManagementMessage
44+
}
45+

src/main/scala/nl/gideondk/sentinel/Operation.scala

-18
This file was deleted.

src/main/scala/nl/gideondk/sentinel/rx/Processors.scala renamed to src/main/scala/nl/gideondk/sentinel/Processors.scala

+35-18
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package nl.gideondk.sentinel.rx
1+
package nl.gideondk.sentinel
22

33
import scala.collection.immutable.Queue
44
import scala.concurrent.{ Future, Promise }
@@ -58,29 +58,49 @@ object RxProcessors {
5858
}
5959

6060
class Consumer[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt]) extends Actor with ActorLogging {
61+
import Registration._
62+
6163
import context.dispatcher
6264

6365
var hooks = Queue[Promise[Evt]]()
64-
var queue = Queue[Promise[Evt]]()
66+
var buffer = Queue[Promise[Evt]]()
67+
68+
var registrations = Queue[Registration[Evt]]()
69+
70+
//var registrations = Queue[Management.RegisterReply[Evt]]()
71+
72+
var currentPromise: Option[Promise[Evt]] = None
6573

6674
var runningSource: Option[Process[Future, Evt]] = None
6775

6876
case object AskNextChunk extends InternalConsumerMessage
6977
case object RegisterSource extends InternalConsumerMessage
7078
case object ReleaseSource extends InternalConsumerMessage
7179

72-
def popAndSetSource = {
80+
implicit val timeout = Timeout(5 seconds)
81+
82+
def popAndSetHook = {
7383
val me = self
7484
val registration = registrations.head
7585
registrations = registrations.tail
7686

77-
implicit val timeout = Timeout(5 seconds)
87+
registration match {
88+
case x: ReplyRegistration[Evt] x.promise.completeWith((self ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))
89+
case x: StreamReplyRegistration[Evt]
90+
val resource = consumerResource((me ? RegisterSource).map(x self))((x: ActorRef) (x ? ReleaseSource).mapTo[Unit])((x: ActorRef)
91+
(x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(x.terminator, x.includeTerminator)
7892

79-
val resource = consumerResource((me ? RegisterSource).map(x self))((x: ActorRef) (x ? ReleaseSource).mapTo[Unit])((x: ActorRef)
80-
(x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(registration.terminator, registration.includeTerminator)
93+
runningSource = Some(resource)
94+
x.promise success resource
95+
}
96+
97+
// implicit val timeout = Timeout(5 seconds)
8198

82-
runningSource = Some(resource)
83-
registration.promise success resource
99+
// val resource = consumerResource((me ? RegisterSource).map(x ⇒ self))((x: ActorRef) ⇒ (x ? ReleaseSource).mapTo[Unit])((x: ActorRef) ⇒
100+
// (x ? AskNextChunk).mapTo[Promise[Evt]].flatMap(_.future))(registration.terminator, registration.includeTerminator)
101+
102+
// runningSource = Some(resource)
103+
// registration.promise success resource
84104
}
85105

86106
var behavior: Receive = {
@@ -89,13 +109,13 @@ object RxProcessors {
89109

90110
case ReleaseSource
91111
runningSource = None
92-
if (registrations.headOption.isDefined) popAndSetSource
112+
if (hooks.headOption.isDefined) popAndSetHook
93113
sender ! ()
94114

95115
case AskNextChunk
96-
val promise = queue.headOption match {
116+
val promise = buffer.headOption match {
97117
case Some(p)
98-
queue = queue.tail
118+
buffer = buffer.tail
99119
p
100120
case None
101121
val p = Promise[Evt]()
@@ -104,26 +124,23 @@ object RxProcessors {
104124
}
105125
sender ! promise
106126

107-
case rc: Management.RegisterReply[Evt]
127+
case rc: Registration[Evt]
108128
registrations :+= rc
109-
if (runningSource.isEmpty && registrations.headOption.isDefined) popAndSetSource
129+
if (runningSource.isEmpty && currentPromise.isEmpty) popAndSetHook
110130

111131
case init.Event(data)
112132
hooks.headOption match {
113133
case Some(x)
114134
x.success(data)
115135
hooks = hooks.tail
116136
case None
117-
queue :+= Promise.successful(data)
137+
buffer :+= Promise.successful(data)
118138
}
119139

120140
}
121141

122-
var registrations = Queue[Management.RegisterReply[Evt]]()
123-
var eventBuffer = Queue[Evt]()
124-
125142
override def postStop() = {
126-
(hooks ++ queue).foreach(_.failure(new Exception("Actor quit unexpectedly")))
143+
hooks.foreach(_.failure(new Exception("Actor quit unexpectedly")))
127144
}
128145

129146
def receive = behavior

src/main/scala/nl/gideondk/sentinel/client/Core.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ import scalaz._
2929
import Scalaz._
3030

3131
trait Client[Cmd, Evt] {
32+
import Registration._
33+
3234
def actor: ActorRef
3335

3436
def <~<(command: Cmd)(implicit context: ExecutionContext): Task[Evt] = sendCommand(command)
3537

3638
def sendCommand(command: Cmd)(implicit context: ExecutionContext): Task[Evt] = Task {
3739
val promise = Promise[Evt]()
38-
actor ! Command.Ask(command, promise) // Terminate directly and always return terminator => single result
40+
actor ! Command.Ask(command, ReplyRegistration(promise)) // Terminate directly and always return terminator => single result
3941
promise.future
4042
}
4143

@@ -161,7 +163,7 @@ class ClientCore[Cmd, Evt](routerConfig: RouterConfig, description: String, reco
161163
coreRouter match {
162164
case Some(r)
163165
r forward x
164-
case None x.pp.failure(new Exception("No connection(s) available"))
166+
case None x.registration.promise.failure(new Exception("No connection(s) available"))
165167
}
166168

167169
case _

src/main/scala/nl/gideondk/sentinel/client/Supervisor.scala

-113
This file was deleted.

0 commit comments

Comments
 (0)