|
17 | 17 |
|
18 | 18 | package org.apache.rocketmq.redis.replicator.producer;
|
19 | 19 |
|
20 |
| -import java.util.List; |
21 | 20 | import java.util.Objects;
|
22 | 21 | import org.apache.rocketmq.client.exception.MQBrokerException;
|
23 | 22 | import org.apache.rocketmq.client.exception.MQClientException;
|
|
26 | 25 | import org.apache.rocketmq.client.producer.SendResult;
|
27 | 26 | import org.apache.rocketmq.client.producer.SendStatus;
|
28 | 27 | import org.apache.rocketmq.common.message.Message;
|
29 |
| -import org.apache.rocketmq.common.message.MessageQueue; |
30 |
| -import org.apache.rocketmq.redis.replicator.cmd.Command; |
31 | 28 | import org.apache.rocketmq.redis.replicator.conf.Configure;
|
32 |
| -import org.apache.rocketmq.redis.replicator.rdb.datatype.KeyValuePair; |
| 29 | +import org.apache.rocketmq.redis.replicator.event.Event; |
33 | 30 | import org.apache.rocketmq.remoting.exception.RemotingException;
|
34 | 31 |
|
35 | 32 | import static com.alibaba.fastjson.JSON.toJSONBytes;
|
36 | 33 | import static com.alibaba.fastjson.serializer.SerializerFeature.IgnoreNonFieldGetter;
|
37 |
| -import static java.lang.String.valueOf; |
38 |
| -import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL; |
39 |
| -import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ORDER_MODEL_GLOBAL; |
40 | 34 | import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_DATA_TOPIC;
|
41 | 35 | import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_NAMESERVER_ADDRESS;
|
42 | 36 | import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_GROUP_NAME;
|
43 |
| -import static org.apache.rocketmq.redis.replicator.conf.ReplicatorConstants.ROCKETMQ_PRODUCER_INSTANCE_NAME; |
44 | 37 |
|
45 | 38 | public class RocketMQProducer {
|
46 | 39 |
|
47 | 40 | private String topic;
|
48 |
| - private boolean global; |
49 | 41 | private MQProducer producer;
|
50 |
| - private List<MessageQueue> messageQueues; |
51 | 42 |
|
52 | 43 | public RocketMQProducer(Configure configure) throws MQClientException {
|
53 | 44 | Objects.requireNonNull(configure);
|
54 | 45 | this.topic = configure.getString(ROCKETMQ_DATA_TOPIC);
|
55 |
| - this.global = configure.getString(ORDER_MODEL, ORDER_MODEL_GLOBAL, true).equals(ORDER_MODEL_GLOBAL); |
56 | 46 | DefaultMQProducer producer = new DefaultMQProducer();
|
57 | 47 | producer.setNamesrvAddr(configure.getString(ROCKETMQ_NAMESERVER_ADDRESS));
|
58 | 48 | producer.setProducerGroup(configure.getString(ROCKETMQ_PRODUCER_GROUP_NAME));
|
59 |
| - producer.setInstanceName(configure.getString(ROCKETMQ_PRODUCER_INSTANCE_NAME)); |
60 | 49 | this.producer = producer;
|
61 | 50 | this.producer.start();
|
62 |
| - this.messageQueues = this.producer.fetchPublishMessageQueues(this.topic); |
63 | 51 | }
|
64 | 52 |
|
65 | 53 | /**
|
66 |
| - * Send rdb data which always is key-value pair. System command such as flushall won't be included. |
67 |
| - * <p> |
68 |
| - * |
69 |
| - * @param kv rdb key value pair |
| 54 | + * @param event redis event. |
70 | 55 | * @return true if send success.
|
71 | 56 | * @throws MQClientException MQClientException
|
72 | 57 | * @throws RemotingException RemotingException
|
73 | 58 | * @throws InterruptedException InterruptedException
|
74 | 59 | * @throws MQBrokerException MQBrokerException
|
75 | 60 | */
|
76 |
| - public boolean sendKeyValuePair( |
77 |
| - KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { |
78 |
| - return global ? sendGlobalOrder(kv) : sendPartialOrder(kv); |
79 |
| - } |
80 |
| - |
81 |
| - /** |
82 |
| - * Send realtime redis master command to RMQ. As the command may be a system command like 'flushall', |
83 |
| - * which does't have key. So dispatch the command to different queue may cause unexpected issue. |
84 |
| - * <p> |
85 |
| - * |
86 |
| - * @param command aof command |
87 |
| - * @return true if send success |
88 |
| - * @throws MQClientException MQClientException |
89 |
| - * @throws RemotingException RemotingException |
90 |
| - * @throws InterruptedException InterruptedException |
91 |
| - * @throws MQBrokerException MQBrokerException |
92 |
| - */ |
93 |
| - public boolean sendCommand( |
94 |
| - Command command) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { |
95 |
| - Message msg = new Message(this.topic, toJSONBytes(command, IgnoreNonFieldGetter)); |
96 |
| - SendResult rs = this.producer.send(msg, this.messageQueues.get(0)); |
97 |
| - return rs.getSendStatus() == SendStatus.SEND_OK; |
98 |
| - } |
99 |
| - |
100 |
| - /** |
101 |
| - * Send rdb data to a single queue to keep global order. |
102 |
| - * <p> |
103 |
| - * |
104 |
| - * @param kv rdb key value pair |
105 |
| - * @return true if send success |
106 |
| - * @throws MQClientException MQClientException |
107 |
| - * @throws RemotingException RemotingException |
108 |
| - * @throws InterruptedException InterruptedException |
109 |
| - * @throws MQBrokerException MQBrokerException |
110 |
| - */ |
111 |
| - private boolean sendGlobalOrder( |
112 |
| - KeyValuePair<?> kv) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { |
113 |
| - Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv, IgnoreNonFieldGetter)); |
114 |
| - |
115 |
| - SendResult rs = this.producer.send(msg, this.messageQueues.get(0)); |
116 |
| - return rs.getSendStatus() == SendStatus.SEND_OK; |
117 |
| - } |
118 |
| - |
119 |
| - /** |
120 |
| - * Rdb contents are always key-value pair, so we could dispatch identical key to one queue, |
121 |
| - * to support consume concurrently and keep partial orderly. |
122 |
| - * <p> |
123 |
| - * |
124 |
| - * @param kv rdb key value pair |
125 |
| - * @return true if send success |
126 |
| - * @throws MQClientException MQClientException |
127 |
| - * @throws RemotingException RemotingException |
128 |
| - * @throws InterruptedException InterruptedException |
129 |
| - * @throws MQBrokerException MQBrokerException |
130 |
| - */ |
131 |
| - private boolean sendPartialOrder( |
132 |
| - KeyValuePair<?> kv) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { |
133 |
| - String key = kv.getKey(); |
134 |
| - int index = key.hashCode() % this.messageQueues.size(); |
135 |
| - Message msg = new Message(this.topic, valueOf(kv.getDb().getDbNumber()), toJSONBytes(kv)); |
136 |
| - SendResult rs = this.producer.send(msg, this.messageQueues.get(index)); |
| 61 | + public boolean send( |
| 62 | + Event event) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { |
| 63 | + Message msg = new Message(this.topic, toJSONBytes(event, IgnoreNonFieldGetter)); |
| 64 | + SendResult rs = this.producer.send(msg); |
137 | 65 | return rs.getSendStatus() == SendStatus.SEND_OK;
|
138 | 66 | }
|
139 | 67 | }
|
0 commit comments