Skip to content

Commit b7ec412

Browse files
committed
OpenMessaging code reformat.
1 parent 6edeb83 commit b7ec412

File tree

11 files changed

+35
-28
lines changed

11 files changed

+35
-28
lines changed

example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ public static void main(String[] args) {
3333
final Producer producer = messagingAccessPoint.createProducer();
3434

3535
messagingAccessPoint.startup();
36-
System.out.println("messagingAccessPoint startup OK");
36+
System.out.printf("MessagingAccessPoint startup OK%n");
3737

3838
producer.startup();
39-
System.out.println("producer startup OK");
39+
System.out.printf("Producer startup OK%n");
4040

4141
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4242
@Override
@@ -50,25 +50,27 @@ public void run() {
5050
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
5151
SendResult sendResult = producer.send(message);
5252
//final Void aVoid = result.get(3000L);
53-
System.out.println("send async message OK, msgId: " + sendResult.messageId());
53+
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
5454
}
5555

5656
{
5757
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
5858
result.addListener(new PromiseListener<SendResult>() {
59-
@Override public void operationCompleted(Promise<SendResult> promise) {
60-
System.out.println("Send async message OK, msgId: " + promise.get().messageId());
59+
@Override
60+
public void operationCompleted(Promise<SendResult> promise) {
61+
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
6162
}
6263

63-
@Override public void operationFailed(Promise<SendResult> promise) {
64-
System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage());
64+
@Override
65+
public void operationFailed(Promise<SendResult> promise) {
66+
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
6567
}
6668
});
6769
}
6870

6971
{
7072
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
71-
System.out.println("Send oneway message OK");
73+
System.out.printf("Send oneway message OK%n");
7274
}
7375
}
7476
}

example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static void main(String[] args) {
3333
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
3434

3535
messagingAccessPoint.startup();
36-
System.out.println("messagingAccessPoint startup OK");
36+
System.out.printf("MessagingAccessPoint startup OK%n");
3737

3838
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
3939
@Override
@@ -44,13 +44,13 @@ public void run() {
4444
}));
4545

4646
consumer.startup();
47-
System.out.println("consumer startup OK");
47+
System.out.printf("Consumer startup OK%n");
4848

4949
while (true) {
5050
Message message = consumer.poll();
5151
if (message != null) {
5252
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
53-
System.out.println("Received one message: " + msgId);
53+
System.out.printf("Received one message: %s%n", msgId);
5454
consumer.ack(msgId);
5555
}
5656
}

example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static void main(String[] args) {
3535
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
3636

3737
messagingAccessPoint.startup();
38-
System.out.println("messagingAccessPoint startup OK");
38+
System.out.printf("MessagingAccessPoint startup OK%n");
3939

4040
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4141
@Override
@@ -48,12 +48,12 @@ public void run() {
4848
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
4949
@Override
5050
public void onMessage(final Message message, final ReceivedMessageContext context) {
51-
System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
51+
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
5252
context.ack();
5353
}
5454
});
5555

5656
consumer.startup();
57-
System.out.println("consumer startup OK");
57+
System.out.printf("Consumer startup OK%n");
5858
}
5959
}

openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ private void cleanExpireMsg() {
175175
try {
176176
if (!msgTreeMap.isEmpty()) {
177177
msg = msgTreeMap.firstEntry().getValue();
178-
System.out.println(msg);
179178
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
180179
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
181180
//Expired, ack and remove it.

openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import io.openmessaging.ReceivedMessageContext;
2626
import io.openmessaging.exception.OMSRuntimeException;
2727
import io.openmessaging.rocketmq.config.ClientConfig;
28+
import io.openmessaging.rocketmq.domain.NonStandardKeys;
2829
import io.openmessaging.rocketmq.utils.BeanUtils;
2930
import io.openmessaging.rocketmq.utils.OMSUtil;
30-
import io.openmessaging.rocketmq.domain.NonStandardKeys;
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +47,6 @@ public class PushConsumerImpl implements PushConsumer {
4747
private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
4848
private final ClientConfig clientConfig;
4949

50-
5150
public PushConsumerImpl(final KeyValue properties) {
5251
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
5352
this.properties = properties;
@@ -130,7 +129,8 @@ public synchronized void shutdown() {
130129
class MessageListenerImpl implements MessageListenerConcurrently {
131130

132131
@Override
133-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList, ConsumeConcurrentlyContext contextRMQ) {
132+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
133+
ConsumeConcurrentlyContext contextRMQ) {
134134
MessageExt rmqMsg = rmqMsgList.get(0);
135135
BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
136136

openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
4343

44-
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
44+
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
4545
final static Logger log = ClientLogger.getLog();
4646
final KeyValue properties;
4747
final DefaultMQProducer rocketmqProducer;

openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import io.openmessaging.PropertyKeys;
2626
import io.openmessaging.SendResult;
2727
import io.openmessaging.exception.OMSRuntimeException;
28-
import io.openmessaging.rocketmq.utils.OMSUtil;
2928
import io.openmessaging.rocketmq.promise.DefaultPromise;
29+
import io.openmessaging.rocketmq.utils.OMSUtil;
3030
import org.apache.rocketmq.client.producer.SendCallback;
3131
import org.apache.rocketmq.client.producer.SendStatus;
3232

openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public synchronized void commit() {
7878

7979
try {
8080
SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
81-
String [] msgIdArray = sendResult.getMsgId().split(",");
81+
String[] msgIdArray = sendResult.getMsgId().split(",");
8282
for (int i = 0; i < messages.size(); i++) {
8383
Message message = messages.get(i);
8484
message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);

openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public V get(final long timeout) {
7777
} else {
7878
long waitTime = timeout - (System.currentTimeMillis() - createTime);
7979
if (waitTime > 0) {
80-
for (; ; ) {
80+
for (;; ) {
8181
try {
8282
lock.wait(waitTime);
8383
} catch (InterruptedException e) {

openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818
package io.openmessaging.rocketmq.promise;
1919

2020
public enum FutureState {
21-
/** the task is doing **/
21+
/**
22+
* the task is doing
23+
**/
2224
DOING(0),
23-
/** the task is done **/
25+
/**
26+
* the task is done
27+
**/
2428
DONE(1),
25-
/** ths task is cancelled **/
29+
/**
30+
* ths task is cancelled
31+
**/
2632
CANCELLED(2);
2733

2834
public final int value;

0 commit comments

Comments
 (0)