Skip to content

Commit 53cb862

Browse files
committed
Mid-refactor
1 parent 62b19a5 commit 53cb862

28 files changed

+1691
-982
lines changed

project/Build.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,20 @@ object ApplicationBuild extends Build {
2424
)
2525

2626
val appDependencies = Seq(
27-
"org.scalaz" %% "scalaz-core" % "7.0.0",
28-
"org.scalaz" %% "scalaz-effect" % "7.0.0",
29-
"org.specs2" %% "specs2" % "1.13",
27+
"org.scalaz" %% "scalaz-core" % "7.0.3",
28+
"org.scalaz" %% "scalaz-effect" % "7.0.3",
29+
"org.scalaz.stream" %% "scalaz-stream" % "0.2-SNAPSHOT",
30+
31+
"org.scalatest" % "scalatest_2.10" % "1.9.1" % "test",
3032

3133
"com.typesafe.play" %% "play-iteratees" % "2.2-akka22-SNAPSHOT",
3234

33-
"com.typesafe.akka" % "akka-actor_2.10" % "2.2.0"
35+
"com.typesafe.akka" %% "akka-actor" % "2.2.0",
36+
"com.typesafe.akka" %% "akka-testkit" % "2.2.0",
37+
38+
"org.typelevel" %% "scalaz-contrib-210" % "0.1.5",
39+
40+
"com.chuusai" % "shapeless" % "2.0.0-M1" cross CrossVersion.full
3441
)
3542

3643
lazy val root = Project(id = "sentinel",

project/plugins.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ resolvers ++= Seq(
55

66
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1")
77

8-
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.0-SNAPSHOT")
8+
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0-SNAPSHOT")
99

1010
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
1111

12-
addSbtPlugin("org.ensime" % "ensime-sbt-cmd" % "0.1.1")
12+
addSbtPlugin("org.ensime" % "ensime-sbt-cmd" % "0.1.1")

src/main/resources/application.conf

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
11
akka.log-dead-letters-during-shutdown = off
2-
akka.log-dead-letters = off
2+
akka.log-dead-letters = off
3+
4+
akka {
5+
//loglevel = DEBUG
6+
io {
7+
tcp {
8+
//trace-logging = on
9+
}
10+
}
11+
}

src/main/resources/reference.conf

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ nl {
44
sentinel-dispatcher {
55
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
66
}
7+
sentinel-antenna-dispatcher {
8+
mailbox-type = "nl.gideondk.sentinel.AntennaMailbox"
9+
}
10+
sentinel-consumer-dispatcher {
11+
mailbox-type = "nl.gideondk.sentinel.rx.ConsumerMailbox"
12+
}
713
}
814
}
915
}
10-
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
12+
import scala.util.Try
13+
import scala.concurrent.duration._
14+
import akka.pattern.ask
15+
import akka.util.Timeout
16+
17+
import scala.concurrent.Future
18+
import scalaz.contrib.std.scalaFuture._
19+
20+
trait Action
21+
22+
object Action {
23+
class EmptyStreamResultException extends Exception
24+
25+
case object Consume extends Action
26+
case object Ignore extends Action
27+
28+
trait Reaction[Evt, Cmd] extends Action
29+
30+
trait StreamReaction[Evt, Cmd] extends Reaction[Evt, Cmd] {
31+
def core: Process[Future, Cmd]
32+
}
33+
34+
case class Answer[Evt, Cmd](f: Future[Cmd]) extends Reaction[Evt, Cmd]
35+
case class ConsumeStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
36+
case class ProduceStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
37+
case class ReactToStream[Evt, Cmd](val core: Process[Future, Cmd]) extends StreamReaction[Evt, Cmd]
38+
39+
trait Decider[Evt, Cmd] {
40+
def answer(f: Future[Cmd]): Answer[Evt, Cmd] = Answer(f)
41+
42+
def produce(p: Process[Future, Cmd]): ProduceStream[Evt, Cmd] = ProduceStream(p)
43+
44+
def react(p: Process[Future, Cmd]): ReactToStream[Evt, Cmd] = ReactToStream(p)
45+
46+
def consumeStream(p: Process[Future, Cmd]): ConsumeStream[Evt, Cmd] = ConsumeStream(p)
47+
48+
def consume = Consume
49+
50+
def process: PartialFunction[Evt, Action]
51+
}
52+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
import scala.util.Try
12+
import scala.concurrent.duration._
13+
import akka.pattern.ask
14+
import akka.util.Timeout
15+
16+
import akka.actor.ActorSystem.Settings
17+
import com.typesafe.config.Config
18+
import akka.dispatch._
19+
20+
class AntennaMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
21+
PriorityGenerator {
22+
case x: akka.io.Tcp.Event 0
23+
case x: Management.ManagementMessage 1
24+
case x: Command[_] 2
25+
case _ 10
26+
})
27+
28+
class Antenna[Cmd, Evt](init: Init[WithinActorContext, Cmd, Evt], decider: Action.Decider[Evt, Cmd]) extends Actor with ActorLogging {
29+
var commandQueue = Queue.empty[Cmd]
30+
31+
def active(tcpHandler: ActorRef): Receive = {
32+
val answerer = context.actorOf(Props(new RxProcessors.Answerer(init)))
33+
val consumer = context.actorOf(Props(new RxProcessors.Consumer(init)))
34+
35+
context watch answerer
36+
context watch consumer
37+
38+
def handleTermination: Receive = {
39+
case x: Terminated context.stop(self)
40+
}
41+
42+
def highWaterMark: Receive = handleTermination orElse {
43+
case init.Command(data)
44+
commandQueue.enqueue(data)
45+
case BackpressureBuffer.LowWatermarkReached
46+
def dequeueAndSend: Unit = {
47+
if (!commandQueue.isEmpty) {
48+
val c = commandQueue.head
49+
commandQueue = commandQueue.tail
50+
self ! init.Command(c)
51+
52+
dequeueAndSend
53+
}
54+
}
55+
context.unbecome()
56+
}
57+
58+
handleTermination orElse {
59+
case x: Command.Ask[Cmd, Evt]
60+
consumer ! Management.RegisterReply(x.terminator, x.includeTerminator, x.pp)
61+
tcpHandler ! init.Command(x.payload)
62+
63+
case x: Command.Reply[Cmd]
64+
tcpHandler ! init.Command(x.payload)
65+
66+
case init.Event(data) {
67+
decider.process(data) match {
68+
case x: Action.Answer[Evt, Cmd] answerer ! x
69+
case Action.Consume consumer ! init.Event(data) // Pass through
70+
}
71+
}
72+
73+
case BackpressureBuffer.HighWatermarkReached {
74+
context.become(highWaterMark, false)
75+
}
76+
}
77+
}
78+
79+
def receive = {
80+
case Management.RegisterTcpHandler(tcpHandler)
81+
context.become(active(tcpHandler))
82+
}
83+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
12+
import scala.util.Try
13+
import scala.concurrent.duration._
14+
import akka.pattern.ask
15+
import akka.util.Timeout
16+
17+
import scala.concurrent.Future
18+
import scalaz.contrib.std.scalaFuture._
19+
20+
trait Command[A]
21+
22+
//trait RespondingCommand[A, B] extends Command[A] {
23+
// def promise:
24+
//}
25+
26+
object Command {
27+
case class Ask[Cmd, Evt](payload: Cmd, val pp: Promise[Evt]) extends Command[Cmd]
28+
case class AskStream[Cmd, Evt](payload: Cmd, terminator: Evt Boolean, includeTerminator: Boolean, val pp: Promise[Process[Future, Evt]]) extends Command[Cmd]
29+
//case class Ask[Cmd, Evt](payload: Cmd, terminator: Evt ⇒ Boolean, includeTerminator: Boolean, val pp: Promise[Process[Future, Evt]]) extends Command[Cmd]
30+
case class Reply[Cmd](payload: Cmd) extends Command[Cmd]
31+
32+
//case class Stream[O, T](source: Process[Task, O], promise: Promise[T]) extends Command
33+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package nl.gideondk.sentinel
2+
3+
import scala.collection.immutable.Queue
4+
import scala.concurrent.{ Future, Promise }
5+
import scala.util.{ Failure, Success }
6+
import akka.actor._
7+
import akka.io.BackpressureBuffer
8+
import akka.io.TcpPipelineHandler.{ Init, WithinActorContext }
9+
import scalaz.stream._
10+
import scalaz.stream.Process._
11+
12+
import scala.util.Try
13+
import scala.concurrent.duration._
14+
import akka.pattern.ask
15+
import akka.util.Timeout
16+
17+
import scala.concurrent.Future
18+
import scalaz.contrib.std.scalaFuture._
19+
20+
object Management {
21+
trait ManagementMessage
22+
case class RegisterTcpHandler(h: ActorRef) extends ManagementMessage
23+
24+
case class RegisterReply[A](terminator: A Boolean, includeTerminator: Boolean, promise: Promise[Process[Future, A]]) extends ManagementMessage
25+
case object ReplyRegistered extends ManagementMessage
26+
}
27+

src/main/scala/nl/gideondk/sentinel/Operation.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,16 @@ package nl.gideondk.sentinel
33
import scala.concurrent.Promise
44
import play.api.libs.iteratee._
55

6-
trait SentinelCommand
6+
import scalaz.stream._
7+
import scala.concurrent.Future
8+
import scalaz.contrib.std.scalaFuture._
79

8-
case class Operation[A, B](command: A, promise: Promise[B]) extends SentinelCommand
10+
trait SentinelCommand[T] {
11+
def promise: Promise[T]
12+
}
913

10-
case class StreamedOperation[A, B](stream: Enumerator[A], promise: Promise[B]) extends SentinelCommand
14+
case class Signal[C, T](command: C, promise: Promise[T]) extends SentinelCommand[T]
15+
16+
case class UpStreamOperation[O, T](source: Process[Future, O], promise: Promise[T]) extends SentinelCommand[T]
17+
18+
//case class DownStreamOperation[A, B](command: A, val promise: Enumerator[B], terminator: B ⇒ Boolean) extends SentinelCommand

0 commit comments

Comments
 (0)