Skip to content

Commit c5fcb84

Browse files
committed
make specs2 work
1 parent cc367a1 commit c5fcb84

File tree

3 files changed

+13
-8
lines changed

3 files changed

+13
-8
lines changed

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaBolt.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class KafkaBolt(conf: Configs) extends TaskActor(conf) {
6969

7070
private def reportThroughput : Unit = {
7171
val current = System.currentTimeMillis()
72-
LOG.info(s"Task $taskId Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
72+
LOG.info(s"Task $taskId; Actor ${self}; Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
7373
lastCount = count
7474
lastTime = current
7575
}

project/Build.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object Build extends sbt.Build {
2929
val spraySwaggerVersion = "0.4.3"
3030
val swaggerUiVersion = "2.0.21"
3131
val specs2Version = "1.13"
32+
val mockitoVersion = "1.9.5"
3233

3334
val commonSettings = Defaults.defaultSettings ++ packAutoSettings ++
3435
Seq(
@@ -97,7 +98,8 @@ object Build extends sbt.Build {
9798
Seq(
9899
libraryDependencies ++= Seq(
99100
"org.apache.kafka" %% "kafka" % kafkaVersion,
100-
"org.specs2" %% "specs2" % specs2Version % "test"
101+
"org.specs2" %% "specs2" % specs2Version % "test",
102+
"org.mockito" % "mockito-core" % mockitoVersion % "test"
101103
)
102104
)
103105
) dependsOn(core)

streaming/src/test/scala/org/apache/gearpump/streaming/transaction/OffsetManagerSpec.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,33 @@
1919
package org.apache.gearpump.streaming.transaction
2020

2121
import org.apache.gearpump.streaming.transaction.api.{OffsetManager, Checkpoint, Source}
22-
import org.apache.gearpump.streaming.transaction.kafka.{KafkaCheckpointManagerFactory, KafkaCheckpointManager, KafkaSource}
22+
import org.apache.gearpump.streaming.transaction.kafka._
2323
import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
2424
import org.apache.gearpump.TimeStamp
2525
import org.apache.gearpump.util.Configs
2626
import org.specs2.mutable._
2727
import org.specs2.mock._
2828

2929

30-
object OffsetManagerSpec extends Specification with Mockito with MockitoMocker {
30+
class OffsetManagerSpec extends Specification with Mockito {
3131
"OffsetManager" should {
3232
"checkpoint updated timestamp and offsets for each source" in {
3333

3434
"Testing OffsetManager".txt
3535

36-
val kafkaConfig = mock[Map[String, _]]
3736
val checkpointManagerFactory = mock[KafkaCheckpointManagerFactory]
3837
val checkpointManager = mock[KafkaCheckpointManager]
38+
val filter = mock[RelaxedTimeFilter]
39+
val config = Map(
40+
CHECKPOINT_MANAGER_FACTORY_CLASS -> checkpointManagerFactory,
41+
CHECKPOINT_FILTER_CLASS -> filter
42+
)
3943

40-
kafkaConfig.getCheckpointManagerFactory returns checkpointManagerFactory
4144
checkpointManagerFactory.getCheckpointManager(any[Configs]) returns checkpointManager
42-
checkpointManager.writeCheckpoint(any[Source], any[Checkpoint]) returns doNothing
45+
checkpointManager.writeCheckpoint(any[Source], any[Checkpoint]) answers {args => }
4346

4447

45-
val conf = Configs(kafkaConfig)
48+
val conf = Configs(config)
4649
val offsetManager = new OffsetManager(conf)
4750

4851
val offsetsByTimeAndSource: Map[(Source, TimeStamp), Long] = Map(

0 commit comments

Comments
 (0)