Skip to content

Commit 04e97de

Browse files
committed
[ROCKETMQ-332] Fix concurrent bug in MappedFileQueue#findMappedFileByOffset, which may cause message loss
1 parent d849e0a commit 04e97de

File tree

2 files changed

+43
-13
lines changed

2 files changed

+43
-13
lines changed

store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,26 +461,38 @@ public boolean commit(final int commitLeastPages) {
461461
*/
462462
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
463463
try {
464-
MappedFile mappedFile = this.getFirstMappedFile();
465-
if (mappedFile != null) {
466-
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
464+
MappedFile firstMappedFile = this.getFirstMappedFile();
465+
if (firstMappedFile != null) {
466+
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
467467
if (index < 0 || index >= this.mappedFiles.size()) {
468-
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
469-
"mappedFileSize: {}, mappedFiles count: {}",
470-
mappedFile,
468+
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}",
469+
firstMappedFile,
471470
offset,
472471
index,
473472
this.mappedFileSize,
474473
this.mappedFiles.size());
475-
}
474+
} else {
475+
MappedFile targetFile = null;
476+
try {
477+
targetFile = this.mappedFiles.get(index);
478+
} catch (Exception ignored) {
479+
}
476480

477-
try {
478-
return this.mappedFiles.get(index);
479-
} catch (Exception e) {
480-
if (returnFirstOnNotFound) {
481-
return mappedFile;
481+
if (targetFile != null && offset >= targetFile.getFileFromOffset()
482+
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
483+
return targetFile;
482484
}
483-
LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
485+
486+
for (MappedFile tmpMappedFile : this.mappedFiles) {
487+
if (offset >= tmpMappedFile.getFileFromOffset()
488+
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
489+
return tmpMappedFile;
490+
}
491+
}
492+
}
493+
494+
if (returnFirstOnNotFound) {
495+
return firstMappedFile;
484496
}
485497
}
486498
} catch (Exception e) {

store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,24 @@ public void testDeleteExpiredFileByTime() throws Exception {
229229
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
230230
}
231231

232+
@Test
233+
public void testFindMappedFile_ByIteration() {
234+
MappedFileQueue mappedFileQueue =
235+
new MappedFileQueue("target/unit_test_store/g/", 1024, null);
236+
for (int i =0 ; i < 3; i++) {
237+
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024 * i);
238+
mappedFile.wrotePosition.set(1024);
239+
}
240+
241+
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
242+
243+
// Switch two MappedFiles and verify findMappedFileByOffset method
244+
MappedFile tmpFile = mappedFileQueue.getMappedFiles().get(1);
245+
mappedFileQueue.getMappedFiles().set(1, mappedFileQueue.getMappedFiles().get(2));
246+
mappedFileQueue.getMappedFiles().set(2, tmpFile);
247+
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
248+
}
249+
232250
@After
233251
public void destory() {
234252
File file = new File("target/unit_test_store");

0 commit comments

Comments
 (0)