Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]新增删除Group或GroupOffset功能 #1064

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.biz.group;

import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
@@ -39,5 +40,7 @@ PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Lon

Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;

Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception;

List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
@@ -17,6 +18,9 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
@@ -32,6 +36,7 @@
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -42,6 +47,7 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
@@ -58,14 +64,17 @@

@Component
public class GroupManagerImpl implements GroupManager {
private static final ILog log = LogFactory.getLog(GroupManagerImpl.class);
private static final ILog LOGGER = LogFactory.getLog(GroupManagerImpl.class);

@Autowired
private TopicService topicService;

@Autowired
private GroupService groupService;

@Autowired
private OpGroupService opGroupService;

@Autowired
private PartitionService partitionService;

@@ -246,6 +255,52 @@ public Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator)
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
}

@Override
public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterPhyId());
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterPhyId()));
}


// 按照group纬度进行删除
if (ValidateUtils.isBlank(dto.getGroupName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "groupName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP),
operator
);
}


// 按照topic纬度进行删除
if (ValidateUtils.isBlank(dto.getTopicName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
operator
);
}


// 按照partition纬度进行删除
if (ValidateUtils.isNullOrLessThanZero(dto.getPartitionId())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
operator
);
}

return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "deleteType类型错误");
}

@Override
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
// 获取指标
@@ -257,7 +312,7 @@ public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId,
);
if (metricsListResult.failed()) {
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
LOGGER.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
}
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.group;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

/**
* 删除offset
* @author zengqiao
* @date 19/4/8
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class GroupOffsetDeleteDTO extends BaseDTO {
@Min(value = 0, message = "clusterPhyId不允许为null或者小于0")
@ApiModelProperty(value = "集群ID", example = "6")
private Long clusterPhyId;

@NotBlank(message = "groupName不允许为空")
@ApiModelProperty(value = "消费组名称", example = "g-know-streaming")
private String groupName;

@ApiModelProperty(value = "Topic名称,按照Topic纬度进行删除时需要传", example = "know-streaming")
protected String topicName;

@ApiModelProperty(value = "分区ID,按照分区纬度进行删除时需要传")
private Integer partitionId;

/**
* @see com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum
*/
@NotNull(message = "deleteType不允许为空")
@ApiModelProperty(value = "删除类型", example = "0:group纬度,1:Topic纬度,2:Partition纬度")
private Integer deleteType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupParam extends GroupParam {
protected DeleteGroupTypeEnum deleteGroupTypeEnum;

public DeleteGroupParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum) {
super(clusterPhyId, groupName);
this.deleteGroupTypeEnum = deleteGroupTypeEnum;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupTopicParam extends DeleteGroupParam {
protected String topicName;

public DeleteGroupTopicParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName) {
super(clusterPhyId, groupName, deleteGroupTypeEnum);
this.topicName = topicName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupTopicPartitionParam extends DeleteGroupTopicParam {
protected Integer partitionId;

public DeleteGroupTopicPartitionParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName, Integer partitionId) {
super(clusterPhyId, groupName, deleteGroupTypeEnum, topicName);
this.partitionId = partitionId;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class GroupParam extends ClusterPhyParam {
protected String groupName;

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.common.enums.group;

import lombok.Getter;


/**
* @author wyb
* @date 2022/10/11
*/
@Getter
public enum DeleteGroupTypeEnum {
UNKNOWN(-1, "Unknown"),

GROUP(0, "Group纬度"),

GROUP_TOPIC(1, "GroupTopic纬度"),

GROUP_TOPIC_PARTITION(2, "GroupTopicPartition纬度");

private final Integer code;

private final String msg;

DeleteGroupTypeEnum(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
}
Original file line number Diff line number Diff line change
@@ -41,6 +41,8 @@ public enum VersionItemTypeEnum {

SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),

SERVICE_OP_GROUP(340, "service_group_operation"),

SERVICE_OP_CONNECT_CLUSTER(400, "service_connect_cluster_operation"),
SERVICE_OP_CONNECT_CONNECTOR(401, "service_connect_connector_operation"),
SERVICE_OP_CONNECT_PLUGIN(402, "service_connect_plugin_operation"),
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.xiaojukeji.know.streaming.km.core.service.group;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;

public interface OpGroupService {
/**
* 删除Topic
*/
Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package com.xiaojukeji.know.streaming.km.core.service.group.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupDAO;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;

import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_GROUP;

/**
* @author didi
*/
@Service
public class OpGroupServiceImpl extends BaseKafkaVersionControlService implements OpGroupService {
private static final ILog LOGGER = LogFactory.getLog(OpGroupServiceImpl.class);

private static final String DELETE_GROUP_OFFSET = "deleteGroupOffset";

@Autowired
private GroupDAO groupDAO;

@Autowired
private GroupMemberDAO groupMemberDAO;

@Autowired
private OpLogWrapService opLogWrapService;

@Autowired
private KafkaAdminClient kafkaAdminClient;

@Override
protected VersionItemTypeEnum getVersionItemType() {
return SERVICE_OP_GROUP;
}

@PostConstruct
private void init() {
registerVCHandler(DELETE_GROUP_OFFSET, V_1_1_0, V_MAX, "deleteGroupOffset", this::deleteGroupOffsetByClient);
}

@Override
public Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator) {
// 日志记录
LOGGER.info("method=deleteGroupOffset||param={}||operator={}||msg=delete group offset", ConvertUtil.obj2Json(param), operator);

try {
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_OFFSET, param);
if (rv == null || rv.failed()) {
return rv;
}

// 清理数据库中的数据
if (DeleteGroupTypeEnum.GROUP.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理Group数据
this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName());
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName());
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
DeleteGroupTopicParam topicParam = (DeleteGroupTopicParam) param;
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), topicParam.getTopicName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(topicParam))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理group + topic 数据
this.deleteGroupMemberInDB(topicParam.getClusterPhyId(), topicParam.getGroupName(), topicParam.getTopicName());
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
DeleteGroupTopicPartitionParam partitionParam = (DeleteGroupTopicPartitionParam) param;
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), partitionParam.getTopicName(), partitionParam.getPartitionId()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(partitionParam))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 不需要进行清理
}

return rv;
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

/**************************************************** private method ****************************************************/

private Result<Void> deleteGroupOffsetByClient(VersionItemParam itemParam) {
DeleteGroupParam deleteGroupParam = (DeleteGroupParam) itemParam;

if (DeleteGroupTypeEnum.GROUP.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupByClient(itemParam);
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupTopicOffsetByClient(itemParam);
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupTopicPartitionOffsetByClient(itemParam);
}

return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "删除Offset时,删除的类型参数非法");
}

private Result<Void> deleteGroupByClient(VersionItemParam itemParam) {
DeleteGroupParam param = (DeleteGroupParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());

DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(
Collections.singletonList(param.getGroupName()),
new DeleteConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);

deleteConsumerGroupsResult.all().get();
} catch (Exception e) {
LOGGER.error(
"method=deleteGroupByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!",
param.getClusterPhyId(), param.getGroupName(), e
);

return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}

return Result.buildSuc();
}

private Result<Void> deleteGroupTopicOffsetByClient(VersionItemParam itemParam) {
DeleteGroupTopicParam param = (DeleteGroupTopicParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());

DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(
param.getTopicName()),
new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);

List<TopicPartition> tpList = describeTopicsResult
.all()
.get()
.get(param.getTopicName())
.partitions()
.stream()
.map(elem -> new TopicPartition(param.getTopicName(), elem.partition()))
.collect(Collectors.toList());

DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = adminClient.deleteConsumerGroupOffsets(
param.getGroupName(),
new HashSet<>(tpList),
new DeleteConsumerGroupOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);

deleteConsumerGroupOffsetsResult.all().get();
} catch (Exception e) {
LOGGER.error(
"method=deleteGroupTopicOffsetByClient||clusterPhyId={}||groupName={}||topicName={}||errMsg=delete group failed||msg=exception!",
param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), e
);

return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}

return Result.buildSuc();
}

private Result<Void> deleteGroupTopicPartitionOffsetByClient(VersionItemParam itemParam) {
DeleteGroupTopicPartitionParam param = (DeleteGroupTopicPartitionParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());

DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = adminClient.deleteConsumerGroupOffsets(
param.getGroupName(),
new HashSet<>(Arrays.asList(new TopicPartition(param.getTopicName(), param.getPartitionId()))),
new DeleteConsumerGroupOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);

deleteConsumerGroupOffsetsResult.all().get();
} catch (Exception e) {
LOGGER.error(
"method=deleteGroupTopicPartitionOffsetByClient||clusterPhyId={}||groupName={}||topicName={}||partitionId={}||errMsg=delete group failed||msg=exception!",
param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), param.getPartitionId(), e
);

return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}

return Result.buildSuc();
}

private int deleteGroupInDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupPO::getName, groupName);

return groupDAO.delete(lambdaQueryWrapper);
}

private int deleteGroupMemberInDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupMemberPO::getGroupName, groupName);

return groupMemberDAO.delete(lambdaQueryWrapper);
}

private int deleteGroupMemberInDB(Long clusterPhyId, String groupName, String topicName) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupMemberPO::getGroupName, groupName);
lambdaQueryWrapper.eq(GroupMemberPO::getTopicName, topicName);

return groupMemberDAO.delete(lambdaQueryWrapper);
}
}
Original file line number Diff line number Diff line change
@@ -38,6 +38,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {

private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";

private static final String FE_DELETE_GROUP_OFFSET = "FEDeleteGroupOffset";

public FrontEndControlVersionItems(){}

@Override
@@ -91,10 +93,13 @@ public List<VersionControlItem> init(){
itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX)
.name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制"));

//truncate topic
// truncate topic
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
.name(FE_TRUNCATE_TOPIC).desc("清空topic"));
.name(FE_TRUNCATE_TOPIC).desc("清空Topic"));

// truncate topic
itemList.add(buildItem().minVersion(VersionEnum.V_1_1_0).maxVersion(VersionEnum.V_MAX)
.name(FE_DELETE_GROUP_OFFSET).desc("删除GroupOffset"));
return itemList;
}
}
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

import com.didiglobal.logi.security.util.HttpRequestUtil;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupTopicConsumedDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -32,13 +33,20 @@ public class GroupController {
@Autowired
private GroupService groupService;

@ApiOperation(value = "重置组消费偏移", notes = "")
@ApiOperation(value = "重置消费偏移", notes = "")
@PutMapping(value = "group-offsets")
@ResponseBody
public Result<Void> resetGroupOffsets(@Validated @RequestBody GroupOffsetResetDTO dto) throws Exception {
return groupManager.resetGroupOffsets(dto, HttpRequestUtil.getOperator());
}

@ApiOperation(value = "删除消费偏移", notes = "")
@DeleteMapping(value = "group-offsets")
@ResponseBody
public Result<Void> deleteGroupOffsets(@Validated @RequestBody GroupOffsetDeleteDTO dto) throws Exception {
return groupManager.deleteGroupOffsets(dto, HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Group-Topic指标信息", notes = "")
@PostMapping(value = "clusters/{clusterId}/topics/{topicName}/groups/{groupName}/metric")
@ResponseBody