Skip to content

Commit 1b42515

Browse files
yuz10yuzhou
and
yuzhou
authored
[ISSUE #8129] Support topic reserved time in tiered storage (#8130)
Co-authored-by: yuzhou <yuzhou4@huawei.com>
1 parent 94bb64f commit 1b42515

File tree

7 files changed

+56
-6
lines changed

7 files changed

+56
-6
lines changed

broker/BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ java_library(
2929
"//remoting",
3030
"//srvutil",
3131
"//store",
32+
"//tieredstore",
3233
"@maven//:ch_qos_logback_logback_classic",
3334
"@maven//:com_alibaba_fastjson",
3435
"@maven//:com_alibaba_fastjson2_fastjson2",
@@ -81,6 +82,7 @@ java_library(
8182
"//filter",
8283
"//remoting",
8384
"//store",
85+
"//tieredstore",
8486
"@maven//:com_alibaba_fastjson",
8587
"@maven//:com_alibaba_fastjson2_fastjson2",
8688
"@maven//:com_google_guava_guava",

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

+31
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
5252
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
5353
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
54+
import org.apache.rocketmq.tieredstore.TieredMessageStore;
55+
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
56+
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
5457

5558
import static com.google.common.base.Preconditions.checkNotNull;
5659

@@ -501,6 +504,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
501504
ImmutableMap.copyOf(newAttributes));
502505

503506
topicConfig.setAttributes(finalAttributes);
507+
updateTieredStoreTopicMetadata(topicConfig, newAttributes);
504508

505509
TopicConfig old = putTopicConfig(topicConfig);
506510
if (old != null) {
@@ -515,6 +519,33 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
515519
this.persist(topicConfig.getTopicName(), topicConfig);
516520
}
517521

522+
private synchronized void updateTieredStoreTopicMetadata(final TopicConfig topicConfig, Map<String, String> newAttributes) {
523+
if (!(brokerController.getMessageStore() instanceof TieredMessageStore)) {
524+
if (newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) != null) {
525+
throw new IllegalArgumentException("Update topic reserveTime not supported");
526+
}
527+
return;
528+
}
529+
530+
String topic = topicConfig.getTopicName();
531+
long reserveTime = TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getDefaultValue();
532+
String attr = topicConfig.getAttributes().get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName());
533+
if (attr != null) {
534+
reserveTime = Long.parseLong(attr);
535+
}
536+
537+
log.info("Update tiered storage metadata, topic {}, reserveTime {}", topic, reserveTime);
538+
TieredMessageStore tieredMessageStore = (TieredMessageStore) brokerController.getMessageStore();
539+
MetadataStore metadataStore = tieredMessageStore.getMetadataStore();
540+
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
541+
if (topicMetadata == null) {
542+
metadataStore.addTopic(topic, reserveTime);
543+
} else if (topicMetadata.getReserveTime() != reserveTime) {
544+
topicMetadata.setReserveTime(reserveTime);
545+
metadataStore.updateTopic(topicMetadata);
546+
}
547+
}
548+
518549
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
519550

520551
if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {

common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java

+9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import org.apache.rocketmq.common.attribute.Attribute;
2222
import org.apache.rocketmq.common.attribute.EnumAttribute;
23+
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
2324
import org.apache.rocketmq.common.attribute.TopicMessageType;
2425

2526
import static com.google.common.collect.Sets.newHashSet;
@@ -43,6 +44,13 @@ public class TopicAttributes {
4344
TopicMessageType.topicMessageTypeSet(),
4445
TopicMessageType.NORMAL.getValue()
4546
);
47+
public static final LongRangeAttribute TOPIC_RESERVE_TIME_ATTRIBUTE = new LongRangeAttribute(
48+
"reserve.time",
49+
true,
50+
-1,
51+
Long.MAX_VALUE,
52+
-1
53+
);
4654

4755
public static final Map<String, Attribute> ALL;
4856

@@ -51,5 +59,6 @@ public class TopicAttributes {
5159
ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
5260
ALL.put(CLEANUP_POLICY_ATTRIBUTE.getName(), CLEANUP_POLICY_ATTRIBUTE);
5361
ALL.put(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(), TOPIC_MESSAGE_TYPE_ATTRIBUTE);
62+
ALL.put(TOPIC_RESERVE_TIME_ATTRIBUTE.getName(), TOPIC_RESERVE_TIME_ATTRIBUTE);
5463
}
5564
}

tieredstore/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa
5757

5858
## How to contribute
5959

60-
We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
60+
We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
6161

62-
1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface.
62+
1. Extend [FileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java) and implement the methods of [FileSegmentProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentProvider.java) interface.
6363
2. Record metrics where appropriate. See `rocketmq_tiered_store_provider_rpc_latency`, `rocketmq_tiered_store_provider_upload_bytes`, and `rocketmq_tiered_store_provider_download_bytes`
6464
3. No need to maintain your own cache and avoid polluting the page cache. It is already having the read-ahead cache.

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ public boolean load() {
6060
this.flatFileConcurrentMap.clear();
6161
this.recover();
6262
this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
63-
long expiredTimeStamp = System.currentTimeMillis() -
64-
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
6563
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
64+
long expiredTimeStamp = System.currentTimeMillis() -
65+
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours());
6666
flatFile.destroyExpiredFile(expiredTimeStamp);
6767
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
6868
this.destroyFile(flatFile.getMessageQueue());

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java

+7
Original file line numberDiff line numberDiff line change
@@ -399,4 +399,11 @@ public void destroy() {
399399
fileLock.unlock();
400400
}
401401
}
402+
403+
public long getFileReservedHours() {
404+
if (topicMetadata.getReserveTime() > 0) {
405+
return topicMetadata.getReserveTime();
406+
}
407+
return storeConfig.getTieredStoreFileReservedTime();
408+
}
402409
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.concurrent.TimeUnit;
3536
import java.util.function.Supplier;
3637
import org.apache.rocketmq.common.Pair;
3738
import org.apache.rocketmq.common.message.MessageQueue;
@@ -180,7 +181,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
180181
MessageQueue mq = flatFile.getMessageQueue();
181182
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
182183
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
183-
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) {
184+
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
184185
continue;
185186
}
186187

@@ -209,7 +210,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
209210
MessageQueue mq = flatFile.getMessageQueue();
210211
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
211212
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
212-
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) {
213+
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
213214
continue;
214215
}
215216

0 commit comments

Comments
 (0)