@@ -44,13 +44,16 @@ public PullMessageService(MQClientInstance mQClientFactory) {
44
44
}
45
45
46
46
public void executePullRequestLater (final PullRequest pullRequest , final long timeDelay ) {
47
- this .scheduledExecutorService .schedule (new Runnable () {
48
-
49
- @ Override
50
- public void run () {
51
- PullMessageService .this .executePullRequestImmediately (pullRequest );
52
- }
53
- }, timeDelay , TimeUnit .MILLISECONDS );
47
+ if (!isStopped ()) {
48
+ this .scheduledExecutorService .schedule (new Runnable () {
49
+ @ Override
50
+ public void run () {
51
+ PullMessageService .this .executePullRequestImmediately (pullRequest );
52
+ }
53
+ }, timeDelay , TimeUnit .MILLISECONDS );
54
+ } else {
55
+ log .warn ("PullMessageServiceScheduledThread has shutdown" );
56
+ }
54
57
}
55
58
56
59
public void executePullRequestImmediately (final PullRequest pullRequest ) {
@@ -62,7 +65,11 @@ public void executePullRequestImmediately(final PullRequest pullRequest) {
62
65
}
63
66
64
67
public void executeTaskLater (final Runnable r , final long timeDelay ) {
65
- this .scheduledExecutorService .schedule (r , timeDelay , TimeUnit .MILLISECONDS );
68
+ if (!isStopped ()) {
69
+ this .scheduledExecutorService .schedule (r , timeDelay , TimeUnit .MILLISECONDS );
70
+ } else {
71
+ log .warn ("PullMessageServiceScheduledThread has shutdown" );
72
+ }
66
73
}
67
74
68
75
public ScheduledExecutorService getScheduledExecutorService () {
@@ -86,10 +93,8 @@ public void run() {
86
93
while (!this .isStopped ()) {
87
94
try {
88
95
PullRequest pullRequest = this .pullRequestQueue .take ();
89
- if (pullRequest != null ) {
90
- this .pullMessage (pullRequest );
91
- }
92
- } catch (InterruptedException e ) {
96
+ this .pullMessage (pullRequest );
97
+ } catch (InterruptedException ignored ) {
93
98
} catch (Exception e ) {
94
99
log .error ("Pull Message Service Run Method exception" , e );
95
100
}
0 commit comments