Skip to content

Commit e75c892

Browse files
chenby@nextop.cnleonchen83
authored andcommitted
travis ci
1 parent e1a8862 commit e75c892

24 files changed

+792
-69
lines changed

.travis.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
language: java
2+
jdk:
3+
- oraclejdk8
4+
branches:
5+
only:
6+
- master
7+
env:
8+
- TEST_DIR=rocketmq-redis
9+
script: "cd $TEST_DIR && sh INTEGRATION-TEST"

rocketmq-redis/INTEGRATION-TEST

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/env bash
2+
set -ev
3+
export REDIS_REPLICATOR_HOME=`pwd`
4+
cd ..
5+
wget https://github.com/antirez/redis/archive/3.2.3.tar.gz
6+
tar -xvzf 3.2.3.tar.gz
7+
cd redis-3.2.3
8+
make
9+
cd src
10+
nohup ./redis-server --port 6379 &
11+
sleep 1
12+
cd $REDIS_REPLICATOR_HOME
13+
mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
14+
mvn clean install -DskipITs

rocketmq-redis/pom.xml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
<zookeeper.version>3.4.6</zookeeper.version>
8383
<curator.version>2.9.1</curator.version>
8484
<logback.version>1.1.3</logback.version>
85-
<rocketmq.client.version>4.0.0-incubating</rocketmq.client.version>
85+
<rocketmq.client.version>4.1.0-incubating</rocketmq.client.version>
8686
</properties>
8787

8888
<dependencies>
@@ -134,6 +134,12 @@
134134
<version>2.9.0</version>
135135
<scope>test</scope>
136136
</dependency>
137+
<dependency>
138+
<groupId>org.apache.rocketmq</groupId>
139+
<artifactId>rocketmq-test</artifactId>
140+
<version>4.1.0-incubating</version>
141+
<scope>test</scope>
142+
</dependency>
137143
</dependencies>
138144

139145
<build>
@@ -230,12 +236,10 @@
230236
<exclude>CHANGELOG.md</exclude>
231237
<exclude>CONTRIBUTING.md</exclude>
232238
<exclude>NOTICE-BIN</exclude>
239+
<exclude>INTEGRATION-TEST</exclude>
233240
<exclude>src/main/assembly/scripts/start.sh</exclude>
234241
<exclude>src/main/assembly/scripts/stop.sh</exclude>
235242
<exclude>src/main/resources/replicator.conf</exclude>
236-
<exclude>src/test/java/org/apache/rocketmq/redis/replicator/cmd/parser/LTrimParserTest.java</exclude>
237-
<exclude>src/test/java/org/apache/rocketmq/redis/replicator/cmd/parser/RPopLPushParserTest.java</exclude>
238-
<exclude>src/test/java/org/apache/rocketmq/redis/replicator/cmd/parser/SortParserTest.java</exclude>
239243
<exclude>src/test/resources/appendonly1.aof</exclude>
240244
<exclude>src/test/resources/appendonly2.aof</exclude>
241245
<exclude>src/test/resources/appendonly3.aof</exclude>

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/AbstractReplicator.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.io.IOException;
2121
import java.util.Map;
2222
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
2325
import org.apache.rocketmq.redis.replicator.cmd.Command;
2426
import org.apache.rocketmq.redis.replicator.cmd.CommandName;
2527
import org.apache.rocketmq.redis.replicator.cmd.CommandParser;
@@ -108,10 +110,15 @@
108110
import org.apache.rocketmq.redis.replicator.rdb.DefaultRdbVisitor;
109111
import org.apache.rocketmq.redis.replicator.rdb.datatype.Module;
110112

113+
import static org.apache.rocketmq.redis.replicator.Status.CONNECTED;
114+
import static org.apache.rocketmq.redis.replicator.Status.DISCONNECTED;
115+
import static org.apache.rocketmq.redis.replicator.Status.DISCONNECTING;
116+
111117
public abstract class AbstractReplicator extends AbstractReplicatorListener implements Replicator {
112118
protected Configuration configuration;
113119
protected volatile RedisInputStream inputStream;
114120
protected RdbVisitor rdbVisitor = new DefaultRdbVisitor(this);
121+
protected final AtomicReference<Status> connected = new AtomicReference<>(DISCONNECTED);
115122
protected final Map<ModuleKey, ModuleParser<? extends Module>> modules = new ConcurrentHashMap<>();
116123
protected final Map<CommandName, CommandParser<? extends Command>> commands = new ConcurrentHashMap<>();
117124

@@ -171,6 +178,11 @@ public boolean verbose() {
171178
return configuration != null && configuration.isVerbose();
172179
}
173180

181+
@Override
182+
public Status getStatus() {
183+
return connected.get();
184+
}
185+
174186
@Override
175187
public Configuration getConfiguration() {
176188
return configuration;
@@ -264,11 +276,16 @@ public void builtInCommandParserRegister() {
264276
}
265277

266278
protected void doClose() throws IOException {
267-
if (inputStream != null) {
268-
try {
279+
if (!this.connected.compareAndSet(CONNECTED, DISCONNECTING)) return;
280+
try {
281+
if (inputStream != null) {
269282
this.inputStream.setRawByteListeners(null);
270283
inputStream.close();
271-
} catch (IOException ignore) { /*NOP*/ }
284+
}
285+
} catch (IOException ignore) {
286+
/*NOP*/
287+
} finally {
288+
this.connected.set(DISCONNECTED);
272289
}
273290
doCloseListener(this);
274291
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/FileType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
public enum FileType {
2121
AOF, RDB, MIXED;
2222

23-
public static FileType parse(String type) {
23+
static FileType parse(String type) {
2424
if (type == null) {
2525
return null;
2626
} else if (type.equalsIgnoreCase("aof")) {

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisAofReplicator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.slf4j.LoggerFactory;
3535

3636
import static java.nio.charset.StandardCharsets.UTF_8;
37+
import static org.apache.rocketmq.redis.replicator.Status.CONNECTED;
38+
import static org.apache.rocketmq.redis.replicator.Status.DISCONNECTED;
3739

3840
public class RedisAofReplicator extends AbstractReplicator {
3941

@@ -58,6 +60,7 @@ public RedisAofReplicator(InputStream in, Configuration configuration) {
5860

5961
@Override
6062
public void open() throws IOException {
63+
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
6164
try {
6265
doOpen();
6366
} catch (EOFException ignore) {
@@ -70,7 +73,7 @@ public void open() throws IOException {
7073
}
7174

7275
protected void doOpen() throws IOException {
73-
while (true) {
76+
while (getStatus() == CONNECTED) {
7477
// got EOFException to break the loop
7578
Object obj = replyParser.parse();
7679

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisMixReplicator.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.rocketmq.redis.replicator.cmd.Command;
2828
import org.apache.rocketmq.redis.replicator.cmd.CommandName;
2929
import org.apache.rocketmq.redis.replicator.cmd.CommandParser;
30+
import org.apache.rocketmq.redis.replicator.io.PeekableInputStream;
3031
import org.apache.rocketmq.redis.replicator.io.RedisInputStream;
3132
import org.apache.rocketmq.redis.replicator.rdb.RdbParser;
3233
import org.apache.rocketmq.redis.replicator.util.Arrays;
@@ -35,10 +36,13 @@
3536
import org.slf4j.LoggerFactory;
3637

3738
import static java.nio.charset.StandardCharsets.UTF_8;
39+
import static org.apache.rocketmq.redis.replicator.Status.CONNECTED;
40+
import static org.apache.rocketmq.redis.replicator.Status.DISCONNECTED;
3841

3942
public class RedisMixReplicator extends AbstractReplicator {
4043
protected static final Logger LOGGER = LoggerFactory.getLogger(RedisAofReplicator.class);
4144
protected final ReplyParser replyParser;
45+
protected final PeekableInputStream peekable;
4246

4347
public RedisMixReplicator(File file, Configuration configuration) throws FileNotFoundException {
4448
this(new FileInputStream(file), configuration);
@@ -48,6 +52,11 @@ public RedisMixReplicator(InputStream in, Configuration configuration) {
4852
Objects.requireNonNull(in);
4953
Objects.requireNonNull(configuration);
5054
this.configuration = configuration;
55+
if (in instanceof PeekableInputStream) {
56+
this.peekable = (PeekableInputStream) in;
57+
} else {
58+
in = this.peekable = new PeekableInputStream(in);
59+
}
5160
this.inputStream = new RedisInputStream(in, this.configuration.getBufferSize());
5261
this.inputStream.setRawByteListeners(this.rawByteListeners);
5362
this.replyParser = new ReplyParser(inputStream);
@@ -58,6 +67,7 @@ public RedisMixReplicator(InputStream in, Configuration configuration) {
5867

5968
@Override
6069
public void open() throws IOException {
70+
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
6171
try {
6272
doOpen();
6373
} catch (EOFException ignore) {
@@ -70,9 +80,11 @@ public void open() throws IOException {
7080
}
7181

7282
protected void doOpen() throws IOException {
73-
RdbParser parser = new RdbParser(inputStream, this);
74-
parser.parse();
75-
while (true) {
83+
if (peekable.peek() == 'R') {
84+
RdbParser parser = new RdbParser(inputStream, this);
85+
parser.parse();
86+
}
87+
while (getStatus() == CONNECTED) {
7688
// got EOFException to break the loop
7789
Object obj = replyParser.parse();
7890
if (obj instanceof Object[]) {

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisRdbReplicator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.rocketmq.redis.replicator.io.RedisInputStream;
2828
import org.apache.rocketmq.redis.replicator.rdb.RdbParser;
2929

30+
import static org.apache.rocketmq.redis.replicator.Status.CONNECTED;
31+
import static org.apache.rocketmq.redis.replicator.Status.DISCONNECTED;
32+
3033
public class RedisRdbReplicator extends AbstractReplicator {
3134

3235
public RedisRdbReplicator(File file, Configuration configuration) throws FileNotFoundException {
@@ -45,6 +48,7 @@ public RedisRdbReplicator(InputStream in, Configuration configuration) {
4548

4649
@Override
4750
public void open() throws IOException {
51+
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
4852
try {
4953
doOpen();
5054
} catch (EOFException ignore) {

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisReplicator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ public boolean verbose() {
201201
return replicator.verbose();
202202
}
203203

204+
@Override
205+
public Status getStatus() {
206+
return replicator.getStatus();
207+
}
208+
204209
@Override
205210
public Configuration getConfiguration() {
206211
return replicator.getConfiguration();

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/RedisSocketReplicator.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Objects;
2424
import java.util.Timer;
2525
import java.util.TimerTask;
26-
import java.util.concurrent.atomic.AtomicReference;
2726
import org.apache.rocketmq.redis.replicator.cmd.OffsetHandler;
2827
import org.apache.rocketmq.redis.replicator.io.RedisInputStream;
2928
import org.apache.rocketmq.redis.replicator.net.RedisSocketFactory;
@@ -59,7 +58,6 @@ public class RedisSocketReplicator extends AbstractReplicator {
5958
protected volatile ReplyParser replyParser;
6059
protected volatile RedisOutputStream outputStream;
6160
protected final RedisSocketFactory socketFactory;
62-
protected final AtomicReference<Status> connected = new AtomicReference<>(DISCONNECTED);
6361

6462
public RedisSocketReplicator(String host, int port, Configuration configuration) {
6563
Objects.requireNonNull(host);
@@ -109,10 +107,10 @@ protected void doOpen() throws IOException {
109107

110108
SyncMode syncMode = trySync(reply);
111109
//bug fix.
112-
if (syncMode == SyncMode.PSYNC && connected.get() == CONNECTED) {
110+
if (syncMode == SyncMode.PSYNC && getStatus() == CONNECTED) {
113111
//heartbeat send REPLCONF ACK ${slave offset}
114112
heartbeat();
115-
} else if (syncMode == SyncMode.SYNC_LATER && connected.get() == CONNECTED) {
113+
} else if (syncMode == SyncMode.SYNC_LATER && getStatus() == CONNECTED) {
116114
//sync later
117115
i = 0;
118116
close();
@@ -125,7 +123,7 @@ protected void doOpen() throws IOException {
125123
}
126124
//sync command
127125
final long[] offset = new long[1];
128-
while (connected.get() == CONNECTED) {
126+
while (getStatus() == CONNECTED) {
129127
Object obj = replyParser.parse(new OffsetHandler() {
130128
@Override
131129
public void handle(long len) {
@@ -163,7 +161,7 @@ public void handle(long len) {
163161
break;
164162
} catch (IOException | UncheckedIOException e) {
165163
//close socket manual
166-
if (connected.get() != CONNECTED)
164+
if (getStatus() != CONNECTED)
167165
break;
168166
LOGGER.error("[redis-replicator] socket error", e);
169167
//connect refused,connect timeout,read timeout,connect abort,server disconnect,connection EOFException
@@ -402,13 +400,6 @@ protected void connect() throws IOException {
402400
}
403401
}
404402

405-
/**
406-
* @return connection status
407-
*/
408-
public Status getStatus() {
409-
return connected.get();
410-
}
411-
412403
@Override
413404
public void close() {
414405
if (!connected.compareAndSet(CONNECTED, DISCONNECTING))

0 commit comments

Comments
 (0)