Skip to content

Commit b153a48

Browse files
authored
Merge pull request apache#577 from ascrutae/fix/disruptor-exception-handle
add exception handler for disruptorQueue
2 parents 8472f5c + 99c11b5 commit b153a48

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/disruptor/DisruptorQueueCreator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818

1919
package org.skywalking.apm.collector.queue.disruptor;
2020

21+
import com.lmax.disruptor.ExceptionHandler;
2122
import com.lmax.disruptor.RingBuffer;
2223
import com.lmax.disruptor.dsl.Disruptor;
2324
import org.skywalking.apm.collector.core.queue.DaemonThreadFactory;
2425
import org.skywalking.apm.collector.core.queue.MessageHolder;
2526
import org.skywalking.apm.collector.core.queue.QueueCreator;
2627
import org.skywalking.apm.collector.core.queue.QueueEventHandler;
2728
import org.skywalking.apm.collector.core.queue.QueueExecutor;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
/**
3033
* @author peng-yongsheng
3134
*/
3235
public class DisruptorQueueCreator implements QueueCreator {
3336

37+
private final Logger logger = LoggerFactory.getLogger(DisruptorQueueCreator.class);
38+
3439
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
3540
// Specify the size of the ring buffer, must be power of 2.
3641
if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
@@ -40,6 +45,20 @@ public class DisruptorQueueCreator implements QueueCreator {
4045
// Construct the Disruptor
4146
Disruptor<MessageHolder> disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);
4247

48+
disruptor.setDefaultExceptionHandler(new ExceptionHandler<MessageHolder>() {
49+
@Override public void handleEventException(Throwable ex, long sequence, MessageHolder event) {
50+
logger.error("handler message error! message: {}.", event.getMessage(), ex);
51+
}
52+
53+
@Override public void handleOnStartException(Throwable ex) {
54+
logger.error("create disruptor failed!", ex);
55+
}
56+
57+
@Override public void handleOnShutdownException(Throwable ex) {
58+
logger.error("shutdown disruptor failed!", ex);
59+
}
60+
});
61+
4362
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
4463
DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);
4564

0 commit comments

Comments
 (0)