Skip to content

Commit c508cb3

Browse files
author
shutian.lzh
committed
Accomodate updated openmessaging api
1 parent 173f77d commit c508cb3

File tree

5 files changed

+11
-6
lines changed

5 files changed

+11
-6
lines changed

openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.openmessaging.KeyValue;
2121
import io.openmessaging.Message;
2222
import io.openmessaging.OMS;
23+
import io.openmessaging.exception.OMSMessageFormatException;
2324
import org.apache.commons.lang3.builder.ToStringBuilder;
2425

2526
public class BytesMessageImpl implements BytesMessage {
@@ -33,8 +34,12 @@ public BytesMessageImpl() {
3334
}
3435

3536
@Override
36-
public byte[] getBody() {
37-
return body;
37+
public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
38+
if (type == byte[].class) {
39+
return (T)body;
40+
}
41+
42+
throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
3843
}
3944

4045
@Override

openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static String buildInstanceName() {
4646

4747
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
4848
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
49-
rmqMessage.setBody(omsMessage.getBody());
49+
rmqMessage.setBody(omsMessage.getBody(byte[].class));
5050

5151
KeyValue sysHeaders = omsMessage.sysHeaders();
5252
KeyValue userHeaders = omsMessage.userHeaders();

openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testPoll() {
8383

8484
Message message = consumer.receive();
8585
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
86-
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
86+
assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
8787
}
8888

8989
@Test

openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void testConsumeMessage() {
7575
@Override
7676
public void onReceived(Message message, Context context) {
7777
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
78-
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
78+
assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
7979
context.ack();
8080
}
8181
});

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@
592592
<dependency>
593593
<groupId>io.openmessaging</groupId>
594594
<artifactId>openmessaging-api</artifactId>
595-
<version>0.3.0-alpha</version>
595+
<version>0.3.1-alpha-SNAPSHOT</version>
596596
</dependency>
597597
<dependency>
598598
<groupId>log4j</groupId>

0 commit comments

Comments
 (0)