Skip to content

Commit

Permalink
Let subscriber and publisher recover from serdes error
Browse files Browse the repository at this point in the history
  • Loading branch information
Superioz committed Mar 1, 2021
1 parent 3352cb9 commit ee6e101
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,18 @@ public int execute() {
final Packet.Payload packetPayload = packet.getPayload();
final byte[] payloadData = packetPayload.getValue().toByteArray();

final Object payload = payloadData.length == 0 ? null :
registeredTopic.getSerdes().deserialize(
packetPayload.getTypeUrl(),
packetPayload.getValue().toByteArray()
);
Object payload = null;
try {
payload = payloadData.length == 0 ? null :
registeredTopic.getSerdes().deserialize(
packetPayload.getTypeUrl(),
packetPayload.getValue().toByteArray()
);
} catch (final Exception ex) {
// if an error occurs during the serdes
this.service.getLogger().warn("Error during deserialization", ex);
return 0;
}
final Status status = new Status(packet.getStatus().getCode(),
packet.getStatus().getSubcode(), packet.getStatus().getMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,19 @@ public <T> void send(final String topic, final String key, final HagridPacket<T>
}

final T payload = packet.getPayloadOrNull();
final Packet.Payload packetPayload = payload == null
? Packet.Payload.newBuilder().setValue(ByteString.copyFrom(new byte[] {})).build()
: Packet.Payload.newBuilder()
.setTypeUrl(payload.getClass().getTypeName())
.setValue(ByteString.copyFrom(registeredTopic.getSerdes().serialize(payload)))
.build();
Packet.Payload packetPayload = null;
try {
packetPayload = payload == null
? Packet.Payload.newBuilder().setValue(ByteString.copyFrom(new byte[] {})).build()
: Packet.Payload.newBuilder()
.setTypeUrl(payload.getClass().getTypeName())
.setValue(ByteString.copyFrom(registeredTopic.getSerdes().serialize(payload)))
.build();
} catch (final Exception ex) {
// exception during serdes
this.service.getLogger().warn("Error during serialization", ex);
return;
}

final Packet protoPacket = Packet.newBuilder()
.setPayload(packetPayload)
Expand Down

0 comments on commit ee6e101

Please sign in to comment.