Skip to content

Commit 6d29b9a

Browse files
leonchen83chenby@nextop.cn
authored andcommitted
1. Add more event to downstream to rocketmq .eg(PreFullSync and PostFullSync event, AuxField event)
2. Remove global and partial config from replicator.conf. 3. Remove rocketmq instance config. 4. Remove unused pom properties.
1 parent 7d3762d commit 6d29b9a

File tree

7 files changed

+80
-113
lines changed

7 files changed

+80
-113
lines changed

rocketmq-redis/README.md

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,52 @@ rocketmq 4.1.0 or higher
4949
Replicator replicator = new RocketMQRedisReplicator(configure);
5050
final RocketMQProducer producer = new RocketMQProducer(configure);
5151

52-
replicator.addRdbListener(new RdbListener.Adaptor() {
52+
replicator.addRdbListener(new RdbListener() {
53+
@Override public void preFullSync(Replicator replicator) {
54+
try {
55+
if (!producer.send(new PreFullSyncEvent())) {
56+
LOGGER.error("Fail to send PreFullSync event");
57+
}
58+
} catch (Exception e) {
59+
LOGGER.error("Fail to send PreFullSync event", e);
60+
}
61+
}
62+
63+
@Override public void auxField(Replicator replicator, AuxField auxField) {
64+
try {
65+
if (!producer.send(auxField)) {
66+
LOGGER.error("Fail to send AuxField[{}]", auxField);
67+
}
68+
} catch (Exception e) {
69+
LOGGER.error(String.format("Fail to send AuxField[%s]", auxField), e);
70+
}
71+
}
72+
5373
@Override public void handle(Replicator replicator, KeyValuePair<?> kv) {
5474
try {
55-
boolean success = producer.sendKeyValuePair(kv);
56-
if (!success) {
75+
if (!producer.send(kv)) {
5776
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
5877
}
5978
} catch (Exception e) {
6079
LOGGER.error(String.format("Fail to send KeyValuePair[key=%s]", kv.getKey()), e);
6180
}
6281
}
82+
83+
@Override public void postFullSync(Replicator replicator, long checksum) {
84+
try {
85+
if (!producer.send(new PostFullSyncEvent(checksum))) {
86+
LOGGER.error("Fail to send send PostFullSync event");
87+
}
88+
} catch (Exception e) {
89+
LOGGER.error("Fail to send PostFullSync event", e);
90+
}
91+
}
6392
});
6493

6594
replicator.addCommandListener(new CommandListener() {
6695
@Override public void handle(Replicator replicator, Command command) {
6796
try {
68-
boolean success = producer.sendCommand(command);
69-
if (!success) {
97+
if (!producer.send(command)) {
7098
LOGGER.error("Fail to send command[{}]", command);
7199
}
72100
} catch (Exception e) {
@@ -90,10 +118,8 @@ The config file located at target/rocketmq-redis-pack/conf/replicator.conf
90118
| parameter | default value| detail |
91119
|-----------|--------------|--------|
92120
| rocketmq.nameserver.address | 127.0.0.1:9876 | rocketmq server address|
93-
| rocketmq.producer.groupname | redis_replicator_producer_group_name | rocketmq group name |
94-
| rocketmq.producer.instancename | redis_replicator_producer_instance_name | rocketmq instance name |
121+
| rocketmq.producer.groupname | REDIS_REPLICATOR_PRODUCER_GROUP | rocketmq group name |
95122
| rocketmq.data.topic | redisdata | rocketmq topic name |
96-
| order.model | global | global or partial |
97123
| deploy.model | single | single or cluster |
98124
| zookeeper.address | 127.0.0.1:2181 | run on cluster model |
99125
| redis.uri | redis://127.0.0.1:6379 | the uri of redis master which replicate from |

rocketmq-redis/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
<maven.compiler.source>1.7</maven.compiler.source>
8080
<maven.compiler.target>1.7</maven.compiler.target>
8181
<!-- zookeeper -->
82-
<zookeeper.version>3.4.6</zookeeper.version>
8382
<curator.version>2.9.1</curator.version>
8483
<logback.version>1.1.3</logback.version>
8584
<rocketmq.client.version>4.1.0-incubating</rocketmq.client.version>

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RocketMQRedisReplicator.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@
2929
import org.apache.rocketmq.redis.replicator.cmd.CommandName;
3030
import org.apache.rocketmq.redis.replicator.cmd.CommandParser;
3131
import org.apache.rocketmq.redis.replicator.conf.Configure;
32+
import org.apache.rocketmq.redis.replicator.event.PostFullSyncEvent;
33+
import org.apache.rocketmq.redis.replicator.event.PreFullSyncEvent;
3234
import org.apache.rocketmq.redis.replicator.io.RawByteListener;
3335
import org.apache.rocketmq.redis.replicator.producer.RocketMQProducer;
3436
import org.apache.rocketmq.redis.replicator.rdb.RdbListener;
3537
import org.apache.rocketmq.redis.replicator.rdb.RdbVisitor;
38+
import org.apache.rocketmq.redis.replicator.rdb.datatype.AuxField;
3639
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
3740
import org.apache.rocketmq.redis.replicator.rdb.datatype.Module;
3841
import org.apache.rocketmq.redis.replicator.rdb.module.ModuleParser;
@@ -208,24 +211,52 @@ public static void main(String[] args) throws Exception {
208211
Replicator replicator = new RocketMQRedisReplicator(configure);
209212
final RocketMQProducer producer = new RocketMQProducer(configure);
210213

211-
replicator.addRdbListener(new RdbListener.Adaptor() {
214+
replicator.addRdbListener(new RdbListener() {
215+
@Override public void preFullSync(Replicator replicator) {
216+
try {
217+
if (!producer.send(new PreFullSyncEvent())) {
218+
LOGGER.error("Fail to send PreFullSync event");
219+
}
220+
} catch (Exception e) {
221+
LOGGER.error("Fail to send PreFullSync event", e);
222+
}
223+
}
224+
225+
@Override public void auxField(Replicator replicator, AuxField auxField) {
226+
try {
227+
if (!producer.send(auxField)) {
228+
LOGGER.error("Fail to send AuxField[{}]", auxField);
229+
}
230+
} catch (Exception e) {
231+
LOGGER.error(String.format("Fail to send AuxField[%s]", auxField), e);
232+
}
233+
}
234+
212235
@Override public void handle(Replicator replicator, KeyValuePair<?> kv) {
213236
try {
214-
boolean success = producer.sendKeyValuePair(kv);
215-
if (!success) {
237+
if (!producer.send(kv)) {
216238
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
217239
}
218240
} catch (Exception e) {
219241
LOGGER.error(String.format("Fail to send KeyValuePair[key=%s]", kv.getKey()), e);
220242
}
221243
}
244+
245+
@Override public void postFullSync(Replicator replicator, long checksum) {
246+
try {
247+
if (!producer.send(new PostFullSyncEvent(checksum))) {
248+
LOGGER.error("Fail to send send PostFullSync event");
249+
}
250+
} catch (Exception e) {
251+
LOGGER.error("Fail to send PostFullSync event", e);
252+
}
253+
}
222254
});
223255

224256
replicator.addCommandListener(new CommandListener() {
225257
@Override public void handle(Replicator replicator, Command command) {
226258
try {
227-
boolean success = producer.sendCommand(command);
228-
if (!success) {
259+
if (!producer.send(command)) {
229260
LOGGER.error("Fail to send command[{}]", command);
230261
}
231262
} catch (Exception e) {

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/conf/ReplicatorConstants.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,8 @@ public class ReplicatorConstants {
3333

3434
public static final String ROCKETMQ_PRODUCER_GROUP_NAME = "rocketmq.producer.groupname";
3535

36-
public static final String ROCKETMQ_PRODUCER_INSTANCE_NAME = "rocketmq.producer.instancename";
37-
3836
public static final String ROCKETMQ_DATA_TOPIC = "rocketmq.data.topic";
3937

40-
public static final String ORDER_MODEL = "order.model";
41-
42-
public static final String ORDER_MODEL_GLOBAL = "global";
43-
4438
public static final String DEPLOY_MODEL = "deploy.model";
4539

4640
public static final String DEPLOY_MODEL_CLUSTER = "cluster";

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/producer/RocketMQProducer.java

Lines changed: 6 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.rocketmq.redis.replicator.producer;
1919

20-
import java.util.List;
2120
import java.util.Objects;
2221
import org.apache.rocketmq.client.exception.MQBrokerException;
2322
import org.apache.rocketmq.client.exception.MQClientException;
@@ -26,114 +25,43 @@
2625
import org.apache.rocketmq.client.producer.SendResult;
2726
import org.apache.rocketmq.client.producer.SendStatus;
2827
import org.apache.rocketmq.common.message.Message;
29-
import org.apache.rocketmq.common.message.MessageQueue;
30-
import org.apache.rocketmq.redis.replicator.cmd.Command;
3128
import org.apache.rocketmq.redis.replicator.conf.Configure;
32-
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
29+
import org.apache.rocketmq.redis.replicator.event.Event;
3330
import org.apache.rocketmq.remoting.exception.RemotingException;
3431

3532
import static com.alibaba.fastjson.JSON.toJSONBytes;
3633
import static com.alibaba.fastjson.serializer.SerializerFeature.IgnoreNonFieldGetter;
37-
import static java.lang.String.valueOf;
38-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL;
39-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL_GLOBAL;
4034
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_DATA_TOPIC;
4135
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_NAMESERVER_ADDRESS;
4236
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_GROUP_NAME;
43-
import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_INSTANCE_NAME;
4437

4538
public class RocketMQProducer {
4639

4740
private String topic;
48-
private boolean global;
4941
private MQProducer producer;
50-
private List<MessageQueue> messageQueues;
5142

5243
public RocketMQProducer(Configure configure) throws MQClientException {
5344
Objects.requireNonNull(configure);
5445
this.topic = configure.getString(ROCKETMQ_DATA_TOPIC);
55-
this.global = configure.getString(ORDER_MODEL, ORDER_MODEL_GLOBAL, true).equals(ORDER_MODEL_GLOBAL);
5646
DefaultMQProducer producer = new DefaultMQProducer();
5747
producer.setNamesrvAddr(configure.getString(ROCKETMQ_NAMESERVER_ADDRESS));
5848
producer.setProducerGroup(configure.getString(ROCKETMQ_PRODUCER_GROUP_NAME));
59-
producer.setInstanceName(configure.getString(ROCKETMQ_PRODUCER_INSTANCE_NAME));
6049
this.producer = producer;
6150
this.producer.start();
62-
this.messageQueues = this.producer.fetchPublishMessageQueues(this.topic);
6351
}
6452

6553
/**
66-
* Send rdb data which always is key-value pair. System command such as flushall won't be included.
67-
* <p>
68-
*
69-
* @param kv rdb key value pair
54+
* @param event redis event.
7055
* @return true if send success.
7156
* @throws MQClientException MQClientException
7257
* @throws RemotingException RemotingException
7358
* @throws InterruptedException InterruptedException
7459
* @throws MQBrokerException MQBrokerException
7560
*/
76-
public boolean sendKeyValuePair(
77-
KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
78-
return global ? sendGlobalOrder(kv) : sendPartialOrder(kv);
79-
}
80-
81-
/**
82-
* Send realtime redis master command to RMQ. As the command may be a system command like 'flushall',
83-
* which does't have key. So dispatch the command to different queue may cause unexpected issue.
84-
* <p>
85-
*
86-
* @param command aof command
87-
* @return true if send success
88-
* @throws MQClientException MQClientException
89-
* @throws RemotingException RemotingException
90-
* @throws InterruptedException InterruptedException
91-
* @throws MQBrokerException MQBrokerException
92-
*/
93-
public boolean sendCommand(
94-
Command command) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
95-
Message msg = new Message(this.topic, toJSONBytes(command, IgnoreNonFieldGetter));
96-
SendResult rs = this.producer.send(msg, this.messageQueues.get(0));
97-
return rs.getSendStatus() == SendStatus.SEND_OK;
98-
}
99-
100-
/**
101-
* Send rdb data to a single queue to keep global order.
102-
* <p>
103-
*
104-
* @param kv rdb key value pair
105-
* @return true if send success
106-
* @throws MQClientException MQClientException
107-
* @throws RemotingException RemotingException
108-
* @throws InterruptedException InterruptedException
109-
* @throws MQBrokerException MQBrokerException
110-
*/
111-
private boolean sendGlobalOrder(
112-
KeyValuePair<?> kv) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
113-
Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv, IgnoreNonFieldGetter));
114-
115-
SendResult rs = this.producer.send(msg, this.messageQueues.get(0));
116-
return rs.getSendStatus() == SendStatus.SEND_OK;
117-
}
118-
119-
/**
120-
* Rdb contents are always key-value pair, so we could dispatch identical key to one queue,
121-
* to support consume concurrently and keep partial orderly.
122-
* <p>
123-
*
124-
* @param kv rdb key value pair
125-
* @return true if send success
126-
* @throws MQClientException MQClientException
127-
* @throws RemotingException RemotingException
128-
* @throws InterruptedException InterruptedException
129-
* @throws MQBrokerException MQBrokerException
130-
*/
131-
private boolean sendPartialOrder(
132-
KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
133-
String key = kv.getKey();
134-
int index = key.hashCode() % this.messageQueues.size();
135-
Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv));
136-
SendResult rs = this.producer.send(msg, this.messageQueues.get(index));
61+
public boolean send(
62+
Event event) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
63+
Message msg = new Message(this.topic, toJSONBytes(event, IgnoreNonFieldGetter));
64+
SendResult rs = this.producer.send(msg);
13765
return rs.getSendStatus() == SendStatus.SEND_OK;
13866
}
13967
}

rocketmq-redis/src/main/resources/replicator.conf

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,11 @@ redis.uri = redis://127.0.0.1:6379
3131
rocketmq.nameserver.address = 127.0.0.1:9876
3232

3333
# the group name of rocketmq procuder. Different application should use unique group name
34-
rocketmq.producer.groupname = redis_replicator_producer_group_name
35-
36-
# the instance name of rocketmq procuder. Different application should use unique instance name
37-
rocketmq.producer.instancename = redis_replicator_producer_instance_name
34+
rocketmq.producer.groupname = REDIS_REPLICATOR_PRODUCER_GROUP
3835

3936
# the topic which redis replication data is sent to
4037
rocketmq.data.topic = redisdata
4138

42-
# the model of data/commands order.
43-
# value includes:
44-
# "global" - sending all redis data/commands to a single queue of RMQ to guarantee global order
45-
# "partial" - sending RDB data to multi queue to consume concurrently, but sending commands in a single queue
46-
order.model = global
47-
4839
# set deploy model.
4940
# value includes:
5041
# "single" - only one replicator server, which is suitable for develop or test

rocketmq-redis/src/test/java/org/apache/rocketmq/redis/replicator/RocketMQRedisReplicatorTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.rocketmq.redis.replicator.cmd.Command;
2121
import org.apache.rocketmq.redis.replicator.cmd.CommandListener;
2222
import org.apache.rocketmq.redis.replicator.conf.Configure;
23-
import org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants;
2423
import org.apache.rocketmq.redis.replicator.producer.RocketMQProducer;
2524
import org.apache.rocketmq.redis.replicator.rdb.RdbListener;
2625
import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair;
@@ -52,8 +51,7 @@ public void setUp() throws URISyntaxException {
5251
URI redisURI = new URI("redis", uri.getRawAuthority(), uri.getRawPath(), uri.getRawQuery(), uri.getRawFragment());
5352
properties.setProperty("redis.uri", redisURI.toString());
5453
properties.setProperty("rocketmq.nameserver.address", nsAddr);
55-
properties.setProperty("rocketmq.producer.groupname", "redis_replicator_producer_group_name");
56-
properties.setProperty("rocketmq.producer.instancename", "redis_replicator_producer_instance_name");
54+
properties.setProperty("rocketmq.producer.groupname", "REDIS_REPLICATOR_PRODUCER_GROUP");
5755
properties.setProperty("rocketmq.data.topic", topic);
5856
}
5957

@@ -67,7 +65,7 @@ public void open() throws Exception {
6765
@Override
6866
public void handle(Replicator replicator, KeyValuePair<?> kv) {
6967
try {
70-
boolean success = producer.sendKeyValuePair(kv);
68+
boolean success = producer.send(kv);
7169
if (!success) {
7270
LOGGER.error("Fail to send KeyValuePair[key={}]", kv.getKey());
7371
} else {
@@ -83,7 +81,7 @@ public void handle(Replicator replicator, KeyValuePair<?> kv) {
8381
@Override
8482
public void handle(Replicator replicator, Command command) {
8583
try {
86-
boolean success = producer.sendCommand(command);
84+
boolean success = producer.send(command);
8785
if (!success) {
8886
LOGGER.error("Fail to send command[{}]", command);
8987
} else {

0 commit comments

Comments
 (0)