Skip to content

Commit 823b9a8

Browse files
authored
Merge pull request apache#38 from leonchen83/master
Add more redis event to downstream to rocketmq.
2 parents 1c327e0 + 6d29b9a commit 823b9a8

File tree

7 files changed

+80
-113
lines changed

7 files changed

+80
-113
lines changed

rocketmq-redis/README.md

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,52 @@ rocketmq 4.1.0 or higher
4949
Replicator replicator = new RocketMQRedisReplicator(configure);
5050
final RocketMQProducer producer = new RocketMQProducer(configure);
5151

52-
replicator.addRdbListener(new RdbListener.Adaptor() {
52+
replicator.addRdbListener(new RdbListener() {
53+
@Override public void preFullSync(Replicator replicator) {
54+
try {
55+
if (!producer.send(new PreFullSyncEvent())) {
56+
LOGGER.error("Fail to send PreFullSync event");
57+
}
58+
} catch (Exception e) {
59+
LOGGER.error("Fail to send PreFullSync event", e);
60+
}
61+
}
62+
63+
@Override public void auxField(Replicator replicator, AuxField auxField) {
64+
try {
65+
if (!producer.send(auxField)) {
66+
LOGGER.error("Fail to send AuxField[{}]", auxField);
67+
}
68+
} catch (Exception e) {
69+
LOGGER.error(String.format("Fail to send AuxField[%s]", auxField), e);
70+
}
71+
}
72+
5373
@Override public void handle(Replicator replicator, KeyValuePair<?> kv) {
5474
try {
55-
boolean success = producer.sendKeyValuePair(kv);
56-
if (!success) {
75+
if (!producer.send(kv)) {
5776
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
5877
}
5978
} catch (Exception e) {
6079
LOGGER.error(String.format("Fail to send KeyValuePair[key=%s]", kv.getKey()), e);
6180
}
6281
}
82+
83+
@Override public void postFullSync(Replicator replicator, long checksum) {
84+
try {
85+
if (!producer.send(new PostFullSyncEvent(checksum))) {
86+
LOGGER.error("Fail to send send PostFullSync event");
87+
}
88+
} catch (Exception e) {
89+
LOGGER.error("Fail to send PostFullSync event", e);
90+
}
91+
}
6392
});
6493

6594
replicator.addCommandListener(new CommandListener() {
6695
@Override public void handle(Replicator replicator, Command command) {
6796
try {
68-
boolean success = producer.sendCommand(command);
69-
if (!success) {
97+
if (!producer.send(command)) {
7098
LOGGER.error("Fail to send command[{}]", command);
7199
}
72100
} catch (Exception e) {
@@ -90,10 +118,8 @@ The config file located at target/rocketmq-redis-pack/conf/replicator.conf
90118
| parameter | default value| detail |
91119
|-----------|--------------|--------|
92120
| rocketmq.nameserver.address | 127.0.0.1:9876 | rocketmq server address|
93-
| rocketmq.producer.groupname | redis_replicator_producer_group_name | rocketmq group name |
94-
| rocketmq.producer.instancename | redis_replicator_producer_instance_name | rocketmq instance name |
121+
| rocketmq.producer.groupname | REDIS_REPLICATOR_PRODUCER_GROUP | rocketmq group name |
95122
| rocketmq.data.topic | redisdata | rocketmq topic name |
96-
| order.model | global | global or partial |
97123
| deploy.model | single | single or cluster |
98124
| zookeeper.address | 127.0.0.1:2181 | run on cluster model |
99125
| redis.uri | redis://127.0.0.1:6379 | the uri of redis master which replicate from |

rocketmq-redis/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
<maven.compiler.source>1.7</maven.compiler.source>
8080
<maven.compiler.target>1.7</maven.compiler.target>
8181
<!-- zookeeper -->
82-
<zookeeper.version>3.4.6</zookeeper.version>
8382
<curator.version>2.9.1</curator.version>
8483
<logback.version>1.1.3</logback.version>
8584
<rocketmq.client.version>4.1.0-incubating</rocketmq.client.version>

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RocketMQRedisReplicator.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@
2929
import org.apache.rocketmq.redis.replicator.cmd.CommandName;
3030
import org.apache.rocketmq.redis.replicator.cmd.CommandParser;
3131
import org.apache.rocketmq.redis.replicator.conf.Configure;
32+
import org.apache.rocketmq.redis.replicator.event.PostFullSyncEvent;
33+
import org.apache.rocketmq.redis.replicator.event.PreFullSyncEvent;
3234
import org.apache.rocketmq.redis.replicator.io.RawByteListener;
3335
import org.apache.rocketmq.redis.replicator.producer.RocketMQProducer;
3436
import org.apache.rocketmq.redis.replicator.rdb.RdbListener;
3537
import org.apache.rocketmq.redis.replicator.rdb.RdbVisitor;
38+
import org.apache.rocketmq.redis.replicator.rdb.datatype.AuxField;
3639
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
3740
import org.apache.rocketmq.redis.replicator.rdb.datatype.Module;
3841
import org.apache.rocketmq.redis.replicator.rdb.module.ModuleParser;
@@ -208,24 +211,52 @@ public static void main(String[] args) throws Exception {
208211
Replicator replicator = new RocketMQRedisReplicator(configure);
209212
final RocketMQProducer producer = new RocketMQProducer(configure);
210213

211-
replicator.addRdbListener(new RdbListener.Adaptor() {
214+
replicator.addRdbListener(new RdbListener() {
215+
@Override public void preFullSync(Replicator replicator) {
216+
try {
217+
if (!producer.send(new PreFullSyncEvent())) {
218+
LOGGER.error("Fail to send PreFullSync event");
219+
}
220+
} catch (Exception e) {
221+
LOGGER.error("Fail to send PreFullSync event", e);
222+
}
223+
}
224+
225+
@Override public void auxField(Replicator replicator, AuxField auxField) {
226+
try {
227+
if (!producer.send(auxField)) {
228+
LOGGER.error("Fail to send AuxField[{}]", auxField);
229+
}
230+
} catch (Exception e) {
231+
LOGGER.error(String.format("Fail to send AuxField[%s]", auxField), e);
232+
}
233+
}
234+
212235
@Override public void handle(Replicator replicator, KeyValuePair<?> kv) {
213236
try {
214-
boolean success = producer.sendKeyValuePair(kv);
215-
if (!success) {
237+
if (!producer.send(kv)) {
216238
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
217239
}
218240
} catch (Exception e) {
219241
LOGGER.error(String.format("Fail to send KeyValuePair[key=%s]", kv.getKey()), e);
220242
}
221243
}
244+
245+
@Override public void postFullSync(Replicator replicator, long checksum) {
246+
try {
247+
if (!producer.send(new PostFullSyncEvent(checksum))) {
248+
LOGGER.error("Fail to send send PostFullSync event");
249+
}
250+
} catch (Exception e) {
251+
LOGGER.error("Fail to send PostFullSync event", e);
252+
}
253+
}
222254
});
223255

224256
replicator.addCommandListener(new CommandListener() {
225257
@Override public void handle(Replicator replicator, Command command) {
226258
try {
227-
boolean success = producer.sendCommand(command);
228-
if (!success) {
259+
if (!producer.send(command)) {
229260
LOGGER.error("Fail to send command[{}]", command);
230261
}
231262
} catch (Exception e) {

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/conf/ReplicatorConstants.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,8 @@ public class ReplicatorConstants {
3333

3434
public static final String ROCKETMQ_PRODUCER_GROUP_NAME = "rocketmq.producer.groupname";
3535

36-
public static final String ROCKETMQ_PRODUCER_INSTANCE_NAME = "rocketmq.producer.instancename";
37-
3836
public static final String ROCKETMQ_DATA_TOPIC = "rocketmq.data.topic";
3937

40-
public static final String ORDER_MODEL = "order.model";
41-
42-
public static final String ORDER_MODEL_GLOBAL = "global";
43-
4438
public static final String DEPLOY_MODEL = "deploy.model";
4539

4640
public static final String DEPLOY_MODEL_CLUSTER = "cluster";

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/producer/RocketMQProducer.java

Lines changed: 6 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.rocketmq.redis.replicator.producer;
1919

20-
import java.util.List;
2120
import java.util.Objects;
2221
import org.apache.rocketmq.client.exception.MQBrokerException;
2322
import org.apache.rocketmq.client.exception.MQClientException;
@@ -26,114 +25,43 @@
2625
import org.apache.rocketmq.client.producer.SendResult;
2726
import org.apache.rocketmq.client.producer.SendStatus;
2827
import org.apache.rocketmq.common.message.Message;
29-
import org.apache.rocketmq.common.message.MessageQueue;
30-
import org.apache.rocketmq.redis.replicator.cmd.Command;
3128
import org.apache.rocketmq.redis.replicator.conf.Configure;
32-
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
29+
import org.apache.rocketmq.redis.replicator.event.Event;
3330
import org.apache.rocketmq.remoting.exception.RemotingException;
3431

3532
import static com.alibaba.fastjson.JSON.toJSONBytes;
3633
import static com.alibaba.fastjson.serializer.SerializerFeature.IgnoreNonFieldGetter;
37-
import static java.lang.String.valueOf;
38-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL;
39-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL_GLOBAL;
4034
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_DATA_TOPIC;
4135
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_NAMESERVER_ADDRESS;
4236
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_GROUP_NAME;
43-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_INSTANCE_NAME;
4437

4538
public class RocketMQProducer {
4639

4740
private String topic;
48-
private boolean global;
4941
private MQProducer producer;
50-
private List<MessageQueue> messageQueues;
5142

5243
public RocketMQProducer(Configure configure) throws MQClientException {
5344
Objects.requireNonNull(configure);
5445
this.topic = configure.getString(ROCKETMQ_DATA_TOPIC);
55-
this.global = configure.getString(ORDER_MODEL, ORDER_MODEL_GLOBAL, true).equals(ORDER_MODEL_GLOBAL);
5646
DefaultMQProducer producer = new DefaultMQProducer();
5747
producer.setNamesrvAddr(configure.getString(ROCKETMQ_NAMESERVER_ADDRESS));
5848
producer.setProducerGroup(configure.getString(ROCKETMQ_PRODUCER_GROUP_NAME));
59-
producer.setInstanceName(configure.getString(ROCKETMQ_PRODUCER_INSTANCE_NAME));
6049
this.producer = producer;
6150
this.producer.start();
62-
this.messageQueues = this.producer.fetchPublishMessageQueues(this.topic);
6351
}
6452

6553
/**
66-
* Send rdb data which always is key-value pair. System command such as flushall won't be included.
67-
* <p>
68-
*
69-
* @param kv rdb key value pair
54+
* @param event redis event.
7055
* @return true if send success.
7156
* @throws MQClientException MQClientException
7257
* @throws RemotingException RemotingException
7358
* @throws InterruptedException InterruptedException
7459
* @throws MQBrokerException MQBrokerException
7560
*/
76-
public boolean sendKeyValuePair(
77-
KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
78-
return global ? sendGlobalOrder(kv) : sendPartialOrder(kv);
79-
}
80-
81-
/**
82-
* Send realtime redis master command to RMQ. As the command may be a system command like 'flushall',
83-
* which does't have key. So dispatch the command to different queue may cause unexpected issue.
84-
* <p>
85-
*
86-
* @param command aof command
87-
* @return true if send success
88-
* @throws MQClientException MQClientException
89-
* @throws RemotingException RemotingException
90-
* @throws InterruptedException InterruptedException
91-
* @throws MQBrokerException MQBrokerException
92-
*/
93-
public boolean sendCommand(
94-
Command command) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
95-
Message msg = new Message(this.topic, toJSONBytes(command, IgnoreNonFieldGetter));
96-
SendResult rs = this.producer.send(msg, this.messageQueues.get(0));
97-
return rs.getSendStatus() == SendStatus.SEND_OK;
98-
}
99-
100-
/**
101-
* Send rdb data to a single queue to keep global order.
102-
* <p>
103-
*
104-
* @param kv rdb key value pair
105-
* @return true if send success
106-
* @throws MQClientException MQClientException
107-
* @throws RemotingException RemotingException
108-
* @throws InterruptedException InterruptedException
109-
* @throws MQBrokerException MQBrokerException
110-
*/
111-
private boolean sendGlobalOrder(
112-
KeyValuePair<?> kv) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
113-
Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv, IgnoreNonFieldGetter));
114-
115-
SendResult rs = this.producer.send(msg, this.messageQueues.get(0));
116-
return rs.getSendStatus() == SendStatus.SEND_OK;
117-
}
118-
119-
/**
120-
* Rdb contents are always key-value pair, so we could dispatch identical key to one queue,
121-
* to support consume concurrently and keep partial orderly.
122-
* <p>
123-
*
124-
* @param kv rdb key value pair
125-
* @return true if send success
126-
* @throws MQClientException MQClientException
127-
* @throws RemotingException RemotingException
128-
* @throws InterruptedException InterruptedException
129-
* @throws MQBrokerException MQBrokerException
130-
*/
131-
private boolean sendPartialOrder(
132-
KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
133-
String key = kv.getKey();
134-
int index = key.hashCode() % this.messageQueues.size();
135-
Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv));
136-
SendResult rs = this.producer.send(msg, this.messageQueues.get(index));
61+
public boolean send(
62+
Event event) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
63+
Message msg = new Message(this.topic, toJSONBytes(event, IgnoreNonFieldGetter));
64+
SendResult rs = this.producer.send(msg);
13765
return rs.getSendStatus() == SendStatus.SEND_OK;
13866
}
13967
}

rocketmq-redis/src/main/resources/replicator.conf

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,11 @@ redis.uri = redis://127.0.0.1:6379
3131
rocketmq.nameserver.address = 127.0.0.1:9876
3232

3333
# the group name of rocketmq procuder. Different application should use unique group name
34-
rocketmq.producer.groupname = redis_replicator_producer_group_name
35-
36-
# the instance name of rocketmq procuder. Different application should use unique instance name
37-
rocketmq.producer.instancename = redis_replicator_producer_instance_name
34+
rocketmq.producer.groupname = REDIS_REPLICATOR_PRODUCER_GROUP
3835

3936
# the topic which redis replication data is sent to
4037
rocketmq.data.topic = redisdata
4138

42-
# the model of data/commands order.
43-
# value includes:
44-
# "global" - sending all redis data/commands to a single queue of RMQ to guarantee global order
45-
# "partial" - sending RDB data to multi queue to consume concurrently, but sending commands in a single queue
46-
order.model = global
47-
4839
# set deploy model.
4940
# value includes:
5041
# "single" - only one replicator server, which is suitable for develop or test

rocketmq-redis/src/test/java/org/apache/rocketmq/redis/replicator/RocketMQRedisReplicatorTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.rocketmq.redis.replicator.cmd.Command;
2121
import org.apache.rocketmq.redis.replicator.cmd.CommandListener;
2222
import org.apache.rocketmq.redis.replicator.conf.Configure;
23-
import org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants;
2423
import org.apache.rocketmq.redis.replicator.producer.RocketMQProducer;
2524
import org.apache.rocketmq.redis.replicator.rdb.RdbListener;
2625
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
@@ -52,8 +51,7 @@ public void setUp() throws URISyntaxException {
5251
URI redisURI = new URI("redis", uri.getRawAuthority(), uri.getRawPath(), uri.getRawQuery(), uri.getRawFragment());
5352
properties.setProperty("redis.uri", redisURI.toString());
5453
properties.setProperty("rocketmq.nameserver.address", nsAddr);
55-
properties.setProperty("rocketmq.producer.groupname", "redis_replicator_producer_group_name");
56-
properties.setProperty("rocketmq.producer.instancename", "redis_replicator_producer_instance_name");
54+
properties.setProperty("rocketmq.producer.groupname", "REDIS_REPLICATOR_PRODUCER_GROUP");
5755
properties.setProperty("rocketmq.data.topic", topic);
5856
}
5957

@@ -67,7 +65,7 @@ public void open() throws Exception {
6765
@Override
6866
public void handle(Replicator replicator, KeyValuePair<?> kv) {
6967
try {
70-
boolean success = producer.sendKeyValuePair(kv);
68+
boolean success = producer.send(kv);
7169
if (!success) {
7270
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
7371
} else {
@@ -83,7 +81,7 @@ public void handle(Replicator replicator, KeyValuePair<?> kv) {
8381
@Override
8482
public void handle(Replicator replicator, Command command) {
8583
try {
86-
boolean success = producer.sendCommand(command);
84+
boolean success = producer.send(command);
8785
if (!success) {
8886
LOGGER.error("Fail to send command[{}]", command);
8987
} else {

0 commit comments

Comments
 (0)