Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6933] Optimize delete topic in tiered storage #6973

Merged
merged 2 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[ISSUE #6933] Optimize delete topic in tiered storage
  • Loading branch information
lizhimins committed Jun 30, 2023
commit 7c4bb10561e3d8e1bb2766ab5190e361577d6adc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
Expand Down Expand Up @@ -394,12 +395,7 @@ public int cleanUnusedTopic(Set<String> retainTopics) {
MixAll.isLmq(topic)) {
return;
}
logger.info("TieredMessageStore#cleanUnusedTopic: start deleting topic {}", topic);
try {
destroyCompositeFlatFile(topicMetadata);
} catch (Exception e) {
logger.error("TieredMessageStore#cleanUnusedTopic: delete topic {} failed", topic, e);
}
this.destroyCompositeFlatFile(topicMetadata.getTopic());
});
} catch (Exception e) {
logger.error("TieredMessageStore#cleanUnusedTopic: iterate topic metadata failed", e);
Expand All @@ -410,38 +406,24 @@ public int cleanUnusedTopic(Set<String> retainTopics) {
@Override
public int deleteTopics(Set<String> deleteTopics) {
for (String topic : deleteTopics) {
logger.info("TieredMessageStore#deleteTopics: start deleting topic {}", topic);
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata != null) {
destroyCompositeFlatFile(topicMetadata);
} else {
logger.error("TieredMessageStore#deleteTopics: delete topic {} failed, can not obtain metadata", topic);
}
} catch (Exception e) {
logger.error("TieredMessageStore#deleteTopics: delete topic {} failed", topic, e);
}
this.destroyCompositeFlatFile(topic);
}

return next.deleteTopics(deleteTopics);
}

public void destroyCompositeFlatFile(TopicMetadata topicMetadata) {
String topic = topicMetadata.getTopic();
metadataStore.iterateQueue(topic, queueMetadata -> {
MessageQueue mq = queueMetadata.getQueue();
CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
if (flatFile != null) {
flatFileManager.destroyCompositeFile(mq);
try {
metadataStore.deleteQueue(mq);
} catch (Exception e) {
throw new IllegalStateException(e);
}
logger.info("TieredMessageStore#destroyCompositeFlatFile: " +
"destroy flatFile success: topic: {}, queueId: {}", mq.getTopic(), mq.getQueueId());
public void destroyCompositeFlatFile(String topic) {
try {
if (StringUtils.isBlank(topic)) {
return;
}
});
metadataStore.deleteTopic(topicMetadata.getTopic());
metadataStore.iterateQueue(topic, queueMetadata -> {
flatFileManager.destroyCompositeFile(queueMetadata.getQueue());
});
// delete topic metadata
metadataStore.deleteTopic(topic);
logger.info("Destroy composite flat file in message store, topic={}", topic);
} catch (Exception e) {
logger.error("Destroy composite flat file in message store failed, topic={}", topic, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,19 @@ public void destroy() {
}

public void destroyCompositeFile(MessageQueue mq) {
if (mq == null) {
return;
}

// delete memory reference
CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
if (flatFile != null) {
MessageQueue messageQueue = flatFile.getMessageQueue();
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
"try to destroy composite flat file: topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());

// delete queue metadata
flatFile.destroy();
}
}
Expand Down