18
18
19
19
package org .skywalking .apm .collector .queue .disruptor ;
20
20
21
+ import com .lmax .disruptor .ExceptionHandler ;
21
22
import com .lmax .disruptor .RingBuffer ;
22
23
import com .lmax .disruptor .dsl .Disruptor ;
23
24
import org .skywalking .apm .collector .core .queue .DaemonThreadFactory ;
24
25
import org .skywalking .apm .collector .core .queue .MessageHolder ;
25
26
import org .skywalking .apm .collector .core .queue .QueueCreator ;
26
27
import org .skywalking .apm .collector .core .queue .QueueEventHandler ;
27
28
import org .skywalking .apm .collector .core .queue .QueueExecutor ;
29
+ import org .slf4j .Logger ;
30
+ import org .slf4j .LoggerFactory ;
28
31
29
32
/**
30
33
* @author peng-yongsheng
31
34
*/
32
35
public class DisruptorQueueCreator implements QueueCreator {
33
36
37
+ private final Logger logger = LoggerFactory .getLogger (DisruptorQueueCreator .class );
38
+
34
39
@ Override public QueueEventHandler create (int queueSize , QueueExecutor executor ) {
35
40
// Specify the size of the ring buffer, must be power of 2.
36
41
if (!((((queueSize - 1 ) & queueSize ) == 0 ) && queueSize != 0 )) {
@@ -40,6 +45,20 @@ public class DisruptorQueueCreator implements QueueCreator {
40
45
// Construct the Disruptor
41
46
Disruptor <MessageHolder > disruptor = new Disruptor (MessageHolderFactory .INSTANCE , queueSize , DaemonThreadFactory .INSTANCE );
42
47
48
+ disruptor .setDefaultExceptionHandler (new ExceptionHandler <MessageHolder >() {
49
+ @ Override public void handleEventException (Throwable ex , long sequence , MessageHolder event ) {
50
+ logger .error ("handler message error! message: {}." , event .getMessage (), ex );
51
+ }
52
+
53
+ @ Override public void handleOnStartException (Throwable ex ) {
54
+ logger .error ("create disruptor failed!" , ex );
55
+ }
56
+
57
+ @ Override public void handleOnShutdownException (Throwable ex ) {
58
+ logger .error ("shutdown disruptor failed!" , ex );
59
+ }
60
+ });
61
+
43
62
RingBuffer <MessageHolder > ringBuffer = disruptor .getRingBuffer ();
44
63
DisruptorEventHandler eventHandler = new DisruptorEventHandler (ringBuffer , executor );
45
64
0 commit comments