Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7642] Add return value for sendHeartbeat related method #7643

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,9 @@ public synchronized void start() throws MQClientException {

this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
this.mQClientFactory.rebalanceImmediately();
}
}

private void checkConfig() throws MQClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,14 @@ public void onChannelIdle(String remoteAddr, Channel channel) {
@Override
public void onChannelActive(String remoteAddr, Channel channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
sendHeartbeatToAllBrokerWithLockV2(false);
for (Map.Entry<Long, String> entry : addressEntry.getValue().entrySet()) {
String addr = entry.getValue();
if (addr.equals(remoteAddr)) {
long id = entry.getKey();
String brokerName = addressEntry.getKey();
if (sendHeartbeatToBroker(id, brokerName, addr)) {
rebalanceImmediately();
}
break;
}
}
Expand Down Expand Up @@ -504,13 +509,13 @@ public void checkClientInBroker() throws MQClientException {
}
}

public void sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
public boolean sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
if (this.lockHeartbeat.tryLock()) {
try {
if (clientConfig.isUseHeartbeatV2()) {
this.sendHeartbeatToAllBrokerV2(isRebalance);
return this.sendHeartbeatToAllBrokerV2(isRebalance);
} else {
this.sendHeartbeatToAllBroker();
return this.sendHeartbeatToAllBroker();
}
} catch (final Exception e) {
log.error("sendHeartbeatToAllBrokerWithLockV2 exception", e);
Expand All @@ -520,15 +525,16 @@ public void sendHeartbeatToAllBrokerWithLockV2(boolean isRebalance) {
} else {
log.warn("sendHeartbeatToAllBrokerWithLockV2 lock heartBeat, but failed.");
}
return false;
}

public void sendHeartbeatToAllBrokerWithLock() {
public boolean sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
if (clientConfig.isUseHeartbeatV2()) {
this.sendHeartbeatToAllBrokerV2(false);
return this.sendHeartbeatToAllBrokerV2(false);
} else {
this.sendHeartbeatToAllBroker();
return this.sendHeartbeatToAllBroker();
}
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
Expand All @@ -538,6 +544,7 @@ public void sendHeartbeatToAllBrokerWithLock() {
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
return false;
}

private void persistAllConsumerOffset() {
Expand Down Expand Up @@ -582,19 +589,72 @@ private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
return false;
}

private void sendHeartbeatToAllBroker() {
public boolean sendHeartbeatToBroker(long id, String brokerName, String addr) {
if (this.lockHeartbeat.tryLock()) {
final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sendHeartbeatToBroker sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return false;
}
try {
if (clientConfig.isUseHeartbeatV2()) {
int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
heartbeatDataWithoutSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
return this.sendHeartbeatToBrokerV2(id, brokerName, addr, heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
} else {
return this.sendHeartbeatToBroker(id, brokerName, addr, heartbeatDataWithSub);
}
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
return false;
}

private boolean sendHeartbeatToBroker(long id, String brokerName, String addr, HeartbeatData heartbeatData) {
try {
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
return true;
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
return false;
}

private boolean sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData(false);
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
return false;
}

if (this.brokerAddrTable.isEmpty()) {
return;
return false;
}
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
String brokerName = brokerClusterInfo.getKey();
HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
Expand All @@ -611,43 +671,71 @@ private void sendHeartbeatToAllBroker() {
continue;
}

try {
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
sendHeartbeatToBroker(id, brokerName, addr, heartbeatData);
}
}
return true;
}

private boolean sendHeartbeatToBrokerV2(long id, String brokerName, String addr, HeartbeatData heartbeatDataWithSub,
HeartbeatData heartbeatDataWithoutSub, int currentHeartbeatFingerprint) {
try {
int version = 0;
boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
HeartbeatV2Result heartbeatV2Result = null;
if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout());
if (heartbeatV2Result.isSubChange()) {
brokerAddrHeartbeatFingerprintTable.remove(addr);
}
log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
} else {
heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, clientConfig.getMqClientApiTimeout());
if (heartbeatV2Result.isSupportV2()) {
brokerSupportV2HeartbeatSet.add(addr);
if (heartbeatV2Result.isSubChange()) {
brokerAddrHeartbeatFingerprintTable.remove(addr);
} else if (!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
brokerAddrHeartbeatFingerprintTable.put(addr, currentHeartbeatFingerprint);
}
}
log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
}
version = heartbeatV2Result.getVersion();
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatDataWithSub.toString());
}
return true;
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e);
}
}
return false;
}

private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
private boolean sendHeartbeatToAllBrokerV2(boolean isRebalance) {
final HeartbeatData heartbeatDataWithSub = this.prepareHeartbeatData(false);
final boolean producerEmpty = heartbeatDataWithSub.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatDataWithSub.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
return false;
}
if (this.brokerAddrTable.isEmpty()) {
return;
return false;
}
if (isRebalance) {
resetBrokerAddrHeartbeatFingerprintMap();
}
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
int currentHeartbeatFingerprint = heartbeatDataWithSub.computeHeartbeatFingerprint();
heartbeatDataWithSub.setHeartbeatFingerprint(currentHeartbeatFingerprint);
HeartbeatData heartbeatDataWithoutSub = this.prepareHeartbeatData(true);
Expand All @@ -668,46 +756,10 @@ private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
if (consumerEmpty && MixAll.MASTER_ID != id) {
continue;
}
try {
int version = 0;
boolean isBrokerSupportV2 = brokerSupportV2HeartbeatSet.contains(addr);
HeartbeatV2Result heartbeatV2Result = null;
if (isBrokerSupportV2 && null != brokerAddrHeartbeatFingerprintTable.get(addr) && brokerAddrHeartbeatFingerprintTable.get(addr) == currentHeartbeatFingerprint) {
heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithoutSub, clientConfig.getMqClientApiTimeout());
if (heartbeatV2Result.isSubChange()) {
brokerAddrHeartbeatFingerprintTable.remove(addr);
}
log.info("sendHeartbeatToAllBrokerV2 simple brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
} else {
heartbeatV2Result = this.mQClientAPIImpl.sendHeartbeatV2(addr, heartbeatDataWithSub, clientConfig.getMqClientApiTimeout());
if (heartbeatV2Result.isSupportV2()) {
brokerSupportV2HeartbeatSet.add(addr);
if (heartbeatV2Result.isSubChange()) {
brokerAddrHeartbeatFingerprintTable.remove(addr);
} else if (!brokerAddrHeartbeatFingerprintTable.containsKey(addr) || brokerAddrHeartbeatFingerprintTable.get(addr) != currentHeartbeatFingerprint) {
brokerAddrHeartbeatFingerprintTable.put(addr, currentHeartbeatFingerprint);
}
}
log.info("sendHeartbeatToAllBrokerV2 normal brokerName: {} subChange: {} brokerAddrHeartbeatFingerprintTable: {}", brokerName, heartbeatV2Result.isSubChange(), JSON.toJSONString(brokerAddrHeartbeatFingerprintTable));
}
version = heartbeatV2Result.getVersion();
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatDataWithSub.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("sendHeartbeatToAllBrokerV2 send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e);
}
}
sendHeartbeatToBrokerV2(id, brokerName, addr, heartbeatDataWithSub, heartbeatDataWithoutSub, currentHeartbeatFingerprint);
}
}
return true;
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
Expand Down