Skip to content

Commit 2aa32bc

Browse files
committed
Small changes, update README
1 parent 97fb8ba commit 2aa32bc

File tree

6 files changed

+134
-46
lines changed

6 files changed

+134
-46
lines changed

README.md

+129-41
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
## Overview
66

7-
**Sentinel** is boilerplate for TCP based servers and clients through Akka IO (2.2).
7+
**Sentinel** is boilerplate for TCP based servers and clients through Akka IO (2.3).
88

99
The implementation focusses on raw performance, using pipelines through multiple sockets represented by multiple workers (both client / server side). Sentinel is designed for usage in persistent connection environments, making it (currently) less suited for things like HTTP and best suited for DB clients / RPC stacks.
1010

11+
Sentinel brings a unique symmetrical design through *Antennas*, resulting in the same request and response handling on both clients and servers. This not only makes it simple to share code on both sides, but also opens the possibility to inverse request & response flow from server to client.
12+
1113
In its current state, it's being used internally as a platform to test performance strategies for CPU and IO bound services. In the nearby future, Sentinel will fuel both [Raiku](http://github.com/gideondk/raiku) as other soon-to-be-released Akka based libraries.
1214

1315

@@ -20,18 +22,20 @@ In overall, treat Sentinel as pre-release alpha software.
2022

2123
* Easy creation of reactive TCP servers / clients;
2224
* Easy initialization of servers and clients for default or custom router worker strategies;
23-
* Supervision (and restart / reconnection functionality) on both server and client for a defined number of workers;
25+
* Supervision (and restart / reconnection functionality) on clients for a defined number of workers;
2426
* Default implementation for flow control;
2527
* Sequencing and continuing multiple client operations using `Tasks`;
2628
* Handling of read / write interests;
2729
* Pluggable response handlers;
28-
* Streaming Play Enumerator based pipeline.
30+
* Streaming requests and responses (currently) based on Play Iteratees;
31+
* Direct server to client communication through a symmetrical design.
2932

3033
The following is currently missing in Sentinel, but will be added soon:
3134

35+
* Replacement of `Iteratees` in favour of the upcoming *Akka Streams*;
3236
* A far more solid test suite;
3337
* Better error handling and recovery;
34-
* Server to client communication;
38+
* Default functionality for callback based protocols;
3539
* More examples, and overall awesomeness…
3640

3741
## Installation
@@ -47,15 +51,56 @@ Or by adding the repo:
4751
to your SBT configuration and adding the `SNAPSHOT` to your library dependencies:
4852

4953
<notextile><pre><code>libraryDependencies ++= Seq(
50-
"nl.gideondk" %% "sentinel" % "0.5.5"
54+
"nl.gideondk" %% "sentinel" % "0.6.0-M1"
5155
)
5256
</code></pre></notextile>
5357

54-
## Usage
58+
## Architecture
59+
60+
The internal structure of Sentinel relies on a *Antenna* actor. The Antenna represents the connection between a client and a server and handles both the outgoing commands as incoming replies and handles the events received from the underlying *TCP* actors.
61+
62+
Within the antenna structure, two child actors are defined. One used for consuming replies from the connected host and one for the production of values for the connected host.
63+
64+
Both clients as servers share the same antenna construction, which results in a symmetrical design for sending and receiving commands. When a message is received from the opposing host, a *resolver* is used to determine the action or reaction on the received event. Based on the used protocol (as defined in the underlying protocol pipeline), a host can process the event and decide whether the consume the received event or to respond with new values (as in a normal request -> response way).
65+
66+
Once, for instance, a command is sent to a client (for a response from the connected server), the payload is sent to the opposing host and a reply-registration is set within the consumer part of the antenna. This registration and accompanying promise is completed with the consequential response from the server.
67+
68+
### Actions
69+
The handle incoming events, multiple actions are defined which can be used to implement logic on top of the used protocol. Actions are split into consumer actions and producers actions, which make a antenna able to:
70+
71+
#### Consumer Actions
72+
`AcceptSignal`: Accept and consume a incoming signal and apply it on a pending registration
73+
74+
`AcceptError`: Accept a incoming error message and apply it as a failure on a pending registration
75+
76+
`ConsumeStreamChunk`: Accept a incoming stream chunk and consume add it to the current running stream / Enumerator
77+
78+
`EndStream`: Accept a incoming stream terminator and end the current ongoing stream
79+
80+
`ConsumeChunkAndEndStream`: Consumes the chunk and terminates the stream (combination of the two above)
81+
82+
`Ignore`: Ignores the current received signal
83+
84+
#### Producer Actions
85+
`Signal`: Responds to the incoming signal with a new (async) signal
86+
87+
`CosumeStream`: Starts consuming the stream until a `EndStream` is received
88+
89+
`ProduceStream`: Produces a stream (Enumerator) for the requesting hosts
90+
91+
### Synchronicity
92+
Normally, Sentinel clients connect to servers through multiple sockets to increase parallel performance on top of the synchronous nature of *TCP* sockets. Producers and consumers implement a state machine to correctly respond to running incoming and outgoing streams, handling messages which don't impose treats to the message flow and stashing messages which could leak into the running streams.
93+
94+
Because of the synchronous nature of the underlying semantics, you have to handle each receiving signal in a appropriate way. Not handling all signals correctly could result in values ending up in incorrect registrations etc.
95+
96+
97+
## Initialization
5598
### Pipelines
56-
The new pipeline implementation in Akka IO, focusses on the definition of pipes for both incoming as outgoing messages. In these pipelines, a definition is made how incoming or outgoing messages are parsed and formatted.
99+
The Pipeline implementation available in Akka 2.2 is becoming obsolete in Akka 2.3 to be replaced with a (better) alternative later on in Akka 2.4. As it seemed that pipelines aren't the best solution for Akka, this currently leaves Akka 2.3 without a reactive *protocol layer*. To bridge the period until a definite solution is available, the "older" pipeline implementation is packaged along with Sentinel.
100+
101+
The pipeline implementation focusses on the definition of pipes for both incoming as outgoing messages. In these pipelines, a definition is made how incoming or outgoing messages are parsed and formatted.
57102

58-
Each of these *stages* can easily be composed into a bigger stage (`A => B >> B => C`) taking a the input of the first stage and outputting the format of the last stage. Within Sentinel, the eventual output send to the IO workers is in the standard `ByteString` format, making it nessecary that the end stage of the pipeline always outputs content of the `ByteString` type:
103+
Each of these *stages* can easily be composed into a bigger stage (`A => B >> B => C`) taking a the input of the first stage and outputting the format of the last stage. Within Sentinel, the eventual output send to the IO workers is in the standard `ByteString` format, making it necessary that the end stage of the pipeline always outputs content of the `ByteString` type:
59104

60105
```scala
61106
case class PingPongMessageFormat(s: String)
@@ -77,46 +122,77 @@ class PingPongMessageStage extends SymmetricPipelineStage[PipelineContext,
77122
}
78123
```
79124

80-
### Client
81-
After the definition of the pipeline, a client is easily created:
125+
### Resolver
126+
The default resolver for a client is one that automatically accepts all signals. This default behaviour makes it able to handle basic protocols asynchronously without defining a custom resolver on the client side.
82127

83-
```scala
84-
SentinelClient.randomRouting("localhost", 9999, 4, "Ping Client")(stages)
85-
```
128+
It's easy to extend the behaviour on the client side for receiving stream responses by defining a custom `SentinelResolver`:
86129

87-
Defining the host and port where the client should connect to, the amount of workers used to handle commands / events, description of the client and the earlier defined context and stages (for the complete list of parameters, check the code for the moment).
88-
89-
You can use the `randomRouting` / `roundRobinRouting` methods depending on the routing strategy you want to use to communicate to the workers. For a more custom approach the `apply` method is available, which lets you define a router strategy yourself.
130+
```scala
131+
import SimpleMessage._
132+
trait DefaultSimpleMessageHandler extends SentinelResolver[SimpleMessageFormat, SimpleMessageFormat] {
133+
def process = {
134+
case SimpleStreamChunk(x) if (x.length > 0) ConsumerAction.ConsumeStreamChunk else ConsumerAction.EndStream
135+
case x: SimpleError ConsumerAction.AcceptError
136+
case x: SimpleReply ConsumerAction.AcceptSignal
137+
}
138+
}
90139

91-
### Server
92-
Handlers of requests are pluggable by defining a *requestHandler*. The request handler is defined using a function: `Init[WithinActorContext, Cmd, Evt] ⇒ ActorRef`, taking a `TcpPipelineHandler.Init` type and returning a new actor handling the `Init.Event` types from the TcpPipelineHandler and returning the appriopriate `Init.Command` types back to the TcpPipelienHandler
140+
object SimpleClientHandler extends DefaultSimpleMessageHandler
141+
```
93142

94-
#### Async handler
95-
By default a *async* handler is supplied with a easy to use interface. The async handler takes a `handle` function as argument, which it uses to handle incoming events from a client. The handle function is of type `Evt => Future[Cmd]`, taking the parsed result from the incoming pipe and preparing the response send back to the client.
143+
In a traditional structure, a different resolver should be used on the server side, handling incoming requests and responding with the appropriate response. The partial function taking a event and resulting in a action fully exposes the event to *route* the messages to the current action:
96144

97145
```scala
98-
def handle(event: PingPongMessageFormat): Future[PingPongMessageFormat] = {
99-
event.s match {
100-
case "PING" Future(PingPongMessageFormat("PONG"))
101-
case _ Future.failed(new Exception("Unknown command"))
146+
object SimpleServerHandler extends DefaultSimpleMessageHandler {
147+
148+
override def process = super.process orElse {
149+
case SimpleCommand(PING_PONG, payload) ProducerAction.Signal { x: SimpleCommand Future(SimpleReply("PONG")) }
150+
case SimpleCommand(TOTAL_CHUNK_SIZE, payload) ProducerAction.ConsumeStream { x: SimpleCommand
151+
s: Enumerator[SimpleStreamChunk]
152+
s |>>> Iteratee.fold(0) { (b, a) b + a.payload.length } map (x SimpleReply(x.toString))
153+
}
154+
case SimpleCommand(GENERATE_NUMBERS, payload) ProducerAction.ProduceStream { x: SimpleCommand
155+
val count = payload.toInt
156+
Future((Enumerator(List.range(0, count): _*) &> Enumeratee.map(x SimpleStreamChunk(x.toString))) >>> Enumerator(SimpleStreamChunk("")))
102157
}
158+
case SimpleCommand(ECHO, payload) ProducerAction.Signal { x: SimpleCommand Future(SimpleReply(x.payload)) }
159+
}
103160
}
104161
```
105162

106-
The return type of `Cmd` should be wrapped into a `Future`, this makes it able to do other non-blocking work within, for instance, IO focused services.
163+
Like illustrated, the `ProducerAction.Signal` producer action makes it able to respond with a Async response. Taking a function which handles the incoming event and producing a new value, wrapped in a `Future`.
164+
165+
`ProducerAction.ConsumeStream` takes a function handling the incoming event and the Enumerator with the consequential chunks, resulting in a new value wrapped in a `Future`
107166

108-
After the definition of the handler, the server can be defined in same fashion as the client:
167+
`ProducerAction.ProduceStream` takes a function handling the incoming event and returning a corresponding stream as a `Enumerator` wrapped in a `Future`
168+
169+
### Client
170+
After the definition of the pipeline, a client is easily created:
109171

110172
```scala
111-
SentinelServer.async(9999, PingPongServerHandler.handle, "Ping Server")(stages)
173+
SentinelClient.randomRouting("localhost", 9999, 4, "Ping Client", stages = stages, resolver = resolver)
112174
```
113175

114-
### Client usage
176+
Defining the host and port where the client should connect to, the amount of workers used to handle commands / events, description of the client and the earlier defined context, stages and resolver (for the complete list of parameters, check the code for the moment).
177+
178+
You can use the `randomRouting` / `roundRobinRouting` methods depending on the routing strategy you want to use to communicate to the workers. For a more custom approach the `apply` method is available, which lets you define a router strategy yourself.
115179

116-
Once a client and / or server has been set up, the `<~<` method can be used on the client to send a command to the connected server. Results are wrapped into a `Task` containing the type `Evt` defined in the incoming stage of the client.
180+
### Server
181+
When the stages and resolver are defined, creation of a server is very straight forward:
182+
183+
``scala
184+
SentinelServer(portNumber, SimpleServerHandler, "Server", SimpleMessage.stages)
185+
```
186+
187+
This will automatically start the server with the corresponding stages and handler, in the future, separate functionality for starting, restarting and stopping services will be available.
188+
189+
## Usage
190+
### Client
191+
192+
Once a client and / or server has been set up, the `?` method can be used on the client to send a command to the connected server. Results are wrapped into a `Task` containing the type `Evt` defined in the incoming stage of the client.
117193
118194
```scala
119-
PingPongTestHelper.pingClient <~< PingPongMessageFormat("PING")
195+
PingPongTestHelper.pingClient ? PingPongMessageFormat("PING")
120196
res0: Task[PingPongMessageFormat]
121197
```
122198

@@ -127,29 +203,41 @@ Use `run` to expose the Future, or use `start(d: Duration)` to perform IO and wa
127203
This bare bone approach to sending / receiving messages is focussed on the idea that a higher-level API on top of Sentinel is responsible to make client usage more comfortable.
128204

129205
### Streamed requests / responses
130-
#### Clients
131-
It's possible to stream content towards Sentinel clients by using the the `|~>>>` or `|>>>` functions on a Play *Enumerator* (after importing `nl.sentinel.client._`)
206+
#### Sending
207+
It's possible to stream content towards Sentinel clients by using the the `?<<-` command, expecting the command to be send to the server, accompanied by the actual stream:
132208

133209
```scala
134-
Enumerator(chunks) |~>>> client
135-
res0: Task[TotalSize]
210+
c ?<<- (SimpleCommand(TOTAL_CHUNK_SIZE, ""), Enumerator(chunks: _*))
211+
res0: Task[SimpleCommand]
212+
213+
c ?<<- Enumerator((SimpleCommand(TOTAL_CHUNK_SIZE, "") ++ chunks): _*)
214+
res1: Task[SimpleCommand]
136215

137-
Enumerator(chunks) |>>> client
138-
res1: Future[Try[TotalSize]]
139216
```
140217

141218
The content within the *Enumerator* is folded to send each item to the TCP connection (returning in the `Evt` type, defined through the pipeline).
142219

143-
#### Streaming Pipelines
144-
To handle incoming streams, a `EnumeratorStage` is available in Sentinel. Initialisation of `EnumeratorStage` takes two arguments: `terminator: Evt ⇒ Boolean` and `includeTerminator: Boolean = false`.
220+
#### Receiving
221+
In the same manner, a stream can be requested from the server:
222+
223+
```scala
224+
c ?->> SimpleCommand(GENERATE_NUMBERS, count.toString)
225+
res0: Task[Enumerator[SimpleCommand]]
226+
```
227+
228+
### Server
229+
Although functionality will be expanded in the future, it's currently also possible to send requests from the server to the connected clients. This can be used for retrieval of client information on servers request, but could also be used as a retrieval pattern where clients are dormant after request, but respond to requests when necessary (retrieving sensor info per example).
145230

146-
This first argument is a function taking each `Evt` and returning a boolean in case the streamed chunk can be treated as a `EOS`. The second argument is used to declare if the terminator should be included in the eventual stream of content.
231+
The following commands can be used to retrieve information:
147232

148-
A simple example can be found in the [Iteratee Spec](https://github.com/gideondk/sentinel/blob/master/src/test/scala/nl/gideondk/sentinel/IterateeSpec.scala).
233+
`?`: Sends command to *one* (randomly chosen) connected socket for a answer, resulting in one event.
234+
`?*`: Sends a command to all connected hosts, resulting in a list of events from each host individually.
235+
`?**`: Sends a command to all connected sockets, resulting in a list of events from all connected sockets.
149236

237+
Simple server metrics are available through the `connectedSockets` and `connectedHosts` commands, returning a `Task[Int]` containing the corresponding count.
150238

151239
# License
152-
Copyright © 2013 Gideon de Kok
240+
Copyright © 2014 Gideon de Kok
153241

154242
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
155243

src/main/scala/nl/gideondk/sentinel/Server.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class ServerCore[Cmd, Evt](port: Int, description: String, stages: ⇒ PipelineS
138138
}
139139

140140
object SentinelServer {
141-
def apply[Evt, Cmd](serverPort: Int, resolver: SentinelResolver[Evt, Cmd], description: String = "Sentinel Server")(stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], lowBytes: Long = 100L, highBytes: Long = 50 * 1024L, maxBufferSize: Long = 1000L * 1024L)(implicit system: ActorSystem) = {
141+
def apply[Evt, Cmd](serverPort: Int, resolver: SentinelResolver[Evt, Cmd], description: String = "Sentinel Server", stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString], lowBytes: Long = 100L, highBytes: Long = 50 * 1024L, maxBufferSize: Long = 1000L * 1024L)(implicit system: ActorSystem) = {
142142
new Server[Evt, Cmd] {
143143
val actor = system.actorOf(Props(new ServerCore(serverPort, description, stages, resolver)(lowBytes, highBytes, maxBufferSize)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"), name = "sentinel-server-" + java.util.UUID.randomUUID.toString)
144144
}

src/test/scala/nl/gideondk/sentinel/FullDuplexSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class FullDuplexSpec extends WordSpec with ShouldMatchers {
2323
def client(portNumber: Int)(implicit system: ActorSystem) = Client.randomRouting("localhost", portNumber, 1, "Worker", SimpleMessage.stages, 5 seconds, SimpleServerHandler)(system)
2424

2525
def server(portNumber: Int)(implicit system: ActorSystem) = {
26-
val s = SentinelServer(portNumber, SimpleServerHandler)(SimpleMessage.stages)(system)
26+
val s = SentinelServer(portNumber, SimpleServerHandler, stages = SimpleMessage.stages)(system)
2727
Thread.sleep(100)
2828
s
2929
}

src/test/scala/nl/gideondk/sentinel/RequestResponse.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RequestResponseSpec extends WordSpec with Matchers {
2626
def client(portNumber: Int)(implicit system: ActorSystem) = Client.randomRouting("localhost", portNumber, 16, "Worker", SimpleMessage.stages, 5 seconds, SimpleServerHandler)(system)
2727

2828
def server(portNumber: Int)(implicit system: ActorSystem) = {
29-
val s = SentinelServer(portNumber, SimpleServerHandler)(SimpleMessage.stages)(system)
29+
val s = SentinelServer(portNumber, SimpleServerHandler,stages = SimpleMessage.stages)(system)
3030
Thread.sleep(100)
3131
s
3232
}

src/test/scala/nl/gideondk/sentinel/ServerRequestSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class ServerRequestSpec extends WordSpec with ShouldMatchers {
2727
def client(portNumber: Int)(implicit system: ActorSystem) = Client.randomRouting("localhost", portNumber, numberOfConnections, "Worker", SimpleMessage.stages, 5 seconds, SimpleServerHandler)(system)
2828

2929
def server(portNumber: Int)(implicit system: ActorSystem) = {
30-
val s = SentinelServer(portNumber, SimpleServerHandler)(SimpleMessage.stages)(system)
30+
val s = SentinelServer(portNumber, SimpleServerHandler, stages = SimpleMessage.stages)(system)
3131
Thread.sleep(100)
3232
s
3333
}

src/test/scala/nl/gideondk/sentinel/StreamingSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class StreamingSpec extends WordSpec with ShouldMatchers {
2626
def client(portNumber: Int)(implicit system: ActorSystem) = Client.randomRouting("localhost", portNumber, 2, "Worker", SimpleMessage.stages, 5 seconds, SimpleServerHandler)(system)
2727

2828
def server(portNumber: Int)(implicit system: ActorSystem) = {
29-
val s = SentinelServer(portNumber, SimpleServerHandler)(SimpleMessage.stages)(system)
29+
val s = SentinelServer(portNumber, SimpleServerHandler, stages = SimpleMessage.stages)(system)
3030
Thread.sleep(100)
3131
s
3232
}

0 commit comments

Comments
 (0)