Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7064] [RIP-66-1] Support KV(RocksDB) Storage for Metadata #7092

Merged
merged 33 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
09c0c8c
typo int readme[ecosystem]
fujian-zfj Nov 30, 2021
89713ba
Merge branch 'apache:develop' into develop
fujian-zfj Sep 8, 2022
8d0c4fc
Merge branch 'apache:develop' into develop
fujian-zfj Sep 13, 2022
2c9e961
Merge branch 'apache:develop' into develop
fujian-zfj Jan 10, 2023
93d6669
Merge branch 'apache:develop' into develop
fujian-zfj Feb 2, 2023
2dd16d6
Merge branch 'apache:develop' into develop
fujian-zfj Feb 14, 2023
e02c7aa
Merge branch 'apache:develop' into develop
fujian-zfj Mar 2, 2023
cb8c7b0
Merge branch 'apache:develop' into develop
fujian-zfj Mar 10, 2023
03f707f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 10, 2023
6ef608f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 14, 2023
818919f
Merge branch 'apache:develop' into develop
fujian-zfj Mar 21, 2023
b315381
Merge branch 'apache:develop' into develop
fujian-zfj Mar 21, 2023
5d1b050
Merge branch 'apache:develop' into develop
fujian-zfj Mar 30, 2023
de2238b
Merge branch 'apache:develop' into develop
fujian-zfj Apr 3, 2023
d22756f
Merge branch 'apache:develop' into develop
fujian-zfj Apr 25, 2023
83d8178
Merge branch 'apache:develop' into develop
fujian-zfj May 9, 2023
a4c94a9
Merge branch 'apache:develop' into develop
fujian-zfj May 18, 2023
7234a41
Merge branch 'apache:develop' into develop
fujian-zfj Jun 3, 2023
c7708a7
Merge branch 'apache:develop' into develop
fujian-zfj Jul 10, 2023
166ebbd
Merge branch 'apache:develop' into develop
fujian-zfj Jul 11, 2023
6b27cc7
Merge branch 'apache:develop' into develop
fujian-zfj Jul 23, 2023
3b0a27b
Merge branch 'apache:develop' into develop
fujian-zfj Jul 28, 2023
80e259e
rocksdb metadata
fujian-zfj Jul 28, 2023
11a0c32
add unit test
fujian-zfj Jul 29, 2023
18b909b
fix testOffsetPersistInMemory
fujian-zfj Jul 29, 2023
2fa8485
fix unit test
fujian-zfj Jul 29, 2023
2de0cca
fix unit test
fujian-zfj Jul 30, 2023
0cc5cff
remove unused import
fujian-zfj Jul 30, 2023
03d9988
move RocksDBOffsetSerialize to broker moudle
fujian-zfj Jul 31, 2023
1a99cc8
Fix bazel build scripts
lizhanhui Jul 31, 2023
e0aae35
Flag QueryMsgByKeyIT as flaky as it fails at frequency: 5 out of 32
lizhanhui Jul 31, 2023
7e4a409
change public to private of some inner method
fujian-zfj Aug 2, 2023
26eec8f
Merge branch 'develop_rocksdb_metadata' of github.com:fujian-zfj/rock…
fujian-zfj Aug 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
],
fetch_sources = True,
repositories = [
Expand Down
3 changes: 3 additions & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)

Expand Down Expand Up @@ -81,6 +83,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:org_powermock_powermock_core",
"@maven//:io_opentelemetry_opentelemetry_api",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
Expand All @@ -66,8 +68,12 @@
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
Expand Down Expand Up @@ -120,6 +126,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand Down Expand Up @@ -301,9 +308,16 @@ public BrokerController(
this.messageStoreConfig = messageStoreConfig;
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
if (isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
} else {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
}
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.peekMessageProcessor = new PeekMessageProcessor(this);
Expand All @@ -324,7 +338,6 @@ public BrokerController(
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.scheduleMessageService = new ScheduleMessageService(this);
this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this);
this.coldDataCgCtrService = new ColdDataCgCtrService(this);
Expand Down Expand Up @@ -1383,8 +1396,6 @@ protected void shutdownBasicService() {
this.adminBrokerExecutor.shutdown();
}

this.consumerOffsetManager.persist();

if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
Expand Down Expand Up @@ -1449,8 +1460,20 @@ protected void shutdownBasicService() {
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);

this.topicConfigManager.persist();
this.subscriptionGroupManager.persist();
if (this.topicConfigManager != null) {
this.topicConfigManager.persist();
this.topicConfigManager.stop();
}

if (this.subscriptionGroupManager != null) {
this.subscriptionGroupManager.persist();
this.subscriptionGroupManager.stop();
}

if (this.consumerOffsetManager != null) {
this.consumerOffsetManager.persist();
this.consumerOffsetManager.stop();
}

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
Expand Down Expand Up @@ -2375,4 +2398,8 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -26,6 +25,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Strings;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
Expand All @@ -37,12 +39,12 @@
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class ConsumerOffsetManager extends ConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";

private DataVersion dataVersion = new DataVersion();

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);

private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
Expand All @@ -62,6 +64,10 @@ public ConsumerOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
}

protected void removeConsumerOffset(String topicAtGroup) {

}
Comment on lines +67 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When was the method called?

Copy link
Contributor Author

@fujian-zfj fujian-zfj Jul 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is mainly to let RocksDBConsumeOffsetManager to override and delete the data in rocksdb, you can see the overridden method in RocksDBConsumeOffsetManager.


public void cleanOffset(String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand All @@ -71,6 +77,7 @@ public void cleanOffset(String group) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue());
}
}
Expand All @@ -86,6 +93,7 @@ public void cleanOffsetByTopic(String topic) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && topic.equals(arrays[0])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue());
}
}
Expand All @@ -105,6 +113,7 @@ public void scanUnsubscribedTopic() {
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
&& this.offsetBehindMuchThanData(topic, next.getValue())) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("remove topic offset, {}", topicAtGroup);
}
}
Expand Down Expand Up @@ -313,8 +322,10 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
for (String group : filterGroups.split(",")) {
Iterator<String> it = topicGroups.iterator();
while (it.hasNext()) {
if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
String topicAtGroup = it.next();
if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
}
}
}
Expand Down Expand Up @@ -371,6 +382,7 @@ public void removeOffset(final String group) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("clean group offset {}", topicAtGroup);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
}

@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}

@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

@Override
public String configFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

@Override
public synchronized void persist() {
WriteBatch writeBatch = new WriteBatch();
try {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next();
putWriteBatch(writeBatch, entry.getKey(), entry.getValue());

if (writeBatch.getDataSize() >= 4 * 1024) {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
}
}
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
LOG.error("consumer offset persist Failed", e);
} finally {
writeBatch.close();
}
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}
}
Loading