Skip to content

Commit 99ec0a9

Browse files
committed
Split Commands and Replies
1 parent 9d00e8a commit 99ec0a9

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

src/main/scala/nl/gideondk/sentinel/Antenna.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,15 @@ class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], Resolver: Sent
5858
consumer ! x.registration
5959
tcpHandler ! init.Command(x.payload)
6060

61-
case x: Command.Reply[Cmd]
61+
case x: Reply.Response[Cmd]
6262
tcpHandler ! init.Command(x.payload)
6363

64-
case x: Command.StreamReply[Cmd]
64+
case x: Reply.StreamResponseChunk[Cmd]
6565
tcpHandler ! init.Command(x.payload)
6666

6767
case init.Event(data) {
6868
Resolver.process(data) match {
6969
case x: ResponderAction.Reaction[Evt, Cmd] responder ! x
70-
7170
case ConsumerAction.Consume consumer ! init.Event(data) // Pass through
7271
case ConsumerAction.Ignore ()
7372
}

src/main/scala/nl/gideondk/sentinel/Messages.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ object Registration {
1313
}
1414

1515
trait Command[Cmd]
16+
trait Reply[Cmd]
1617

1718
object Command {
1819
import Registration._
@@ -23,9 +24,11 @@ object Command {
2324
case class SendStream[Cmd, Evt](command: Cmd, stream: Process[Future, Cmd], registration: ReplyRegistration[Evt]) extends Command[Cmd]
2425

2526
case class Conversate[Cmd, Evt](command: Cmd, stream: Process[Future, Cmd], registration: StreamReplyRegistration[Evt]) extends Command[Cmd]
27+
}
2628

27-
case class Reply[Cmd](payload: Cmd) extends Command[Cmd]
28-
case class StreamReply[Cmd](payload: Cmd) extends Command[Cmd]
29+
object Reply {
30+
case class Response[Cmd](payload: Cmd) extends Reply[Cmd]
31+
case class StreamResponseChunk[Cmd](payload: Cmd) extends Reply[Cmd]
2932
}
3033

3134
object Management {

src/main/scala/nl/gideondk/sentinel/Resolver.scala

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ trait ResponseResolver[Evt, Cmd] extends Resolver[Evt, Cmd] {
2121
import ResponderAction._
2222
def answer(f: Future[Cmd]): Answer[Evt, Cmd] = Answer(f)
2323

24+
def handle(f: Unit): Handle[Evt, Cmd] = Handle()
25+
2426
def produce(p: Future[Process[Future, Cmd]]): ProduceStream[Evt, Cmd] = ProduceStream(p)
2527

2628
def react(p: Future[Process[Future, Cmd]]): ReactToStream[Evt, Cmd] = ReactToStream(p)

src/main/scala/nl/gideondk/sentinel/Responder.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class Responder[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], streamChunkT
6666
log.error(e, e.getMessage)
6767
context.stop(self)
6868
}
69+
6970
case x: ProduceStream[Evt, Cmd]
7071
val me = self
7172
val promise = Promise[HandleResult]()
@@ -108,7 +109,7 @@ class Responder[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], streamChunkT
108109
}
109110

110111
def handleRequestAndResponse: Receive = handleRequest orElse handleDequeue orElse {
111-
case x: HandleAsyncResult[Cmd] context.parent ! Command.Reply(x.response)
112+
case x: HandleAsyncResult[Cmd] context.parent ! Reply.Response(x.response)
112113
case x: HandleStreamResult[Cmd]
113114
val worker = self
114115
implicit val timeout = streamChunkTimeout
@@ -124,7 +125,7 @@ class Responder[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], streamChunkT
124125
sender ! ReadyForStream
125126
case StreamProducerChunk(c)
126127
sender ! StreamProducerChunkReceived
127-
context.parent ! Command.StreamReply(c)
128+
context.parent ! Reply.StreamResponseChunk(c)
128129
case StreamProducerEnded
129130
sender ! StreamProducerChunkReceived
130131
context.become(handleRequestAndResponse)

src/main/scala/nl/gideondk/sentinel/client/Core.scala

+11-5
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ trait Client[Cmd, Evt] {
3737
promise.future
3838
}
3939

40-
// def sendStream(command: Cmd, source: Process[Future, Evt]): Task[Evt] = Task {
41-
// val promise = Promise[Evt]()
42-
// actor ! Command.AskStream(command, StreamReplyRegistration(terminator, includeTerminator, promise))
43-
// promise.future
44-
// }
40+
def sendStream(command: Cmd, source: Process[Future, Evt]): Task[Evt] = Task {
41+
val promise = Promise[Evt]()
42+
actor ! Command.SendStream(command, source, ReplyRegistration(promise))
43+
promise.future
44+
}
45+
46+
def conversate(command: Cmd, source: Process[Future, Evt], terminator: Evt Boolean, includeTerminator: Boolean): Task[Process[Future, Evt]] = Task {
47+
val promise = Promise[Process[Future, Evt]]()
48+
actor ! Command.Conversate(command, source, StreamReplyRegistration(terminator, includeTerminator, promise))
49+
promise.future
50+
}
4551
}
4652

4753
object Client {

0 commit comments

Comments
 (0)