Skip to content

Commit 56d75b0

Browse files
committed
Revised client API (for easier typed, asymmetric pipelines), Enumerator pimp for easier usage with Enumerators.
1 parent 42a337b commit 56d75b0

File tree

8 files changed

+53
-42
lines changed

8 files changed

+53
-42
lines changed

README.md

+8-4
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,19 @@ This bare bone approach to sending / receiving messages is focussed on the idea
126126

127127
### Streamed requests / responses (not final)
128128
#### Clients
129-
It's possible to stream content towards Sentinel clients by sending the `StreamedOperation` command to a Sentinel Client:
129+
It's possible to stream content towards Sentinel clients by using the the `|~>>>` or `|>>>` functions on a Play *Enumerator* (after importing `nl.sentinel.client._`)
130130

131131
```scala
132-
client.streamCommands[TotalSize, Chunk](Enumerator(chunks: _*))
132+
Enumerator(chunks) |~>>> client
133+
res0: Task[TotalSize]
134+
135+
Enumerator(chunks) |>>> client
136+
res1: Future[Try[TotalSize]]
133137
```
134138

135-
The function is annotated with the expected returning type (`TotalSize` in this case) and the type used within the `Enumerator` (`Chunk` in this case). The function takes a **Play 2** Enumerator as argument, which is folded to send each item to the TCP connection.
139+
The content within the *Enumerator* is folded to send each item to the TCP connection (returning in the `Evt` type, defined through the pipeline).
136140

137-
#### Servers
141+
#### Streaming Pipelines
138142
To handle incoming streams, a `EnumeratorStage` is available in Sentinel. Initialisation of `EnumeratorStage` takes two arguments: `terminator: Evt ⇒ Boolean` and `includeTerminator: Boolean = false`.
139143

140144
This first argument is a function taking each `Evt` and returning a boolean in case the streamed chunk can be treated as a `EOS`. The second argument is used to declare if the terminator should be included in the eventual stream of content.

src/main/scala/nl/gideondk/sentinel/client/Supervisor.scala

+30-7
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,35 @@ import akka.util.ByteString
99
import nl.gideondk.sentinel._
1010
import akka.routing.RandomRouter
1111
import akka.routing.RoundRobinRouter
12+
import scala.concurrent.Promise
13+
import play.api.libs.iteratee.Enumerator
1214

13-
class SentinelClient(address: InetSocketAddress, routerConfig: RouterConfig, description: String,
14-
reconnectDuration: FiniteDuration, worker: Actor) extends Actor with ActorLogging {
15+
trait SentinelClient[Cmd, Evt] {
16+
def actor: ActorRef
17+
18+
def <~<(command: Cmd): Task[Evt] = sendCommand(command)
19+
20+
def sendCommand(command: Cmd): Task[Evt] = Task {
21+
val promise = Promise[Evt]()
22+
actor ! Operation(command, promise)
23+
promise.future
24+
}
25+
26+
def streamCommands(stream: Enumerator[Cmd]): Task[Evt] = Task {
27+
val promise = Promise[Evt]()
28+
actor ! StreamedOperation(stream, promise)
29+
promise.future
30+
}
31+
}
32+
33+
class SentinelClientSupervisor(address: InetSocketAddress, routerConfig: RouterConfig, description: String,
34+
reconnectDuration: FiniteDuration, worker: Actor) extends Actor with ActorLogging {
1535
import context.dispatcher
16-
import SentinelClient._
36+
37+
private case object InitializeRouter
38+
private case object ReconnectRouter
39+
40+
case class NoConnectionException extends Throwable
1741

1842
var router: Option[ActorRef] = None
1943

@@ -60,14 +84,13 @@ class SentinelClient(address: InetSocketAddress, routerConfig: RouterConfig, des
6084
}
6185

6286
object SentinelClient {
63-
private case object InitializeRouter
64-
private case object ReconnectRouter
65-
6687
case class NoConnectionException extends Throwable
6788

6889
def apply[Cmd, Evt](serverHost: String, serverPort: Int, routerConfig: RouterConfig,
6990
description: String = "Sentinel Client", workerReconnectTime: FiniteDuration = 2 seconds)(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], lowBytes: Long = 1024L * 2L, highBytes: Long = 1024L * 1024L, maxBufferSize: Long = 1024L * 1024L * 50L)(implicit system: ActorSystem) = {
70-
system.actorOf(Props(new SentinelClient(new InetSocketAddress(serverHost, serverPort), routerConfig, description, workerReconnectTime, new SentinelClientWorker(stages, description + " Worker")(lowBytes, highBytes, maxBufferSize))))
91+
new SentinelClient[Cmd, Evt] {
92+
val actor = system.actorOf(Props(new SentinelClientSupervisor(new InetSocketAddress(serverHost, serverPort), routerConfig, description, workerReconnectTime, new SentinelClientWorker(stages, description + " Worker")(lowBytes, highBytes, maxBufferSize))))
93+
}
7194
}
7295

7396
def randomRouting[Cmd, Evt](serverHost: String, serverPort: Int, numberOfWorkers: Int, description: String = "Sentinel Client", workerReconnectTime: FiniteDuration = 2 seconds)(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], ackCount: Int = 10, maxBufferSize: Long = 1024L * 1024L * 50L)(implicit system: ActorSystem) =

src/main/scala/nl/gideondk/sentinel/client/package.scala

+6-17
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,14 @@ package nl.gideondk.sentinel
22

33
import scala.concurrent.Promise
44
import play.api.libs.iteratee._
5-
65
import akka.actor.{ ActorRef, actorRef2Scala }
6+
import scala.concurrent.Future
7+
import scala.util.Try
78

8-
final class AskableSentinelClient(val clientActorRef: ActorRef) extends AnyVal {
9-
def <~<[A](command: A): Task[A] = sendCommand[A, A](command)
10-
11-
def sendCommand[B, A](command: A): Task[B] = Task {
12-
val promise = Promise[B]()
13-
clientActorRef ! Operation(command, promise)
14-
promise.future
15-
}
9+
package object client {
10+
implicit class SentinelEnumerator[A](val e: Enumerator[A]) extends AnyVal {
11+
def |~>>>[B](i: SentinelClient[A, B]): Task[B] = i.streamCommands(e)
1612

17-
def streamCommands[B, A](stream: Enumerator[A]): Task[B] = Task {
18-
val promise = Promise[B]()
19-
clientActorRef ! StreamedOperation(stream, promise)
20-
promise.future
13+
def |>>>[B](i: SentinelClient[A, B]): Future[Try[B]] = i.streamCommands(e).start
2114
}
22-
}
23-
24-
package object client {
25-
implicit def commandable(actorRef: ActorRef): AskableSentinelClient = new AskableSentinelClient(actorRef)
2615
}

src/main/scala/nl/gideondk/sentinel/server/Supervisor.scala

-5
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,3 @@ object SentinelServer {
7171
system.actorOf(Props(new SentinelServer(serverPort, description, stages, requestHandler)(lowBytes, highBytes, maxBufferSize)))
7272
}
7373
}
74-
75-
private case object InitializeServerRouter
76-
private case object ReconnectServerRouter
77-
78-
case class NoWorkerException extends Throwable

src/test/scala/nl/gideondk/sentinel/IterateeSpec.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import akka.actor.ActorSystem
1010
import akka.io.{ LengthFieldFrame, PipePair, PipelineContext, PipelineStage }
1111
import akka.routing.RandomRouter
1212
import akka.util.{ ByteString, ByteStringBuilder }
13-
import client.{ SentinelClient, commandable }
13+
import client._
1414
import nl.gideondk.sentinel.pipelines.EnumeratorStage
1515
import play.api.libs.iteratee.{ Enumerator, Iteratee }
1616
import scalaz.Scalaz._
@@ -87,9 +87,9 @@ class ChunkUploadSpec extends Specification with ChunkUploadWorkers {
8787
val connNum = 4
8888

8989
val chunks = List.fill(num)(Chunk(false, LargerPayloadTestHelper.randomBSForSize((1024 * 1024 * 0.1).toInt).compact)) ++ List(Chunk(true, ByteString()))
90-
val resLength = client.streamCommands[TotalSize, Chunk](Enumerator(chunks: _*)).copoint.length
90+
val res = Enumerator(chunks: _*) |~>>> client
9191

92-
resLength == chunks.foldLeft(0)((t, c) t + c.chunk.length)
92+
res.copoint.length == chunks.foldLeft(0)((t, c) t + c.chunk.length)
9393
}
9494
}
9595

src/test/scala/nl/gideondk/sentinel/LargerPayloadSpec.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import akka.actor.ActorRef
1111
import akka.io.LengthFieldFrame
1212
import akka.routing.RandomRouter
1313
import akka.util.{ ByteString, ByteStringBuilder }
14-
import client.{ SentinelClient, commandable }
14+
import client._
1515
import scalaz.Scalaz._
1616
import server.SentinelServer
1717

@@ -21,7 +21,6 @@ object LargerPayloadServerHandler {
2121
def handle(event: ByteString): Future[ByteString] = {
2222
val bs = new ByteStringBuilder
2323
bs.putInt(event.length)(java.nio.ByteOrder.BIG_ENDIAN)
24-
2524
Future(bs.result)
2625
}
2726
}

src/test/scala/nl/gideondk/sentinel/PingPongSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import akka.actor.ActorRef
1212
import akka.io.{ LengthFieldFrame, PipelineContext, SymmetricPipePair, SymmetricPipelineStage }
1313
import akka.routing.RandomRouter
1414
import akka.util.ByteString
15-
import client.{ SentinelClient, commandable }
16-
import server.SentinelServer
15+
import server._
16+
import client._
1717

1818
import akka.actor.ActorSystem
1919

src/test/scala/nl/gideondk/sentinel/SequenceSpec.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import akka.actor.ActorRef
1111
import akka.io.{ LengthFieldFrame, PipelineContext, SymmetricPipePair, SymmetricPipelineStage }
1212
import akka.routing.RandomRouter
1313
import akka.util.{ ByteString, ByteStringBuilder }
14-
import client.{ SentinelClient, commandable }
15-
import server.SentinelServer
14+
15+
import client._
16+
import server._
1617

1718
import akka.actor.ActorSystem
1819

0 commit comments

Comments
 (0)