Skip to content

Commit bf848c1

Browse files
authored
Check if pull message service has shutdown before scheduling pull requests (apache#277)
1 parent d035ccb commit bf848c1

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,16 @@ public PullMessageService(MQClientInstance mQClientFactory) {
4444
}
4545

4646
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
47-
this.scheduledExecutorService.schedule(new Runnable() {
48-
49-
@Override
50-
public void run() {
51-
PullMessageService.this.executePullRequestImmediately(pullRequest);
52-
}
53-
}, timeDelay, TimeUnit.MILLISECONDS);
47+
if (!isStopped()) {
48+
this.scheduledExecutorService.schedule(new Runnable() {
49+
@Override
50+
public void run() {
51+
PullMessageService.this.executePullRequestImmediately(pullRequest);
52+
}
53+
}, timeDelay, TimeUnit.MILLISECONDS);
54+
} else {
55+
log.warn("PullMessageServiceScheduledThread has shutdown");
56+
}
5457
}
5558

5659
public void executePullRequestImmediately(final PullRequest pullRequest) {
@@ -62,7 +65,11 @@ public void executePullRequestImmediately(final PullRequest pullRequest) {
6265
}
6366

6467
public void executeTaskLater(final Runnable r, final long timeDelay) {
65-
this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
68+
if (!isStopped()) {
69+
this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
70+
} else {
71+
log.warn("PullMessageServiceScheduledThread has shutdown");
72+
}
6673
}
6774

6875
public ScheduledExecutorService getScheduledExecutorService() {
@@ -86,10 +93,8 @@ public void run() {
8693
while (!this.isStopped()) {
8794
try {
8895
PullRequest pullRequest = this.pullRequestQueue.take();
89-
if (pullRequest != null) {
90-
this.pullMessage(pullRequest);
91-
}
92-
} catch (InterruptedException e) {
96+
this.pullMessage(pullRequest);
97+
} catch (InterruptedException ignored) {
9398
} catch (Exception e) {
9499
log.error("Pull Message Service Run Method exception", e);
95100
}

0 commit comments

Comments
 (0)