Skip to content

Commit d67cb63

Browse files
committed
Fix initial tests
1 parent 59e5003 commit d67cb63

File tree

12 files changed

+265
-111
lines changed

12 files changed

+265
-111
lines changed

project/Build.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ object ApplicationBuild extends Build {
55
override lazy val settings = super.settings ++
66
Seq(
77
name := "sentinel",
8-
version := "0.6.0-RC1",
8+
version := "0.6.0-M1",
99
organization := "nl.gideondk",
1010
scalaVersion := "2.10.2",
1111
parallelExecution in Test := false,

src/main/resources/application.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ akka {
55
//loglevel = DEBUG
66
io {
77
tcp {
8-
//trace-logging = on
8+
// trace-logging = on
99
}
1010
}
1111
}

src/main/scala/nl/gideondk/sentinel/Antenna.scala

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], Resolver: Sent
1717
val consumer = context.actorOf(Props(new Consumer(init)), name = "resolver")
1818
val producer = context.actorOf(Props(new Producer(init)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"), name = "producer")
1919

20+
context watch tcpHandler
2021
context watch producer
2122
context watch consumer
2223

src/main/scala/nl/gideondk/sentinel/Client.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class ClientCore[Cmd, Evt](routerConfig: RouterConfig, description: String, reco
123123
new ClientAntennaManager(address, stages, Resolver)
124124

125125
def routerProto(address: InetSocketAddress) =
126-
context.system.actorOf(Props(antennaManagerProto(address)).withRouter(routerConfig).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
126+
context.actorOf(Props(antennaManagerProto(address)).withRouter(routerConfig).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
127127

128128
override def preStart = {
129129
self ! InitializeRouter

src/main/scala/nl/gideondk/sentinel/Command.scala

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ trait Command[Cmd, Evt] {
2121

2222
trait ServerCommand[Cmd, Evt]
2323

24+
trait ServerMetric
25+
2426
trait Reply[Cmd]
2527

2628
object Command {
@@ -35,6 +37,13 @@ object Command {
3537

3638
object ServerCommand {
3739
case class AskAll[Cmd, Evt](payload: Cmd, promise: Promise[List[Evt]]) extends ServerCommand[Cmd, Evt]
40+
case class AskAllHosts[Cmd, Evt](payload: Cmd, promise: Promise[List[Evt]]) extends ServerCommand[Cmd, Evt]
41+
case class AskAny[Cmd, Evt](payload: Cmd, promise: Promise[Evt]) extends ServerCommand[Cmd, Evt]
42+
}
43+
44+
object ServerMetric {
45+
case object ConnectedSockets extends ServerMetric
46+
case object ConnectedHosts extends ServerMetric
3847
}
3948

4049
object Reply {

src/main/scala/nl/gideondk/sentinel/Server.scala

+67-10
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,47 @@ import java.net.InetSocketAddress
55
import akka.actor._
66
import akka.io._
77
import akka.io.Tcp._
8-
import akka.util.ByteString
8+
import akka.util.{ Timeout, ByteString }
99

10-
import nl.gideondk.sentinel._
11-
import nl.gideondk.sentinel.SentinelResolver
1210
import scala.concurrent.{ Future, Promise, ExecutionContext }
13-
import play.api.libs.iteratee.Enumerator
11+
import scala.util.Random
12+
13+
import akka.pattern.ask
1414

1515
trait Server[Cmd, Evt] {
1616
def actor: ActorRef
1717

18-
def ?*(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = askMany(command)
18+
def ?**(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = askAll(command)
19+
20+
def ?*(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = askAllHosts(command)
1921

20-
def askMany(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = Task {
22+
def ?(command: Cmd)(implicit context: ExecutionContext): Task[Evt] = askAny(command)
23+
24+
def askAll(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = Task {
2125
val promise = Promise[List[Evt]]()
2226
actor ! ServerCommand.AskAll(command, promise)
2327
promise.future
2428
}
29+
30+
def askAllHosts(command: Cmd)(implicit context: ExecutionContext): Task[List[Evt]] = Task {
31+
val promise = Promise[List[Evt]]()
32+
actor ! ServerCommand.AskAllHosts(command, promise)
33+
promise.future
34+
}
35+
36+
def askAny(command: Cmd)(implicit context: ExecutionContext): Task[Evt] = Task {
37+
val promise = Promise[Evt]()
38+
actor ! ServerCommand.AskAny(command, promise)
39+
promise.future
40+
}
41+
42+
def connectedSockets(implicit timeout: Timeout): Task[Int] = Task {
43+
(actor ? ServerMetric.ConnectedSockets).mapTo[Int]
44+
}
45+
46+
def connectedHosts(implicit timeout: Timeout): Task[Int] = Task {
47+
(actor ? ServerMetric.ConnectedHosts).mapTo[Int]
48+
}
2549
}
2650

2751
class ServerCore[Cmd, Evt](port: Int, description: String, stages: PipelineStage[PipelineContext, Cmd, ByteString, Evt, ByteString],
@@ -36,22 +60,52 @@ class ServerCore[Cmd, Evt](port: Int, description: String, stages: ⇒ PipelineS
3660
val tcp = akka.io.IO(Tcp)(context.system)
3761
val address = new InetSocketAddress(port)
3862

39-
var connections = List[ActorRef]()
63+
var connections = Map[String, List[ActorRef]]()
4064

4165
override def preStart = {
4266
tcp ! Bind(self, address)
4367
}
4468

4569
def receiveCommands: Receive = {
46-
case x: ServerCommand.AskAll[Cmd, Evt]
47-
val futures = Task.sequence(connections.map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
70+
case x: ServerCommand.AskAll[Cmd, Evt] if connections.values.toList.length > 0
71+
val futures = Task.sequence(connections.values.toList.flatten.map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
4872
case scala.util.Success(s) Future.successful(s)
4973
case scala.util.Failure(e) Future.failed(e)
5074
}
5175
x.promise.completeWith(futures)
76+
77+
case x: ServerCommand.AskAllHosts[Cmd, Evt] if connections.values.toList.length > 0
78+
val futures = Task.sequence(connections.values.toList.map(x Random.shuffle(x.toList).head).map(wrapAtenna).map(_ ? x.payload)).start.flatMap {
79+
case scala.util.Success(s) Future.successful(s)
80+
case scala.util.Failure(e) Future.failed(e)
81+
}
82+
x.promise.completeWith(futures)
83+
84+
case x: ServerCommand.AskAny[Cmd, Evt] if connections.values.toList.length > 0
85+
val future = (wrapAtenna(Random.shuffle(connections.values.toList.flatten).head) ? x.payload).start.flatMap {
86+
case scala.util.Success(s) Future.successful(s)
87+
case scala.util.Failure(e) Future.failed(e)
88+
}
89+
x.promise.completeWith(future)
90+
91+
case ServerMetric.ConnectedSockets
92+
sender ! connections.values.flatten.toList.length
93+
94+
case ServerMetric.ConnectedHosts
95+
sender ! connections.keys.toList.length
5296
}
5397

5498
def receive = receiveCommands orElse {
99+
case x: Terminated
100+
val antenna = x.getActor
101+
connections = connections.foldLeft(Map[String, List[ActorRef]]()) {
102+
case (c, i)
103+
i._2.contains(antenna) match {
104+
case true if (i._2.length == 1) c else c + (i._1 -> i._2.filter(_ != antenna))
105+
case false c + i
106+
}
107+
}
108+
55109
case Bound
56110
log.debug(description + " bound to " + address)
57111

@@ -71,7 +125,10 @@ class ServerCore[Cmd, Evt](port: Int, description: String, stages: ⇒ PipelineS
71125
val connection = sender
72126

73127
val antenna = context.actorOf(Props(new Antenna(init, resolver)).withDispatcher("nl.gideondk.sentinel.sentinel-dispatcher"))
74-
connections :+= antenna
128+
context.watch(antenna)
129+
130+
val currentAtennas = connections.get(remoteAddr.getHostName).getOrElse(List[ActorRef]())
131+
connections = connections + (remoteAddr.getHostName -> (currentAtennas ++ List(antenna)))
75132

76133
val tcpHandler = context.actorOf(TcpPipelineHandler.props(init, connection, antenna).withDeploy(Deploy.local))
77134

src/test/scala/nl/gideondk/sentinel/FullDuplexSpec.scala

+12-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class FullDuplexSpec extends WordSpec with ShouldMatchers {
2020

2121
implicit val duration = Duration(25, SECONDS)
2222

23-
def client(portNumber: Int)(implicit system: ActorSystem) = Client("localhost", portNumber, RandomRouter(16), "Worker", 5 seconds, SimpleMessage.stages, SimpleServerHandler)(system)
23+
def client(portNumber: Int)(implicit system: ActorSystem) = Client("localhost", portNumber, RandomRouter(1), "Worker", 5 seconds, SimpleMessage.stages, SimpleServerHandler)(system)
2424

2525
def server(portNumber: Int)(implicit system: ActorSystem) = {
2626
val s = SentinelServer(portNumber, SimpleServerHandler)(SimpleMessage.stages)(system)
@@ -34,13 +34,15 @@ class FullDuplexSpec extends WordSpec with ShouldMatchers {
3434
val s = server(portNumber)
3535
val c = client(portNumber)
3636

37-
val action = c ? SimpleCommand(PING_PONG_COMMAND, "")
38-
val serverAction = (s ?* SimpleCommand(PING_PONG_COMMAND, "")).map(_.head)
37+
val action = c ? SimpleCommand(PING_PONG, "")
38+
val serverAction = (s ?* SimpleCommand(PING_PONG, "")).map(_.head)
3939

4040
val responses = Task.sequence(List(action, serverAction))
4141

4242
val results = responses.run.toOption.get
43-
results.length == 2 && results.distinct.length == 1
43+
44+
results.length should equal(2)
45+
results.distinct.length should equal(1)
4446
}
4547

4648
"be able to exchange multiple requests simultaneously" in new TestKitSpec {
@@ -51,15 +53,17 @@ class FullDuplexSpec extends WordSpec with ShouldMatchers {
5153

5254
val numberOfRequests = 1000
5355

54-
val actions = Task.sequenceSuccesses(List.fill(numberOfRequests)(c ? SimpleCommand(PING_PONG_COMMAND, "")))
55-
val secActions = Task.sequenceSuccesses(List.fill(numberOfRequests)(secC ? SimpleCommand(PING_PONG_COMMAND, "")))
56-
val serverActions = Task.sequenceSuccesses(List.fill(numberOfRequests)((s ?* SimpleCommand(PING_PONG_COMMAND, ""))))
56+
val actions = Task.sequenceSuccesses(List.fill(numberOfRequests)(c ? SimpleCommand(PING_PONG, "")))
57+
val secActions = Task.sequenceSuccesses(List.fill(numberOfRequests)(secC ? SimpleCommand(PING_PONG, "")))
58+
val serverActions = Task.sequenceSuccesses(List.fill(numberOfRequests)((s ?** SimpleCommand(PING_PONG, ""))))
5759

5860
val combined = Task.sequence(List(actions, serverActions.map(_.flatten), secActions))
5961

6062
val results = combined.run.get
6163

62-
results(0).length == numberOfRequests && results(1).length == numberOfRequests * 2 && results(2).length == numberOfRequests
64+
results(0).length should equal(numberOfRequests)
65+
results(2).length should equal(numberOfRequests)
66+
results(1).length should equal(numberOfRequests * 2)
6367
}
6468
}
6569
}

src/test/scala/nl/gideondk/sentinel/RequestResponse.scala

+18-27
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package nl.gideondk.sentinel
33
import scala.concurrent.ExecutionContext.Implicits.global
44

55
import org.scalatest.WordSpec
6-
import org.scalatest.matchers.ShouldMatchers
6+
import org.scalatest.matchers.{ Matchers, ShouldMatchers }
77

88
import scalaz._
99
import Scalaz._
@@ -17,8 +17,7 @@ import play.api.libs.iteratee._
1717

1818
import protocols._
1919

20-
21-
class RequestResponseSpec extends WordSpec with ShouldMatchers {
20+
class RequestResponseSpec extends WordSpec with Matchers {
2221

2322
import SimpleMessage._
2423

@@ -38,46 +37,38 @@ class RequestResponseSpec extends WordSpec with ShouldMatchers {
3837
val s = server(portNumber)
3938
val c = client(portNumber)
4039

41-
val action = c ? SimpleCommand(PING_PONG_COMMAND, "")
42-
action.run.isSuccess
43-
}
44-
45-
"be able to send a stream to a server" in new TestKitSpec {
46-
val portNumber = TestHelpers.portNumber.getAndIncrement()
47-
val s = server(portNumber)
48-
val c = client(portNumber)
49-
50-
val count = 500
51-
val chunks = List.fill(count)(SimpleStreamChunk("ABCDEF")) ++ List(SimpleStreamChunk(""))
52-
val action = c ?<<-(SimpleCommand(TOTAL_CHUNK_SIZE, ""), Enumerator(chunks: _*))
40+
val action = c ? SimpleCommand(PING_PONG, "")
41+
val result = action.run
5342

54-
val localLength = chunks.foldLeft(0)((b, a) b + a.payload.length)
55-
action.run.isSuccess && action.run.toOption.get.payload.toInt == localLength
43+
result.isSuccess should equal(true)
5644
}
5745

58-
"be able to receive streams from a server" in new TestKitSpec {
46+
"be able to requests multiple requests from a server" in new TestKitSpec {
5947
val portNumber = TestHelpers.portNumber.getAndIncrement()
6048
val s = server(portNumber)
6149
val c = client(portNumber)
6250

63-
val count = 500
64-
val action = c ?->> SimpleCommand(GENERATE_NUMBERS, count.toString)
51+
val numberOfRequests = 20 * 1000
52+
53+
val action = Task.sequenceSuccesses(List.fill(numberOfRequests)(c ? SimpleCommand(PING_PONG, "")))
54+
val result = action.run
6555

66-
val stream = action.copoint
67-
val result = Await.result(stream |>>> Iteratee.getChunks, 5 seconds)
68-
result.length == count
56+
result.get.length should equal(numberOfRequests)
57+
result.isSuccess should equal(true)
6958
}
7059

71-
"be able to requests multiple requests from a server" in new TestKitSpec {
60+
"be able to receive responses in correct order" in new TestKitSpec {
7261
val portNumber = TestHelpers.portNumber.getAndIncrement()
7362
val s = server(portNumber)
7463
val c = client(portNumber)
7564

7665
val numberOfRequests = 20 * 1000
7766

78-
val action = Task.sequenceSuccesses(List.fill(numberOfRequests)(c ? SimpleCommand(PING_PONG_COMMAND, "")))
79-
val result = action.run
80-
result.isSuccess && result.get.length == numberOfRequests
67+
val items = List.range(0, numberOfRequests).map(_.toString)
68+
val action = Task.sequenceSuccesses(items.map(x (c ? SimpleCommand(ECHO, x))))
69+
val result = action.run.get
70+
71+
result.map(_.payload) should equal(items)
8172
}
8273
}
8374
}

src/test/scala/nl/gideondk/sentinel/ResolverSpec.scala

-45
This file was deleted.

0 commit comments

Comments
 (0)