Skip to content

Commit 7f71fb1

Browse files
committed
Made unrecoverable evaluation errors immediately terminate a message rather than force re-send
1 parent 5291127 commit 7f71fb1

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

internal/hops/runner.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ func (r *Runner) SequenceCallback(
9292
ons, d := automations.EventOns(msgBundle)
9393
if d.HasErrors() {
9494
r.logDiagnostics(d, logger)
95-
return false, fmt.Errorf("Error evaluating automations: %s", d.Error())
95+
return false, fmt.Errorf("%w: %s", nats.ErrEventFatal, d.Error())
9696
}
9797

9898
if len(ons) == 0 {
9999
return false, nil
100100
}
101101

102-
logger.Debug().Msg("Successfully evaluated event's automations")
102+
logger.Debug().Msg("Successfully evaluated automations")
103103

104104
var mergedErrors error
105105
// NOTE: We could potentially get a speed boost by dispatching/handling each

nats/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
)
3535

3636
var (
37+
ErrEventFatal = errors.New("This event cannot be processed")
3738
ErrIncompleteMsgBundle = errors.New("Unable to fetch complete sequence history")
3839
nameReplacer = strings.NewReplacer("*", "all", ".", "dot", ">", "children")
3940
)
@@ -192,6 +193,10 @@ func (c *Client) ConsumeSequences(ctx context.Context, fromConsumer string, hand
192193
}
193194

194195
handled, err := handler.SequenceCallback(ctx, hopsMsg.SequenceId, msgBundle)
196+
if errors.Is(err, ErrEventFatal) {
197+
msg.Term()
198+
return
199+
}
195200
if err != nil {
196201
msg.NakWithDelay(3 * time.Second)
197202
return

0 commit comments

Comments
 (0)