Skip to content

Commit 625ba07

Browse files
committed
Add PullConsumer related implementation for OpenMessaging.
1 parent a5ea4e4 commit 625ba07

File tree

9 files changed

+413
-14
lines changed

9 files changed

+413
-14
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.example.openmessaging;
18+
19+
import io.openmessaging.Message;
20+
import io.openmessaging.MessageHeader;
21+
import io.openmessaging.MessagingAccessPoint;
22+
import io.openmessaging.MessagingAccessPointFactory;
23+
import io.openmessaging.OMS;
24+
import io.openmessaging.PullConsumer;
25+
import io.openmessaging.rocketmq.domain.NonStandardKeys;
26+
27+
public class SimplePullConsumer {
28+
public static void main(String[] args) {
29+
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
30+
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
31+
32+
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
33+
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
34+
35+
messagingAccessPoint.startup();
36+
System.out.println("messagingAccessPoint startup OK");
37+
38+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
39+
@Override
40+
public void run() {
41+
consumer.shutdown();
42+
messagingAccessPoint.shutdown();
43+
}
44+
}));
45+
46+
consumer.startup();
47+
System.out.println("consumer startup OK");
48+
49+
while (true) {
50+
Message message = consumer.poll();
51+
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
52+
System.out.println("Received one message: " + msgId);
53+
consumer.ack(msgId);
54+
}
55+
}
56+
}

example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.rocketmq.example.openmessaging;
1818

1919
import io.openmessaging.Message;
20+
import io.openmessaging.MessageHeader;
2021
import io.openmessaging.MessageListener;
2122
import io.openmessaging.MessagingAccessPoint;
2223
import io.openmessaging.MessagingAccessPointFactory;
@@ -47,7 +48,7 @@ public void run() {
4748
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
4849
@Override
4950
public void onMessage(final Message message, final ReceivedMessageContext context) {
50-
System.out.println("Received one message: " + message);
51+
System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
5152
context.ack();
5253
}
5354
});

openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ public PushConsumer createPushConsumer(KeyValue properties) {
7676

7777
@Override
7878
public PullConsumer createPullConsumer(String queueName) {
79-
return new PullConsumerImpl(accessPointProperties);
79+
return new PullConsumerImpl(queueName, accessPointProperties);
8080
}
8181

8282
@Override
8383
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
84-
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
84+
return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
8585
}
8686

8787
@Override

openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import io.openmessaging.rocketmq.domain.NonStandardKeys;
2626
import io.openmessaging.rocketmq.domain.SendResultImpl;
2727
import java.lang.reflect.Field;
28+
import java.util.Iterator;
2829
import java.util.Map;
30+
import java.util.NoSuchElementException;
2931
import java.util.Set;
3032
import org.apache.rocketmq.client.producer.SendStatus;
3133
import org.apache.rocketmq.common.UtilAll;
@@ -88,7 +90,8 @@ public static BytesMessage msgConvert(org.apache.rocketmq.common.message.Message
8890
}
8991

9092
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
91-
if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
93+
if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
94+
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
9295
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
9396
} else {
9497
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
@@ -131,4 +134,49 @@ public static KeyValue buildKeyValue(KeyValue... keyValues) {
131134
}
132135
return keyValue;
133136
}
137+
138+
/**
139+
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
140+
*/
141+
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
142+
return new Iterator<T>() {
143+
Iterator<T> iterator = new Iterator<T>() {
144+
@Override
145+
public synchronized boolean hasNext() {
146+
return false;
147+
}
148+
149+
@Override
150+
public synchronized T next() {
151+
throw new NoSuchElementException();
152+
}
153+
154+
@Override
155+
public synchronized void remove() {
156+
//Ignore
157+
}
158+
};
159+
160+
@Override
161+
public synchronized boolean hasNext() {
162+
return iterator.hasNext() || iterable.iterator().hasNext();
163+
}
164+
165+
@Override
166+
public synchronized T next() {
167+
if (!iterator.hasNext()) {
168+
iterator = iterable.iterator();
169+
if (!iterator.hasNext()) {
170+
throw new NoSuchElementException();
171+
}
172+
}
173+
return iterator.next();
174+
}
175+
176+
@Override
177+
public synchronized void remove() {
178+
iterator.remove();
179+
}
180+
};
181+
}
134182
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.rocketmq.consumer;
18+
19+
import io.openmessaging.KeyValue;
20+
import io.openmessaging.PropertyKeys;
21+
import io.openmessaging.rocketmq.domain.ConsumeRequest;
22+
import io.openmessaging.rocketmq.domain.NonStandardKeys;
23+
import java.util.Collections;
24+
import java.util.Map;
25+
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.TimeUnit;
29+
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
30+
import org.apache.rocketmq.client.exception.MQClientException;
31+
import org.apache.rocketmq.client.log.ClientLogger;
32+
import org.apache.rocketmq.common.message.MessageExt;
33+
import org.apache.rocketmq.common.message.MessageQueue;
34+
import org.slf4j.Logger;
35+
36+
class LocalMessageCache {
37+
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
38+
private final Map<String, ConsumeRequest> consumedRequest;
39+
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
40+
private final DefaultMQPullConsumer rocketmqPullConsumer;
41+
private int pullBatchNums = 32;
42+
private int pollTimeout = -1;
43+
private final static Logger log = ClientLogger.getLog();
44+
45+
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) {
46+
int cacheCapacity = 1000;
47+
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
48+
cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
49+
}
50+
consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
51+
52+
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
53+
pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
54+
}
55+
56+
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
57+
pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
58+
}
59+
60+
this.consumedRequest = new ConcurrentHashMap<>();
61+
this.pullOffsetTable = new ConcurrentHashMap<>();
62+
this.rocketmqPullConsumer = rocketmqPullConsumer;
63+
}
64+
65+
int nextPullBatchNums() {
66+
return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity());
67+
}
68+
69+
long nextPullOffset(MessageQueue remoteQueue) {
70+
if (!pullOffsetTable.containsKey(remoteQueue)) {
71+
try {
72+
pullOffsetTable.putIfAbsent(remoteQueue,
73+
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
74+
} catch (MQClientException e) {
75+
log.error("A error occurred in fetch consume offset process.", e);
76+
}
77+
}
78+
return pullOffsetTable.get(remoteQueue);
79+
}
80+
81+
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
82+
pullOffsetTable.put(remoteQueue, nextPullOffset);
83+
}
84+
85+
void submitConsumeRequest(ConsumeRequest consumeRequest) {
86+
try {
87+
consumeRequestCache.put(consumeRequest);
88+
} catch (InterruptedException ignore) {
89+
}
90+
}
91+
92+
MessageExt poll() {
93+
try {
94+
ConsumeRequest consumeRequest = consumeRequestCache.take();
95+
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
96+
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
97+
return consumeRequest.getMessageExt();
98+
} catch (InterruptedException ignore) {
99+
}
100+
return null;
101+
}
102+
103+
MessageExt poll(final KeyValue properties) {
104+
int currentPollTimeout = pollTimeout;
105+
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
106+
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
107+
}
108+
109+
if (currentPollTimeout == -1) {
110+
return poll();
111+
}
112+
113+
try {
114+
ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS);
115+
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
116+
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
117+
return consumeRequest.getMessageExt();
118+
} catch (InterruptedException ignore) {
119+
}
120+
return null;
121+
}
122+
123+
void ack(final String messageId) {
124+
ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
125+
if (consumeRequest != null) {
126+
long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
127+
try {
128+
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
129+
} catch (MQClientException e) {
130+
log.error("A error occurred in update consume offset process.", e);
131+
}
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)