Skip to content

Commit d0f7cf4

Browse files
authored
Merge pull request eugenp#8035 from CROSP/BAEL-3200
BAEL-3200 Error handling with Spring AMQP
2 parents fdbadf9 + 6436440 commit d0f7cf4

17 files changed

+629
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.baeldung.springamqp.errorhandling;
2+
3+
import com.baeldung.springamqp.errorhandling.producer.MessageProducer;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.boot.context.event.ApplicationReadyEvent;
8+
import org.springframework.context.event.EventListener;
9+
import org.springframework.scheduling.annotation.EnableScheduling;
10+
11+
@SpringBootApplication
12+
@EnableScheduling
13+
public class ErrorHandlingApp {
14+
15+
@Autowired
16+
MessageProducer messageProducer;
17+
18+
public static void main(String[] args) {
19+
SpringApplication.run(ErrorHandlingApp.class, args);
20+
}
21+
22+
@EventListener(ApplicationReadyEvent.class)
23+
public void doSomethingAfterStartup() {
24+
messageProducer.sendMessage();
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
16+
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "dlx-custom")
21+
public class DLXCustomAmqpConfiguration {
22+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
23+
24+
@Bean
25+
Queue messagesQueue() {
26+
return QueueBuilder.durable(QUEUE_MESSAGES)
27+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
28+
.build();
29+
}
30+
31+
@Bean
32+
FanoutExchange deadLetterExchange() {
33+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
34+
}
35+
36+
@Bean
37+
Queue deadLetterQueue() {
38+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
39+
}
40+
41+
@Bean
42+
Binding deadLetterBinding() {
43+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
44+
}
45+
46+
@Bean
47+
DirectExchange messagesExchange() {
48+
return new DirectExchange(EXCHANGE_MESSAGES);
49+
}
50+
51+
@Bean
52+
Binding bindingMessages() {
53+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
16+
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "parking-lot-dlx")
21+
public class DLXParkingLotAmqpConfiguration {
22+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
23+
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
24+
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
25+
26+
@Bean
27+
FanoutExchange parkingLotExchange() {
28+
return new FanoutExchange(EXCHANGE_PARKING_LOT);
29+
}
30+
31+
@Bean
32+
Queue parkingLotQueue() {
33+
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
34+
}
35+
36+
@Bean
37+
Binding parkingLotBinding() {
38+
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
39+
}
40+
41+
@Bean
42+
Queue messagesQueue() {
43+
return QueueBuilder.durable(QUEUE_MESSAGES)
44+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
45+
.build();
46+
}
47+
48+
@Bean
49+
FanoutExchange deadLetterExchange() {
50+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
51+
}
52+
53+
@Bean
54+
Queue deadLetterQueue() {
55+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
56+
}
57+
58+
@Bean
59+
Binding deadLetterBinding() {
60+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
61+
}
62+
63+
@Bean
64+
DirectExchange messagesExchange() {
65+
return new DirectExchange(EXCHANGE_MESSAGES);
66+
}
67+
68+
@Bean
69+
Binding bindingMessages() {
70+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
10+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11+
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
12+
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
13+
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
14+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.util.ErrorHandler;
18+
19+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
20+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
21+
22+
@Configuration
23+
@ConditionalOnProperty(
24+
value = "amqp.configuration.current",
25+
havingValue = "fatal-error-strategy")
26+
public class FatalExceptionStrategyAmqpConfiguration {
27+
28+
@Bean
29+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
30+
ConnectionFactory connectionFactory,
31+
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
32+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
33+
configurer.configure(factory, connectionFactory);
34+
factory.setErrorHandler(errorHandler());
35+
return factory;
36+
}
37+
38+
@Bean
39+
public ErrorHandler errorHandler() {
40+
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
41+
}
42+
43+
@Bean
44+
FatalExceptionStrategy customExceptionStrategy() {
45+
return new CustomFatalExceptionStrategy();
46+
}
47+
48+
@Bean
49+
Queue messagesQueue() {
50+
return QueueBuilder.durable(QUEUE_MESSAGES)
51+
.build();
52+
}
53+
54+
@Bean
55+
DirectExchange messagesExchange() {
56+
return new DirectExchange(EXCHANGE_MESSAGES);
57+
}
58+
59+
@Bean
60+
Binding bindingMessages() {
61+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
10+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11+
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
12+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
13+
import org.springframework.context.annotation.Bean;
14+
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.util.ErrorHandler;
16+
17+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
18+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
19+
20+
@Configuration
21+
@ConditionalOnProperty(
22+
value = "amqp.configuration.current",
23+
havingValue = "listener-error")
24+
public class ListenerErrorHandlerAmqpConfiguration {
25+
26+
@Bean
27+
public ErrorHandler errorHandler() {
28+
return new CustomErrorHandler();
29+
}
30+
31+
@Bean
32+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
33+
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
34+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
35+
configurer.configure(factory, connectionFactory);
36+
factory.setErrorHandler(errorHandler());
37+
return factory;
38+
}
39+
40+
@Bean
41+
Queue messagesQueue() {
42+
return QueueBuilder.durable(QUEUE_MESSAGES)
43+
.build();
44+
}
45+
46+
@Bean
47+
DirectExchange messagesExchange() {
48+
return new DirectExchange(EXCHANGE_MESSAGES);
49+
}
50+
51+
@Bean
52+
Binding bindingMessages() {
53+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
16+
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "routing-dlq")
21+
public class RoutingKeyDLQAmqpConfiguration {
22+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
23+
24+
@Bean
25+
Queue messagesQueue() {
26+
return QueueBuilder.durable(QUEUE_MESSAGES)
27+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
28+
.build();
29+
}
30+
31+
@Bean
32+
FanoutExchange deadLetterExchange() {
33+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
34+
}
35+
36+
@Bean
37+
Queue deadLetterQueue() {
38+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
39+
}
40+
41+
@Bean
42+
Binding deadLetterBinding() {
43+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
44+
}
45+
46+
@Bean
47+
DirectExchange messagesExchange() {
48+
return new DirectExchange(EXCHANGE_MESSAGES);
49+
}
50+
51+
@Bean
52+
Binding bindingMessages() {
53+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.Queue;
7+
import org.springframework.amqp.core.QueueBuilder;
8+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
12+
@Configuration
13+
@ConditionalOnProperty(
14+
value = "amqp.configuration.current",
15+
havingValue = "simple-dlq")
16+
public class SimpleDLQAmqpConfiguration {
17+
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
18+
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
19+
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";
20+
21+
@Bean
22+
Queue messagesQueue() {
23+
return QueueBuilder.durable(QUEUE_MESSAGES)
24+
.withArgument("x-dead-letter-exchange", "")
25+
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
26+
.build();
27+
}
28+
29+
@Bean
30+
Queue deadLetterQueue() {
31+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
32+
}
33+
34+
@Bean
35+
DirectExchange messagesExchange() {
36+
return new DirectExchange(EXCHANGE_MESSAGES);
37+
}
38+
39+
@Bean
40+
Binding bindingMessages() {
41+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
42+
}
43+
}

0 commit comments

Comments
 (0)