Skip to content

Commit dddd1b4

Browse files
author
duanxiaoqiu
committed
[Feature]增加Truncate数据功能(didi#1043)
1 parent 7ed2eab commit dddd1b4

File tree

6 files changed

+49
-8
lines changed

6 files changed

+49
-8
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
1111
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
1212
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
13+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
1314
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
1415
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1516
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
1617
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
18+
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
1719
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
1820
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
1921
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
@@ -159,7 +161,7 @@ public Result<Void> expandTopic(TopicExpansionDTO dto, String operator) {
159161
@Override
160162
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
161163
// 清空Topic
162-
Result<Void> rv = opTopicService.truncateTopic(new TopicParam(clusterPhyId, topicName), operator);
164+
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
163165
if (rv.failed()) {
164166
return rv;
165167
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
@Data
9+
@NoArgsConstructor
10+
@AllArgsConstructor
11+
public class TopicTruncateParam extends ClusterPhyParam {
12+
protected String topicName;
13+
protected long offset;
14+
15+
public TopicTruncateParam(Long clusterPhyId, String topicName, long offset) {
16+
super(clusterPhyId);
17+
this.topicName = topicName;
18+
this.offset = offset;
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return "TopicParam{" +
24+
"clusterPhyId=" + clusterPhyId +
25+
", topicName='" + topicName + '\'' +
26+
", offset='" + offset + '\'' +
27+
'}';
28+
}
29+
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class KafkaConstant {
4949

5050
public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();
5151

52+
public static final Integer TOPICK_TRUNCATE_DEFAULT_OFFSET = -1;
53+
5254
static {
5355
try {
5456
KAFKA_ALL_CONFIG_DEF_MAP.putAll(CollectionConverters.asJava(LogConfig$.MODULE$.configKeys()));

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/OpTopicService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
44
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
55
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
6+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
67
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
78
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
89

@@ -25,5 +26,5 @@ public interface OpTopicService {
2526
/**
2627
* 清空topic消息
2728
*/
28-
Result<Void> truncateTopic(TopicParam param, String operator);
29+
Result<Void> truncateTopic(TopicTruncateParam param, String operator);
2930
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
1010
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
11+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
1112
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1213
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
1314
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
@@ -208,7 +209,7 @@ public Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String op
208209
}
209210

210211
@Override
211-
public Result<Void> truncateTopic(TopicParam param, String operator) {
212+
public Result<Void> truncateTopic(TopicTruncateParam param, String operator) {
212213
try {
213214
// 清空topic数据
214215
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param);
@@ -233,26 +234,26 @@ public Result<Void> truncateTopic(TopicParam param, String operator) {
233234
/**************************************************** private method ****************************************************/
234235

235236
private Result<Void> truncateTopicByKafkaClient(VersionItemParam itemParam) {
236-
TopicParam param = (TopicParam) itemParam;
237+
TopicTruncateParam param = (TopicTruncateParam) itemParam;
237238
try {
238239
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
239240
//获取topic的分区信息
240-
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(param.getTopicName()));
241+
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(param.getTopicName()), new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
241242
Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
242243

243244
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
244-
RecordsToDelete recordsToDeleteOffset = RecordsToDelete.beforeOffset(-1);
245+
RecordsToDelete recordsToDeleteOffset = RecordsToDelete.beforeOffset(param.getOffset());
245246

246247
descriptionMap.forEach((topicName, topicDescription) -> {
247248
for (TopicPartitionInfo topicPartition : topicDescription.partitions()) {
248249
recordsToDelete.put(new TopicPartition(topicName, topicPartition.partition()), recordsToDeleteOffset);
249250
}
250251
});
251252

252-
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
253+
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete, new DeleteRecordsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
253254
deleteRecordsResult.all().get();
254255
} catch (Exception e) {
255-
log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{}", param.getClusterPhyId(), param.getTopicName(), e);
256+
log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{} offset:{}", param.getClusterPhyId(), param.getTopicName(), param.getOffset(), e);
256257

257258
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
258259
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java

+6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
3636
private static final String FE_HA_CREATE_MIRROR_TOPIC = "FEHaCreateMirrorTopic";
3737
private static final String FE_HA_DELETE_MIRROR_TOPIC = "FEHaDeleteMirrorTopic";
3838

39+
private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";
40+
3941
public FrontEndControlVersionItems(){}
4042

4143
@Override
@@ -89,6 +91,10 @@ public List<VersionControlItem> init(){
8991
itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX)
9092
.name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制"));
9193

94+
//truncate topic
95+
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
96+
.name(FE_TRUNCATE_TOPIC).desc("清空topic"));
97+
9298
return itemList;
9399
}
94100
}

0 commit comments

Comments
 (0)