Skip to content

Commit dba2621

Browse files
committed
[ISSUE #8438] Fix broker return two messages when query message and index service bug
1 parent 6e6319f commit dba2621

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

+3
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,9 @@ public synchronized void shutdown() {
460460
if (flatFileStore != null) {
461461
flatFileStore.shutdown();
462462
}
463+
if (indexService != null) {
464+
indexService.shutdown();
465+
}
463466
if (storeExecutor != null) {
464467
storeExecutor.shutdown();
465468
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public void recoverFileSize() {
9090
public void initOffset(long offset) {
9191
if (this.fileSegmentTable.isEmpty()) {
9292
FileSegment fileSegment = fileSegmentFactory.createSegment(fileType, filePath, offset);
93+
fileSegment.initPosition(fileSegment.getSize());
9394
this.flushFileSegmentMeta(fileSegment);
9495
this.fileSegmentTable.add(fileSegment);
9596
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,12 @@ protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
287287
buffer.position(this.getItemPosition(slotValue));
288288
buffer.get(bytes);
289289
IndexItem indexItem = new IndexItem(bytes);
290+
long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
290291
if (hashCode == indexItem.getHashCode()) {
291-
result.add(indexItem);
292+
if (hashCode == indexItem.getHashCode() &&
293+
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
294+
result.add(indexItem);
295+
}
292296
if (result.size() > maxCount) {
293297
break;
294298
}

0 commit comments

Comments
 (0)