Skip to content

Commit 163451e

Browse files
authored
Support for Persisting LMQ Consumer Offsets in Config V1 Using RocksDB (#8939)
1 parent 4e8a5ca commit 163451e

File tree

3 files changed

+19
-142
lines changed

3 files changed

+19
-142
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
7878
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
7979
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
80-
import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
8180
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
8281
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
8382
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
@@ -352,7 +351,7 @@ public BrokerController(
352351
} else if (this.messageStoreConfig.isEnableRocksDBStore()) {
353352
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
354353
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
355-
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
354+
this.consumerOffsetManager = new RocksDBConsumerOffsetManager(this);
356355
} else {
357356
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
358357
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java

-103
This file was deleted.

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java

+18-37
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.rocketmq.broker.offset;
1919

2020
import org.apache.rocketmq.broker.BrokerController;
21-
import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
21+
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
2222
import org.apache.rocketmq.common.BrokerConfig;
2323
import org.apache.rocketmq.common.MixAll;
2424
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -28,56 +28,48 @@
2828

2929
import java.util.HashMap;
3030
import java.util.Map;
31-
import java.util.concurrent.ConcurrentHashMap;
3231

3332
import static org.junit.Assert.assertEquals;
3433
import static org.junit.Assert.assertNotNull;
35-
import static org.junit.Assert.assertTrue;
34+
import static org.junit.Assert.assertNull;
3635
import static org.mockito.Mockito.when;
3736

3837
public class RocksDBLmqConsumerOffsetManagerTest {
3938
private static final String LMQ_GROUP = MixAll.LMQ_PREFIX + "FooBarGroup";
4039
private static final String NON_LMQ_GROUP = "nonLmqGroup";
41-
private static final String TOPIC = "FooBarTopic";
40+
41+
private static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "FooBarTopic";
42+
private static final String NON_LMQ_TOPIC = "FooBarTopic";
4243
private static final int QUEUE_ID = 0;
4344
private static final long OFFSET = 12345;
4445

4546
private BrokerController brokerController;
4647

47-
private RocksDBLmqConsumerOffsetManager offsetManager;
48+
private RocksDBConsumerOffsetManager offsetManager;
4849

4950
@Before
5051
public void setUp() {
5152
brokerController = Mockito.mock(BrokerController.class);
5253
when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
53-
when(brokerController.getBrokerConfig()).thenReturn(Mockito.mock(BrokerConfig.class));
54-
offsetManager = new RocksDBLmqConsumerOffsetManager(brokerController);
54+
when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
55+
offsetManager = new RocksDBConsumerOffsetManager(brokerController);
5556
}
5657

57-
@Test
58-
public void testQueryOffsetForLmq() {
59-
// Setup
60-
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
61-
// Execute
62-
long actualOffset = offsetManager.queryOffset(LMQ_GROUP, TOPIC, QUEUE_ID);
63-
// Verify
64-
assertEquals("Offset should match the expected value.", OFFSET, actualOffset);
65-
}
6658

6759
@Test
6860
public void testQueryOffsetForNonLmq() {
69-
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC, QUEUE_ID);
61+
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID);
7062
// Verify
7163
assertEquals("Offset should not be null.", -1, actualOffset);
7264
}
7365

7466

7567
@Test
7668
public void testQueryOffsetForLmqGroupWithExistingOffset() {
77-
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
69+
offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
7870

7971
// Act
80-
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, TOPIC);
72+
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, LMQ_TOPIC);
8173

8274
// Assert
8375
assertNotNull(actualOffsets);
@@ -89,23 +81,20 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() {
8981
public void testQueryOffsetForLmqGroupWithoutExistingOffset() {
9082
// Act
9183
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic");
92-
9384
// Assert
94-
assertNotNull(actualOffsets);
95-
assertTrue("The map should be empty for non-existing offsets", actualOffsets.isEmpty());
85+
assertNull(actualOffsets);
9686
}
9787

9888
@Test
9989
public void testQueryOffsetForNonLmqGroup() {
100-
when(brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep()).thenReturn(1L);
10190
// Arrange
10291
Map<Integer, Long> mockOffsets = new HashMap<>();
10392
mockOffsets.put(QUEUE_ID, OFFSET);
10493

105-
offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET);
94+
offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID, OFFSET);
10695

10796
// Act
108-
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC);
97+
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC);
10998

11099
// Assert
111100
assertNotNull(actualOffsets);
@@ -115,21 +104,13 @@ public void testQueryOffsetForNonLmqGroup() {
115104
@Test
116105
public void testCommitOffsetForLmq() {
117106
// Execute
118-
offsetManager.commitOffset("clientHost", LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET);
107+
offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
119108
// Verify
120-
Long expectedOffset = offsetManager.getLmqOffsetTable().get(getKey());
109+
Long expectedOffset = offsetManager.getOffsetTable().get(getLMQKey()).get(QUEUE_ID);
121110
assertEquals("Offset should be updated correctly.", OFFSET, expectedOffset.longValue());
122111
}
123112

124-
@Test
125-
public void testEncode() {
126-
offsetManager.setLmqOffsetTable(new ConcurrentHashMap<>(512));
127-
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
128-
String encodedData = offsetManager.encode();
129-
assertTrue(encodedData.contains(String.valueOf(OFFSET)));
130-
}
131-
132-
private String getKey() {
133-
return TOPIC + "@" + LMQ_GROUP;
113+
private String getLMQKey() {
114+
return LMQ_TOPIC + "@" + LMQ_GROUP;
134115
}
135116
}

0 commit comments

Comments
 (0)