Skip to content

Commit 4a476b1

Browse files
committed
add serde and factory to key value store
1 parent 15e4b48 commit 4a476b1

File tree

9 files changed

+109
-46
lines changed

9 files changed

+109
-46
lines changed

conf/kafka.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@ kafka {
2727
message.delay.ms = 10
2828
id = 0
2929
}
30+
31+
storage {
32+
kv.store.factory = "org.apache.gearpump.streaming.transaction.storage.inmemory.InMemoryKeyValueStoreFactory"
33+
}
3034
}

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaBolt.scala

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,35 @@ import java.util.Properties
2222

2323
import akka.actor.Cancellable
2424
import kafka.producer.ProducerConfig
25-
import org.apache.gearpump.Message
25+
import org.apache.gearpump.streaming.transaction.storage.inmemory.InMemoryKeyValueStore
26+
import org.apache.gearpump.{Message, TimeStamp}
2627
import org.apache.gearpump.streaming.transaction.lib.kafka.KafkaConfig._
28+
import org.apache.gearpump.streaming.transaction.lib.kafka.KafkaUtil._
2729
import org.apache.gearpump.streaming.task.{TaskContext, TaskActor}
2830
import org.apache.gearpump.util.Configs
2931
import scala.concurrent.duration.FiniteDuration
3032
import java.util.concurrent.TimeUnit
3133

3234
import org.slf4j.{Logger, LoggerFactory}
33-
import org.apache.gearpump.streaming.transaction.storage.api.StorageManager
34-
import org.apache.gearpump.streaming.transaction.storage.InMemoryKeyValueStore
35+
import org.apache.gearpump.streaming.transaction.storage.api.{KeyValueSerDe, StorageManager}
3536
import org.apache.gearpump.streaming.transaction.checkpoint.api.CheckpointSerDe
3637

3738
object KafkaBolt {
3839

39-
class StringCheckpointSerDe(encoding: String) extends CheckpointSerDe[String, String] {
40-
override def fromValueBytes(bytes: Array[Byte]): String = {
41-
new String(bytes, encoding)
40+
class String2SerDe(encoding: String) extends KeyValueSerDe[String, String] {
41+
override def toBytes(kv: (String, String)): Array[Byte] = {
42+
val (key, value) = kv
43+
val keyBytes = intToByteArray(key.length) ++ key.getBytes(encoding)
44+
val valueBytes = intToByteArray(value.length) ++ value.getBytes(encoding)
45+
keyBytes ++ valueBytes
4246
}
4347

44-
override def toValueBytes(value: String): Array[Byte] = {
45-
value.getBytes(encoding)
46-
}
47-
48-
override def fromKeyBytes(bytes: Array[Byte]): String = {
49-
new String(bytes, encoding)
50-
}
51-
52-
override def toKeyBytes(key: String): Array[Byte] = {
53-
key.getBytes(encoding)
48+
override def fromBytes(bytes: Array[Byte]): (String, String) = {
49+
val keyLen = byteArrayToInt(bytes.take(4))
50+
val key = new String(bytes.drop(4).take(keyLen), encoding)
51+
val valLen = byteArrayToInt(bytes.drop(4 + keyLen).take(4))
52+
val value = new String(bytes.drop(4 + keyLen + 4).take(valLen))
53+
(key, value)
5454
}
5555
}
5656

@@ -65,10 +65,10 @@ class KafkaBolt(conf: Configs) extends TaskActor(conf) {
6565
private val topic = config.getProducerTopic
6666
private val kafkaProducer = config.getProducer[String, String]()
6767
private val storageManager = new StorageManager[String, String](
68-
s"${conf.appId}:${taskId}",
69-
new InMemoryKeyValueStore[String, String](),
70-
config.getCheckpointManagerFactory.getCheckpointManager[String, String](conf),
71-
new StringCheckpointSerDe("UTF8")
68+
s"${conf.appId}_${taskId}",
69+
config.getKeyValueStoreFactory.getKeyValueStore[String, String](conf),
70+
new String2SerDe("UTF8"),
71+
config.getCheckpointManagerFactory.getCheckpointManager[TimeStamp, (String, String)](conf)
7272
)
7373

7474

@@ -82,6 +82,7 @@ class KafkaBolt(conf: Configs) extends TaskActor(conf) {
8282
import context.dispatcher
8383
scheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
8484
new FiniteDuration(5, TimeUnit.SECONDS))(reportThroughput)
85+
storageManager.start()
8586
}
8687

8788
override def onNext(msg: Message): Unit = {

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/lib/kafka/KafkaConfig.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import kafka.utils.ZKStringSerializer
2828
import org.I0Itec.zkclient.ZkClient
2929
import org.I0Itec.zkclient.serialize.ZkSerializer
3030
import org.apache.gearpump.streaming.transaction.checkpoint.TimeExtractor
31+
import org.apache.gearpump.streaming.transaction.storage.api.KeyValueStoreFactory
3132
import org.slf4j.{Logger, LoggerFactory}
3233

3334
import scala.collection.JavaConversions._
@@ -63,6 +64,9 @@ object KafkaConfig {
6364
// filtering config
6465
val CHECKPOINT_MESSAGE_DELAY_MS = "kafka.checkpoint.message.delay.ms"
6566

67+
// storage config
68+
val KV_STORE_FACTORY = "kafka.storage.kv.store.factory"
69+
6670
def apply(): Map[String, _] = new KafkaConfig().toMap
6771

6872
implicit class ConfigToKafka(config: Map[String, _]) {
@@ -199,6 +203,10 @@ object KafkaConfig {
199203
def getCheckpointId = {
200204
getInt(CHECKPOINT_ID)
201205
}
206+
207+
def getKeyValueStoreFactory = {
208+
getInstance[KeyValueStoreFactory](KV_STORE_FACTORY)
209+
}
202210
}
203211

204212
private val LOG: Logger = LoggerFactory.getLogger(classOf[KafkaConfig])

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/lib/kafka/KafkaUtil.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,25 @@ import kafka.utils.ZkUtils
2222
import org.apache.gearpump.streaming.transaction.lib.kafka.KafkaConsumer.Broker
2323
import org.I0Itec.zkclient.ZkClient
2424

25+
import java.nio.ByteBuffer
26+
2527
object KafkaUtil {
2628

2729
def longToByteArray(long: Long): Array[Byte] = {
28-
java.nio.ByteBuffer.allocate(8).putLong(long).array()
30+
ByteBuffer.allocate(8).putLong(long).array()
2931
}
3032

3133
def byteArrayToLong(bytes: Array[Byte]): Long = {
32-
java.nio.ByteBuffer.wrap(bytes).getLong
34+
ByteBuffer.wrap(bytes).getLong
35+
}
36+
37+
def intToByteArray(int: Int): Array[Byte] = {
38+
ByteBuffer.allocate(4).putInt(int).array()
3339
}
3440

41+
def byteArrayToInt(bytes: Array[Byte]): Int = {
42+
ByteBuffer.wrap(bytes).getInt
43+
}
3544

3645
def getBroker(zkClient: ZkClient, topic: String, partition: Int): Broker = {
3746
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/api/KeyValueStore.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.gearpump.streaming.transaction.storage.api
2020

21-
trait KeyValueStore[K, V] extends Store {
21+
trait KeyValueStore[K, V] {
2222
def get(key: K): Option[V]
2323

2424
def put(key: K, value: V): Option[V]
@@ -31,3 +31,8 @@ trait KeyValueStore[K, V] extends Store {
3131

3232
def close(): Unit
3333
}
34+
35+
trait KeyValueSerDe[K, V] {
36+
def toBytes(kv: (K, V)): Array[Byte]
37+
def fromBytes(bytes: Array[Byte]): (K, V)
38+
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/api/StoreFactory.scala renamed to streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/api/KeyValueStoreFactory.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ package org.apache.gearpump.streaming.transaction.storage.api
2020

2121
import org.apache.gearpump.util.Configs
2222

23-
trait StoreFactory {
24-
def getStore(conf: Configs): Store
23+
trait KeyValueStoreFactory {
24+
def getKeyValueStore[K, V](conf: Configs): KeyValueStore[K, V]
2525
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/api/StorageManager.scala

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,61 @@
1818

1919
package org.apache.gearpump.streaming.transaction.storage.api
2020

21-
import org.apache.gearpump.streaming.transaction.checkpoint.api.{CheckpointSerDe, Checkpoint, CheckpointManager}
22-
import org.apache.gearpump.streaming.transaction.lib.kafka.KafkaSource
21+
import org.apache.gearpump.TimeStamp
22+
import org.apache.gearpump.streaming.transaction.checkpoint.api.{CheckpointSerDe, Checkpoint, CheckpointManager, Source}
23+
import org.apache.gearpump.streaming.transaction.lib.kafka.KafkaUtil._
24+
25+
object StorageManager {
26+
class StoreCheckpointSerDe[ K, V](keyValueSerDe: KeyValueSerDe[K, V])
27+
extends CheckpointSerDe[TimeStamp, (K, V)] {
28+
override def toKeyBytes(key: TimeStamp): Array[Byte] = {
29+
longToByteArray(key)
30+
}
31+
32+
override def toValueBytes(value: (K, V)): Array[Byte] = {
33+
keyValueSerDe.toBytes(value)
34+
}
35+
36+
override def fromKeyBytes(bytes: Array[Byte]): TimeStamp = {
37+
byteArrayToLong(bytes)
38+
}
39+
40+
override def fromValueBytes(bytes: Array[Byte]): (K, V) = {
41+
keyValueSerDe.fromBytes(bytes)
42+
}
43+
}
44+
}
2345

2446
class StorageManager[K, V](id: String,
2547
store: KeyValueStore[K, V],
26-
checkpointManager: CheckpointManager[K, V],
27-
checkpointSerDe: CheckpointSerDe[K, V])
48+
keyValueSerDe: KeyValueSerDe[K, V],
49+
checkpointManager: CheckpointManager[TimeStamp, (K, V)]
50+
)
2851
extends KeyValueStore[K, V] {
52+
import org.apache.gearpump.streaming.transaction.storage.api.StorageManager._
2953

30-
var states: Map[K, V] = Map.empty[K, V]
54+
private var states: Map[K, V] = Map.empty[K, V]
55+
private val source : Source = new Source {
56+
def name: String = id
57+
def partition: Int = 0
58+
}
59+
private val checkpointSerDe = new StoreCheckpointSerDe[K, V](keyValueSerDe)
60+
61+
def start(): Unit = {
62+
checkpointManager.register(Array(source))
63+
checkpointManager.start()
64+
}
3165

32-
def checkpoint: Checkpoint[K, V] = {
33-
val kafkaSource: KafkaSource = KafkaSource(getStorageTopic, 0)
34-
val checkpoint: Checkpoint[K, V] = Checkpoint(states.toList)
66+
def checkpoint(timestamp: TimeStamp): Checkpoint[TimeStamp, (K, V)] = {
67+
val checkpoint = Checkpoint(states.map(kv => (timestamp, kv)).toList)
3568
states = Map.empty[K, V]
36-
checkpointManager.writeCheckpoint(kafkaSource, checkpoint, checkpointSerDe)
69+
checkpointManager.writeCheckpoint(source, checkpoint, checkpointSerDe)
3770
checkpoint
3871
}
3972

40-
def restore: Checkpoint[K, V] = {
41-
val kafkaSource: KafkaSource = KafkaSource(getStorageTopic, 0)
42-
val checkpoint: Checkpoint[K, V] = checkpointManager.readCheckpoint(kafkaSource, checkpointSerDe)
43-
store.putAll(checkpoint.records)
73+
def restore(timestamp: TimeStamp): Checkpoint[TimeStamp, (K, V)] = {
74+
val checkpoint = checkpointManager.readCheckpoint(source, checkpointSerDe)
75+
store.putAll(checkpoint.records.filter(_._1 == timestamp).map(_._2))
4476
checkpoint
4577
}
4678

@@ -69,8 +101,4 @@ class StorageManager[K, V](id: String,
69101
override def get(key: K): Option[V] = {
70102
store.get(key)
71103
}
72-
73-
private def getStorageTopic: String = {
74-
s"storage_${id}"
75-
}
76104
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/InMemoryKeyValueStore.scala renamed to streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/inmemory/InMemoryKeyValueStore.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.gearpump.streaming.transaction.storage
19+
package org.apache.gearpump.streaming.transaction.storage.inmemory
2020

2121
import java.util
22+
2223
import org.apache.gearpump.streaming.transaction.storage.api.KeyValueStore
2324

2425
object InMemoryKeyValueStore {
2526

2627
}
2728

28-
class InMemoryKeyValueStore[K, V]() extends KeyValueStore[K, V] {
29+
class InMemoryKeyValueStore[K, V] extends KeyValueStore[K, V] {
2930

3031
private val store = new util.HashMap[K, V]
3132

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/api/Store.scala renamed to streaming/src/main/scala/org/apache/gearpump/streaming/transaction/storage/inmemory/InMemoryKeyValueStoreFactory.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.gearpump.streaming.transaction.storage.api
19+
package org.apache.gearpump.streaming.transaction.storage.inmemory
2020

21-
trait Store
21+
import org.apache.gearpump.streaming.transaction.storage.api.{KeyValueStore, KeyValueStoreFactory}
22+
import org.apache.gearpump.util.Configs
23+
24+
class InMemoryKeyValueStoreFactory extends KeyValueStoreFactory {
25+
override def getKeyValueStore[K, V](conf: Configs): KeyValueStore[K, V] = {
26+
new InMemoryKeyValueStore[K, V]
27+
}
28+
}

0 commit comments

Comments
 (0)