diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 35c1d0e2d70..cc29cca5d94 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -314,6 +314,7 @@ public boolean getLastMappedFile(final long startOffset) { /** * When the normal exit, data recovery, all memory data have been flush + * * @throws RocksDBException only in rocksdb mode */ public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException { @@ -636,7 +637,8 @@ private void setBatchSizeIfNeeded(Map propertiesMap, DispatchReq public long getConfirmOffset() { if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) { if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) { - if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) { + if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1 + || !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { return this.defaultMessageStore.getMaxPhyOffset(); } // First time it will compute the confirmOffset. @@ -1214,7 +1216,7 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc } } catch (RocksDBException e) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } finally { + } finally { topicQueueLock.unlock(topicQueueKey); } @@ -1840,7 +1842,8 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { this.messageStoreConfig = messageStoreConfig; } - public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) { + public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, + final MessageExtBrokerInner msgInner) { if (msgInner.isEncodeCompleted()) { return null; } @@ -1850,10 +1853,10 @@ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 - && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; @@ -2312,7 +2315,7 @@ public boolean isDataInPageCache(final long offset) { return true; } - int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); + int pos = (int) (offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); int realIndex = pos / pageSize / sampleSteps; return bytes.length - 1 >= realIndex && bytes[realIndex] != 0; } @@ -2356,8 +2359,8 @@ private byte[] sampling(byte[] pageCacheTable, int sampleStep) { private byte[] checkFileInPageCache(MappedFile mappedFile) { long fileSize = mappedFile.getFileSize(); - final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address(); - int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize; + final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address(); + int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize; byte[] pageCacheRst = new byte[pageNums]; int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst); if (mincore != 0) { @@ -2395,7 +2398,7 @@ public boolean isMsgInColdArea(String group, String topic, int queueId, long off return false; } try { - ConsumeQueue consumeQueue = (ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId); + ConsumeQueue consumeQueue = (ConsumeQueue) defaultMessageStore.findConsumeQueue(topic, queueId); if (null == consumeQueue) { return false; } @@ -2433,7 +2436,7 @@ private int setFileReadMode(MappedFile mappedFile, int mode) { log.error("setFileReadMode mappedFile is null"); return -1; } - final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address(); + final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address(); int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode); if (madvise != 0) { log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);