Skip to content

Commit 578bc49

Browse files
committed
fix multi thread close bug
1 parent 67b4ea8 commit 578bc49

File tree

5 files changed

+17
-28
lines changed

5 files changed

+17
-28
lines changed

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/AbstractReplicator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,13 @@ public void builtInCommandParserRegister() {
275275
addCommandParser(CommandName.name("RPOPLPUSH"), new RPopLPushParser());
276276
}
277277

278+
@Override
279+
public void close() throws IOException {
280+
this.connected.compareAndSet(CONNECTED, DISCONNECTING);
281+
}
282+
278283
protected void doClose() throws IOException {
279-
if (!this.connected.compareAndSet(CONNECTED, DISCONNECTING)) return;
284+
this.connected.compareAndSet(CONNECTED, DISCONNECTING);
280285
try {
281286
if (inputStream != null) {
282287
this.inputStream.setRawByteListeners(null);
@@ -287,6 +292,5 @@ protected void doClose() throws IOException {
287292
} finally {
288293
this.connected.set(DISCONNECTED);
289294
}
290-
doCloseListener(this);
291295
}
292296
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisAofReplicator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void open() throws IOException {
6868
if (!(e.getCause() instanceof EOFException))
6969
throw e.getCause();
7070
} finally {
71-
close();
71+
doClose();
72+
doCloseListener(this);
7273
}
7374
}
7475

@@ -97,10 +98,4 @@ protected void doOpen() throws IOException {
9798
}
9899
}
99100
}
100-
101-
@Override
102-
public void close() throws IOException {
103-
doClose();
104-
}
105-
106101
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisMixReplicator.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public void open() throws IOException {
7575
if (!(e.getCause() instanceof EOFException))
7676
throw e.getCause();
7777
} finally {
78-
close();
78+
doClose();
79+
doCloseListener(this);
7980
}
8081
}
8182

@@ -107,9 +108,4 @@ protected void doOpen() throws IOException {
107108
}
108109
}
109110
}
110-
111-
@Override
112-
public void close() throws IOException {
113-
doClose();
114-
}
115111
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisRdbReplicator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,13 @@ public void open() throws IOException {
5656
if (!(e.getCause() instanceof EOFException))
5757
throw e.getCause();
5858
} finally {
59-
close();
59+
doClose();
60+
doCloseListener(this);
6061
}
6162
}
6263

6364
protected void doOpen() throws IOException {
6465
RdbParser parser = new RdbParser(inputStream, this);
6566
parser.parse();
6667
}
67-
68-
@Override
69-
public void close() throws IOException {
70-
doClose();
71-
}
72-
7368
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisSocketReplicator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void open() throws IOException {
8383
try {
8484
doOpen();
8585
} finally {
86-
close();
86+
doClose();
8787
doCloseListener(this);
8888
}
8989
}
@@ -112,7 +112,7 @@ protected void doOpen() throws IOException {
112112
heartbeat();
113113
} else if (syncMode == SyncMode.SYNC_LATER && getStatus() == CONNECTED) {
114114
i = 0;
115-
close();
115+
doClose();
116116
try {
117117
Thread.sleep(configuration.getRetryTimeInterval());
118118
} catch (InterruptedException interrupt) {
@@ -167,7 +167,7 @@ public void handle(long len) {
167167
exception = (IOException) e;
168168
}
169169
LOGGER.error("[redis-replicator] socket error", exception);
170-
close();
170+
doClose();
171171
//retry psync in next loop.
172172
if (LOGGER.isInfoEnabled()) {
173173
LOGGER.info("reconnect to redis-server. retry times:" + (i + 1));
@@ -404,9 +404,8 @@ protected void connect() throws IOException {
404404
}
405405

406406
@Override
407-
public void close() {
408-
if (!connected.compareAndSet(CONNECTED, DISCONNECTING))
409-
return;
407+
protected void doClose() throws IOException {
408+
connected.compareAndSet(CONNECTED, DISCONNECTING);
410409

411410
try {
412411
synchronized (this) {

0 commit comments

Comments
 (0)