From 0a9bba4a09b02f3d4f6e5c85bff0cd318b03a20f Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 12 Dec 2023 17:15:47 +0800 Subject: [PATCH] [ISSUE #7585] Always return duplicate buffer when filter and fix log format --- .../rocketmq/tieredstore/common/GetMessageResultExt.java | 5 ++--- .../rocketmq/tieredstore/provider/TieredFileSegment.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java index 52462b5dc5e..2e294c1c7dc 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; public class GetMessageResultExt extends GetMessageResult { @@ -63,9 +62,9 @@ public GetMessageResult doFilterMessage(MessageFilter messageFilter) { continue; } + long offset = this.getMessageQueueOffset().get(i); result.addMessage(new SelectMappedBufferResult(bufferResult.getStartOffset(), - bufferResult.getByteBuffer(), bufferResult.getSize(), null), - MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer())); + bufferResult.getByteBuffer().asReadOnlyBuffer(), bufferResult.getSize(), null), offset); } if (result.getBufferTotalSize() == 0) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java index 5e3d8c5624f..6703de9403f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java @@ -367,7 +367,7 @@ public CompletableFuture commitAsync() { if (fileSegmentInputStream != null) { long fileSize = this.getSize(); if (fileSize == -1L) { - logger.error("Get commit position error before commit, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", + logger.error("Get commit position error before commit, Commit: {}, Expect: {}, Current Max: {}, FileName: {}", commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); releaseCommitLock(); return CompletableFuture.completedFuture(false);