Skip to content

Commit 42a337b

Browse files
committed
Updated README
1 parent 76497ed commit 42a337b

File tree

7 files changed

+35
-16
lines changed

7 files changed

+35
-16
lines changed

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ In overall, treat Sentinel as pre-release alpha software.
2525
* Default implementation for flow control;
2626
* Sequencing and continuing multiple client operations using `Tasks`;
2727
* Handling of read / write interests;
28+
* Pluggable response handlers;
2829
* Streaming Play Enumerator based pipeline.
2930

3031
The following is currently missing in Sentinel, but will be added soon:
@@ -123,6 +124,24 @@ Use `run` to expose the Future, or use `start(d: Duration)` to perform IO and wa
123124

124125
This bare bone approach to sending / receiving messages is focussed on the idea that a higher-level API on top of Sentinel is responsible to make client usage more comfortable.
125126

127+
### Streamed requests / responses (not final)
128+
#### Clients
129+
It's possible to stream content towards Sentinel clients by sending the `StreamedOperation` command to a Sentinel Client:
130+
131+
```scala
132+
client.streamCommands[TotalSize, Chunk](Enumerator(chunks: _*))
133+
```
134+
135+
The function is annotated with the expected returning type (`TotalSize` in this case) and the type used within the `Enumerator` (`Chunk` in this case). The function takes a **Play 2** Enumerator as argument, which is folded to send each item to the TCP connection.
136+
137+
#### Servers
138+
To handle incoming streams, a `EnumeratorStage` is available in Sentinel. Initialisation of `EnumeratorStage` takes two arguments: `terminator: Evt ⇒ Boolean` and `includeTerminator: Boolean = false`.
139+
140+
This first argument is a function taking each `Evt` and returning a boolean in case the streamed chunk can be treated as a `EOS`. The second argument is used to declare if the terminator should be included in the eventual stream of content.
141+
142+
A simple example can be found in the [Iteratee Spec](https://github.com/gideondk/sentinel/blob/master/src/test/scala/nl/gideondk/sentinel/IterateeSpec.scala).
143+
144+
126145
# License
127146
Copyright © 2013 Gideon de Kok
128147

project/Build.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ object ApplicationBuild extends Build {
88
override lazy val settings = super.settings ++
99
Seq(
1010
name := "sentinel",
11-
version := "0.5.0",
11+
version := "0.5.0-SNAPSHOT",
1212
organization := "nl.gideondk",
1313
scalaVersion := "2.10.0",
1414
parallelExecution in Test := false,

src/main/scala/nl/gideondk/sentinel/client/Supervisor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ object SentinelClient {
6969
description: String = "Sentinel Client", workerReconnectTime: FiniteDuration = 2 seconds)(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], lowBytes: Long = 1024L * 2L, highBytes: Long = 1024L * 1024L, maxBufferSize: Long = 1024L * 1024L * 50L)(implicit system: ActorSystem) = {
7070
system.actorOf(Props(new SentinelClient(new InetSocketAddress(serverHost, serverPort), routerConfig, description, workerReconnectTime, new SentinelClientWorker(stages, description + " Worker")(lowBytes, highBytes, maxBufferSize))))
7171
}
72-
72+
7373
def randomRouting[Cmd, Evt](serverHost: String, serverPort: Int, numberOfWorkers: Int, description: String = "Sentinel Client", workerReconnectTime: FiniteDuration = 2 seconds)(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], ackCount: Int = 10, maxBufferSize: Long = 1024L * 1024L * 50L)(implicit system: ActorSystem) =
7474
apply[Cmd, Evt](serverHost, serverPort, RandomRouter(numberOfWorkers), description, workerReconnectTime)(stages, ackCount, maxBufferSize)
7575

src/main/scala/nl/gideondk/sentinel/pipelines/IterateePipeline.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package nl.gideondk.sentinel.pipelines
22

3-
import scala.concurrent.{ExecutionContext, Future, Promise}
4-
import scala.concurrent.duration.{DurationInt, FiniteDuration}
3+
import scala.concurrent.{ ExecutionContext, Future, Promise }
4+
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
55

6-
import akka.actor.{Actor, ActorSystem, Props, Stash, actorRef2Scala}
7-
import akka.io.{PipePair, PipelineContext, PipelineStage}
6+
import akka.actor.{ Actor, ActorSystem, Props, Stash, actorRef2Scala }
7+
import akka.io.{ PipePair, PipelineContext, PipelineStage }
88
import akka.pattern.ask
99
import akka.util.Timeout
1010
import play.api.libs.iteratee.Enumerator
@@ -26,10 +26,10 @@ class EnumeratorStage[Cmd <: AnyRef, Evt <: AnyRef](terminator: Evt ⇒ Boolean,
2626
channels.get(identifier) match {
2727
case Some(c)
2828
def addHookForIdentifier = {
29-
val p = Promise[Option[Evt]]()
30-
channels = channels ++ Map(identifier -> c.copy(hook = Some(p)))
31-
p
32-
}
29+
val p = Promise[Option[Evt]]()
30+
channels = channels ++ Map(identifier -> c.copy(hook = Some(p)))
31+
p
32+
}
3333

3434
val chunk = {
3535
if (c.queue.length == 0) addHookForIdentifier

src/main/scala/nl/gideondk/sentinel/server/Supervisor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ object SentinelServer {
6363
*/
6464

6565
def async[Evt, Cmd](serverPort: Int, handler: Evt Future[Cmd], description: String = "Sentinel Server")(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], lowBytes: Long = 1024L * 2L, highBytes: Long = 1024L * 1024L, maxBufferSize: Long = 1024L * 1024L * 50L)(implicit system: ActorSystem): ActorRef = {
66-
def newHandlerActor(init: Init[WithinActorContext, Cmd, Evt]) = system.actorOf(Props(new SentinelServerBasicAsyncHandler(init, handler)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
66+
def newHandlerActor(init: Init[WithinActorContext, Cmd, Evt]) = system.actorOf(Props(new SentinelServerBasicAsyncHandler(init, handler)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
6767
apply[Evt, Cmd](serverPort, handler, description)(stages, newHandlerActor, lowBytes, highBytes, maxBufferSize)(system)
6868
}
6969

src/test/scala/nl/gideondk/sentinel/IterateeSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import scala.concurrent.duration.Duration
77
import org.specs2.mutable.Specification
88

99
import akka.actor.ActorSystem
10-
import akka.io.{LengthFieldFrame, PipePair, PipelineContext, PipelineStage}
10+
import akka.io.{ LengthFieldFrame, PipePair, PipelineContext, PipelineStage }
1111
import akka.routing.RandomRouter
12-
import akka.util.{ByteString, ByteStringBuilder}
13-
import client.{SentinelClient, commandable}
12+
import akka.util.{ ByteString, ByteStringBuilder }
13+
import client.{ SentinelClient, commandable }
1414
import nl.gideondk.sentinel.pipelines.EnumeratorStage
15-
import play.api.libs.iteratee.{Enumerator, Iteratee}
15+
import play.api.libs.iteratee.{ Enumerator, Iteratee }
1616
import scalaz.Scalaz._
1717
import server.SentinelServer
1818

src/test/scala/nl/gideondk/sentinel/SequenceSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ trait SequenceWorkers {
5454
val stages = new SequenceMessageStage >> new LengthFieldFrame(1024 * 1024 * 10)
5555

5656
implicit val actorSystem = ActorSystem("test-system")
57-
57+
5858
val server = SentinelServer.async(8001, SequenceServerHandler.handle, "Ping Server")(stages)
5959
val client = SentinelClient("localhost", 8001, RandomRouter(4), "Ping Client")(stages)
6060
}

0 commit comments

Comments
 (0)