Skip to content

Commit d8a61d1

Browse files
author
yu.jl
committed
RabbitMQ 学习
1 parent ef0345b commit d8a61d1

File tree

5 files changed

+75
-1
lines changed

5 files changed

+75
-1
lines changed

dependencies.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ ext.libraries = [
1616
"spring-boot-starter-aop" : "org.springframework.boot:spring-boot-starter-aop:${versions.springBoot}",
1717
"spring-boot-starter-log4j2" : "org.springframework.boot:spring-boot-starter-log4j2:${versions.springBoot}",
1818
"spring-boot-starter-test" : "org.springframework.boot:spring-boot-starter-test:${versions.springBoot}",
19+
"spring-boot-starter-amqp" : "org.springframework.boot:spring-boot-starter-amqp:${versions.springBoot}",
1920
"spring-boot-starter-tomcat" : "org.springframework.boot:spring-boot-starter-tomcat:${versions.springBoot}",
2021
"spring-boot-starter-jdbc" : "org.springframework.boot:spring-boot-starter-jdbc:${versions.springBoot}",
2122
"spring-boot-starter-data-rest" : "org.springframework.boot:spring-boot-starter-data-rest:${versions.springBoot}",

settings.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ include 'StudyAdminZuul'
1414
include 'studyadmin'
1515
include 'studyapi'
1616
include 'common'
17-
include 'studyconfig2'
17+
include 'studystreamrmq'
18+
include 'studystreamrmqcustomer'
1819

studystreamrmq/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
version '1.0-SNAPSHOT'
3+
4+
5+
dependencies {
6+
implementation libraries['spring-boot-starter-amqp']
7+
implementation libraries['fastjson']
8+
testCompile group: 'junit', name: 'junit', version: '4.12'
9+
}

studystreamrmq/src/main/java/com/ityu/studystreamrmq/rabbitmq/RabbitMqConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package com.ityu.studystreamrmq.rabbitmq;
22

33

4+
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
45
import org.springframework.amqp.core.*;
56
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
67
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
78
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
9+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
810
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
911
import org.springframework.context.annotation.Bean;
1012
import org.springframework.context.annotation.Configuration;
@@ -89,6 +91,8 @@ Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
8991
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
9092
}
9193

94+
95+
//https://www.jianshu.com/p/2c5eebfd0e95
9296
// @Bean
9397
// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
9498
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
@@ -98,5 +102,23 @@ Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
98102
// return factory;
99103
// }
100104

105+
// @Bean
106+
// public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
107+
// SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
108+
// container.setConnectionFactory(connectionFactory);
109+
// container.setQueueNames("consumer_queue"); // 监听的队列
110+
// container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息
111+
// container.setMessageListener((MessageListener) (message) -> {
112+
// System.out.println("====接收到消息=====");
113+
// System.out.println(new String(message.getBody()));
114+
// //抛出NullPointerException异常则重新入队列
115+
// //throw new NullPointerException("消息消费失败");
116+
// //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
117+
// //throw new AmqpRejectAndDontRequeueException("消息消费失败");
118+
// //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
119+
// throw new ImmediateAcknowledgeAmqpException("消息消费失败");
120+
// });
121+
// return container;
122+
// }
101123

102124
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.ityu.studystreamrmq;
2+
3+
4+
import com.ityu.studystreamrmq.rabbitmq.QueueEnum;
5+
import com.ityu.studystreamrmq.rabbitmq.TestSendMessage;
6+
import org.junit.Test;
7+
import org.junit.runner.RunWith;
8+
9+
import org.springframework.amqp.AmqpException;
10+
import org.springframework.amqp.core.Message;
11+
import org.springframework.amqp.core.MessagePostProcessor;
12+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
13+
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.springframework.test.context.junit4.SpringRunner;
16+
17+
@RunWith(SpringRunner.class)
18+
@SpringBootTest
19+
public class MyTest {
20+
21+
@Autowired
22+
private TestSendMessage amqpTemplate;
23+
@Autowired
24+
private RabbitTemplate amqpTemplate2;
25+
26+
@Test
27+
public void sendMsg() {
28+
amqpTemplate.sendMsg(QueueEnum.QUEUE_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_CANCEL.getName(), "你好啊不错哦");
29+
}
30+
31+
@Test
32+
public void sendMsg2() {
33+
System.out.println(System.currentTimeMillis());
34+
amqpTemplate2.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), "我是延迟消息你能发现不50000", message -> {
35+
message.getMessageProperties().setExpiration("500000");
36+
return message;
37+
});
38+
}
39+
40+
41+
}

0 commit comments

Comments
 (0)