Skip to content

Commit b391594

Browse files
authored
Merge pull request apache#220 from zhouxinyu/ROCKETMQ-332
[ROCKETMQ-332] MappedFileQueue#findMappedFileByOffset is not thread safe, which will cause message loss.
2 parents a096580 + 9151a32 commit b391594

File tree

2 files changed

+46
-15
lines changed

2 files changed

+46
-15
lines changed

store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -461,26 +461,39 @@ public boolean commit(final int commitLeastPages) {
461461
*/
462462
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
463463
try {
464-
MappedFile mappedFile = this.getFirstMappedFile();
465-
if (mappedFile != null) {
466-
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
467-
if (index < 0 || index >= this.mappedFiles.size()) {
468-
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
469-
"mappedFileSize: {}, mappedFiles count: {}",
470-
mappedFile,
464+
MappedFile firstMappedFile = this.getFirstMappedFile();
465+
MappedFile lastMappedFile = this.getLastMappedFile();
466+
if (firstMappedFile != null && lastMappedFile != null) {
467+
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
468+
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
471469
offset,
472-
index,
470+
firstMappedFile.getFileFromOffset(),
471+
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
473472
this.mappedFileSize,
474473
this.mappedFiles.size());
475-
}
474+
} else {
475+
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
476+
MappedFile targetFile = null;
477+
try {
478+
targetFile = this.mappedFiles.get(index);
479+
} catch (Exception ignored) {
480+
}
476481

477-
try {
478-
return this.mappedFiles.get(index);
479-
} catch (Exception e) {
480-
if (returnFirstOnNotFound) {
481-
return mappedFile;
482+
if (targetFile != null && offset >= targetFile.getFileFromOffset()
483+
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
484+
return targetFile;
482485
}
483-
LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
486+
487+
for (MappedFile tmpMappedFile : this.mappedFiles) {
488+
if (offset >= tmpMappedFile.getFileFromOffset()
489+
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
490+
return tmpMappedFile;
491+
}
492+
}
493+
}
494+
495+
if (returnFirstOnNotFound) {
496+
return firstMappedFile;
484497
}
485498
}
486499
} catch (Exception e) {

store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,24 @@ public void testDeleteExpiredFileByTime() throws Exception {
229229
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
230230
}
231231

232+
@Test
233+
public void testFindMappedFile_ByIteration() {
234+
MappedFileQueue mappedFileQueue =
235+
new MappedFileQueue("target/unit_test_store/g/", 1024, null);
236+
for (int i =0 ; i < 3; i++) {
237+
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024 * i);
238+
mappedFile.wrotePosition.set(1024);
239+
}
240+
241+
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
242+
243+
// Switch two MappedFiles and verify findMappedFileByOffset method
244+
MappedFile tmpFile = mappedFileQueue.getMappedFiles().get(1);
245+
mappedFileQueue.getMappedFiles().set(1, mappedFileQueue.getMappedFiles().get(2));
246+
mappedFileQueue.getMappedFiles().set(2, tmpFile);
247+
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
248+
}
249+
232250
@After
233251
public void destory() {
234252
File file = new File("target/unit_test_store");

0 commit comments

Comments
 (0)