Skip to content

Commit 812a9ca

Browse files
committed
Fix PSYNC2 bug.
1 parent 697f331 commit 812a9ca

File tree

2 files changed

+50
-24
lines changed

2 files changed

+50
-24
lines changed

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/Configuration.java

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public static Configuration defaultSetting() {
132132
*/
133133
private String replId = "?";
134134

135+
/**
136+
* psync2 repl_stream_db
137+
*/
138+
private int replStreamDB = -1;
139+
135140
/**
136141
* psync offset
137142
*/
@@ -218,6 +223,15 @@ public Configuration setReplId(String replId) {
218223
return this;
219224
}
220225

226+
public int getReplStreamDB() {
227+
return replStreamDB;
228+
}
229+
230+
public Configuration setReplStreamDB(int replStreamDB) {
231+
this.replStreamDB = replStreamDB;
232+
return this;
233+
}
234+
221235
public long getReplOffset() {
222236
return replOffset.get();
223237
}
@@ -326,86 +340,89 @@ public static Configuration valueOf(RedisURI uri) {
326340
Configuration configuration = defaultSetting();
327341
Map<String, String> parameters = uri.parameters;
328342
if (parameters.containsKey("connectionTimeout")) {
329-
configuration.setConnectionTimeout(getInt(parameters.get("connectionTimeout")));
343+
configuration.setConnectionTimeout(getInt(parameters.get("connectionTimeout"), 30000));
330344
}
331345
if (parameters.containsKey("readTimeout")) {
332-
configuration.setReadTimeout(getInt(parameters.get("readTimeout")));
346+
configuration.setReadTimeout(getInt(parameters.get("readTimeout"), 30000));
333347
}
334348
if (parameters.containsKey("receiveBufferSize")) {
335-
configuration.setReceiveBufferSize(getInt(parameters.get("receiveBufferSize")));
349+
configuration.setReceiveBufferSize(getInt(parameters.get("receiveBufferSize"), 0));
336350
}
337351
if (parameters.containsKey("sendBufferSize")) {
338-
configuration.setSendBufferSize(getInt(parameters.get("sendBufferSize")));
352+
configuration.setSendBufferSize(getInt(parameters.get("sendBufferSize"), 0));
339353
}
340354
if (parameters.containsKey("retries")) {
341-
configuration.setRetries(getInt(parameters.get("retries")));
355+
configuration.setRetries(getInt(parameters.get("retries"), 5));
342356
}
343357
if (parameters.containsKey("retryTimeInterval")) {
344-
configuration.setRetryTimeInterval(getInt(parameters.get("retryTimeInterval")));
358+
configuration.setRetryTimeInterval(getInt(parameters.get("retryTimeInterval"), 1000));
345359
}
346360
if (parameters.containsKey("bufferSize")) {
347-
configuration.setBufferSize(getInt(parameters.get("bufferSize")));
361+
configuration.setBufferSize(getInt(parameters.get("bufferSize"), 8 * 1024));
348362
}
349363
if (parameters.containsKey("authPassword")) {
350364
configuration.setAuthPassword(parameters.get("authPassword"));
351365
}
352366
if (parameters.containsKey("discardRdbEvent")) {
353-
configuration.setDiscardRdbEvent(getBool(parameters.get("discardRdbEvent")));
367+
configuration.setDiscardRdbEvent(getBool(parameters.get("discardRdbEvent"), false));
354368
}
355369
if (parameters.containsKey("asyncCachedBytes")) {
356-
configuration.setAsyncCachedBytes(getInt(parameters.get("asyncCachedBytes")));
370+
configuration.setAsyncCachedBytes(getInt(parameters.get("asyncCachedBytes"), 512 * 1024));
357371
}
358372
if (parameters.containsKey("rateLimit")) {
359-
configuration.setRateLimit(getInt(parameters.get("rateLimit")));
373+
configuration.setRateLimit(getInt(parameters.get("rateLimit"), 0));
360374
}
361375
if (parameters.containsKey("verbose")) {
362-
configuration.setVerbose(getBool(parameters.get("verbose")));
376+
configuration.setVerbose(getBool(parameters.get("verbose"), false));
363377
}
364378
if (parameters.containsKey("heartBeatPeriod")) {
365-
configuration.setHeartBeatPeriod(getInt(parameters.get("heartBeatPeriod")));
379+
configuration.setHeartBeatPeriod(getInt(parameters.get("heartBeatPeriod"), 1000));
366380
}
367381
if (parameters.containsKey("useDefaultExceptionListener")) {
368-
configuration.setUseDefaultExceptionListener(getBool(parameters.get("useDefaultExceptionListener")));
382+
configuration.setUseDefaultExceptionListener(getBool(parameters.get("useDefaultExceptionListener"), false));
369383
}
370384
if (parameters.containsKey("ssl")) {
371-
configuration.setSsl(getBool(parameters.get("ssl")));
385+
configuration.setSsl(getBool(parameters.get("ssl"), false));
372386
}
373387
if (parameters.containsKey("replId")) {
374388
configuration.setReplId(parameters.get("replId"));
375389
}
390+
if (parameters.containsKey("replStreamDB")) {
391+
configuration.setReplStreamDB(getInt(parameters.get("replStreamDB"), -1));
392+
}
376393
if (parameters.containsKey("replOffset")) {
377-
configuration.setReplOffset(getLong(parameters.get("replOffset")));
394+
configuration.setReplOffset(getLong(parameters.get("replOffset"), -1L));
378395
}
379396
return configuration;
380397
}
381398

382-
private static boolean getBool(String value) {
399+
private static boolean getBool(String value, boolean defaultValue) {
383400
if (value == null)
384-
return false;
401+
return defaultValue;
385402
if (value.equals("false") || value.equals("no"))
386403
return false;
387404
if (value.equals("true") || value.equals("yes"))
388405
return true;
389-
return false;
406+
return defaultValue;
390407
}
391408

392-
private static int getInt(String value) {
409+
private static int getInt(String value, int defaultValue) {
393410
if (value == null)
394-
return 0;
411+
return defaultValue;
395412
try {
396413
return Integer.parseInt(value);
397414
} catch (NumberFormatException e) {
398-
return 0;
415+
return defaultValue;
399416
}
400417
}
401418

402-
private static long getLong(String value) {
419+
private static long getLong(String value, long defaultValue) {
403420
if (value == null)
404-
return 0L;
421+
return defaultValue;
405422
try {
406423
return Long.parseLong(value);
407424
} catch (NumberFormatException e) {
408-
return 0L;
425+
return defaultValue;
409426
}
410427
}
411428

@@ -431,6 +448,7 @@ public String toString() {
431448
", sslParameters=" + sslParameters +
432449
", hostnameVerifier=" + hostnameVerifier +
433450
", replId='" + replId + '\'' +
451+
", replStreamDB=" + replStreamDB +
434452
", replOffset=" + replOffset +
435453
'}';
436454
}

rocketmq-redis/src/main/java/org/apache/rocketmq/redis/replicator/rdb/DefaultRdbVisitor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
5050

51+
import static java.lang.Integer.parseInt;
52+
import static java.lang.Long.parseLong;
5153
import static java.nio.charset.StandardCharsets.UTF_8;
5254

5355
public class DefaultRdbVisitor extends RdbVisitor {
@@ -168,6 +170,12 @@ public Event applyAux(RedisInputStream in, int version) throws IOException {
168170
if (LOGGER.isInfoEnabled()) {
169171
LOGGER.info("RDB " + auxKey + ": " + auxValue);
170172
}
173+
if (auxKey.equals("repl-id"))
174+
replicator.getConfiguration().setReplId(auxValue);
175+
if (auxKey.equals("repl-offset"))
176+
replicator.getConfiguration().setReplOffset(parseLong(auxValue));
177+
if (auxKey.equals("repl-stream-db"))
178+
replicator.getConfiguration().setReplStreamDB(parseInt(auxValue));
171179
return new AuxField(auxKey, auxValue);
172180
} else {
173181
if (LOGGER.isWarnEnabled()) {

0 commit comments

Comments
 (0)