Skip to content

Commit 94d0408

Browse files
authored
fix(pubsub): check response of receipt modacks for exactly once delivery (#7568)
* fix(pubsub): check response of receipt modacks for exactly once delivery * document behavior of pendingMessages * document behavior of pendingMessages * remove defer for unlocking
1 parent 3c27930 commit 94d0408

File tree

3 files changed

+72
-53
lines changed

3 files changed

+72
-53
lines changed

pubsub/integration_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -2020,11 +2020,10 @@ func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
20202020

20212021
func TestIntegration_TopicUpdateSchema(t *testing.T) {
20222022
ctx := context.Background()
2023-
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
2024-
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
2023+
c := integrationTestClient(ctx, t)
20252024
defer c.Close()
20262025

2027-
sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
2026+
sc := integrationTestSchemaClient(ctx, t)
20282027
defer sc.Close()
20292028

20302029
schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")

pubsub/iterator.go

+63-23
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
255255
// do a receipt mod-ack when streaming.
256256
maxExt := time.Now().Add(it.po.maxExtension)
257257
ackIDs := map[string]*AckResult{}
258+
it.eoMu.RLock()
259+
exactlyOnceDelivery := it.enableExactlyOnceDelivery
260+
it.eoMu.RUnlock()
258261
it.mu.Lock()
262+
263+
// pendingMessages maps ackID -> message, and is used
264+
// only when exactly once delivery is enabled.
265+
// At first, all messages are pending, and they
266+
// are removed if the modack call fails. All other
267+
// messages are returned to the client for processing.
268+
pendingMessages := make(map[string]*ipubsub.Message)
259269
for _, m := range msgs {
260270
ackID := msgAckID(m)
261271
addRecv(m.ID, ackID, now)
@@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
264274
// possible if there are retries.
265275
if _, ok := it.pendingNacks[ackID]; !ok {
266276
// Don't use the message's AckResult here since these are only for receipt modacks.
267-
// ModAckResults are transparent to the user anyway so these can automatically succeed.
277+
// modack results are transparent to the user so these can automatically succeed unless
278+
// exactly once is enabled.
268279
// We can't use an empty AckResult here either since SetAckResult will try to
269280
// close the channel without checking if it exists.
270-
ackIDs[ackID] = newSuccessAckResult()
281+
if !exactlyOnceDelivery {
282+
ackIDs[ackID] = newSuccessAckResult()
283+
} else {
284+
ackIDs[ackID] = ipubsub.NewAckResult()
285+
pendingMessages[ackID] = m
286+
}
271287
}
272288
}
273289
deadline := it.ackDeadline()
274290
it.mu.Unlock()
275-
go func() {
276-
if len(ackIDs) > 0 {
277-
// Don't check the return value of this since modacks are fire and forget,
278-
// meaning errors should not be propagated to the client.
279-
it.sendModAck(ackIDs, deadline)
291+
292+
if len(ackIDs) > 0 {
293+
// When exactly once delivery is not enabled, modacks are fire and forget.
294+
if !exactlyOnceDelivery {
295+
go func() {
296+
it.sendModAck(ackIDs, deadline, false)
297+
}()
298+
return msgs, nil
280299
}
281-
}()
282-
return msgs, nil
300+
301+
// If exactly once is enabled, we should wait until modack responses are successes
302+
// before attempting to process messages.
303+
it.sendModAck(ackIDs, deadline, false)
304+
for ackID, ar := range ackIDs {
305+
ctx := context.Background()
306+
_, err := ar.Get(ctx)
307+
if err != nil {
308+
delete(pendingMessages, ackID)
309+
it.mu.Lock()
310+
// Remove the message from lease management if modack fails here.
311+
delete(it.keepAliveDeadlines, ackID)
312+
it.mu.Unlock()
313+
}
314+
}
315+
// Only return for processing messages that were successfully modack'ed.
316+
v := make([]*ipubsub.Message, 0, len(pendingMessages))
317+
for _, m := range pendingMessages {
318+
v = append(v, m)
319+
}
320+
return v, nil
321+
}
322+
return nil, nil
283323
}
284324

285325
// Get messages using the Pull RPC.
@@ -399,10 +439,10 @@ func (it *messageIterator) sender() {
399439
}
400440
if sendNacks {
401441
// Nack indicated by modifying the deadline to zero.
402-
it.sendModAck(nacks, 0)
442+
it.sendModAck(nacks, 0, false)
403443
}
404444
if sendModAcks {
405-
it.sendModAck(modAcks, dl)
445+
it.sendModAck(modAcks, dl, true)
406446
}
407447
if sendPing {
408448
it.pingStream()
@@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
479519
// percentile in order to capture the highest amount of time necessary without
480520
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
481521
// enabled, we retry it in a separate goroutine for a short duration.
482-
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) {
522+
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
483523
deadlineSec := int32(deadline / time.Second)
484524
ackIDs := make([]string, 0, len(m))
485525
for k := range m {
@@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
517557
if len(toRetry) > 0 {
518558
// Retry modacks/nacks in a separate goroutine.
519559
go func() {
520-
it.retryModAcks(toRetry, deadlineSec)
560+
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
521561
}()
522562
}
523563
}
@@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) {
563603
// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times
564604
// since after that, the message will have expired. Nacks are retried up until the default
565605
// deadline of 10 minutes.
566-
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) {
606+
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
567607
bo := newExactlyOnceBackoff()
568608
retryCount := 0
569609
ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
570610
defer cancel()
571611
for {
572-
// If context is done, complete all remaining Nacks with DeadlineExceeded
573-
// ModAcks are not exposed to the user so these don't need to be modified.
612+
// If context is done, complete all AckResults with errors.
574613
if ctx.Err() != nil {
575-
if deadlineSec == 0 {
576-
for _, r := range m {
577-
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
578-
}
614+
for _, r := range m {
615+
ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
579616
}
580617
return
581618
}
582619
// Only retry modack requests up to 3 times.
583620
if deadlineSec != 0 && retryCount > 3 {
584621
ackIDs := make([]string, 0, len(m))
585-
for k := range m {
622+
for k, ar := range m {
586623
ackIDs = append(ackIDs, k)
624+
ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
625+
}
626+
if logOnInvalid {
627+
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
587628
}
588-
log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
589629
return
590630
}
591631
// Don't need to split map since this is the retry function and
@@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) {
723763
return nil, nil
724764
}
725765

726-
// processResults processes AckResults by referring to errorStatus and errorsMap.
766+
// processResults processes AckResults by referring to errorStatus and errorsByAckID.
727767
// The errors returned by the server in `errorStatus` or in `errorsByAckID`
728768
// are used to complete the AckResults in `ackResMap` (with a success
729769
// or error) or to return requests for further retries.

pubsub/subscription_test.go

+7-27
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21-
"sync"
2221
"testing"
2322
"time"
2423

@@ -680,8 +679,8 @@ func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
680679
}
681680
}
682681

683-
func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
684-
ctx, cancel := context.WithCancel(context.Background())
682+
func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
683+
ctx := context.Background()
685684
srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error"))
686685
client, err := NewClient(ctx, projName,
687686
option.WithEndpoint(srv.Addr),
@@ -708,31 +707,12 @@ func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) {
708707
if _, err := r.Get(ctx); err != nil {
709708
t.Fatalf("failed to publish message: %v", err)
710709
}
710+
s.ReceiveSettings.MaxExtensionPeriod = 1 * time.Minute
711711

712-
s.ReceiveSettings = ReceiveSettings{
713-
NumGoroutines: 1,
714-
// This needs to be greater than total deadline otherwise the message will be redelivered.
715-
MinExtensionPeriod: 2 * time.Minute,
716-
MaxExtensionPeriod: 2 * time.Minute,
717-
}
712+
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
713+
defer cancel()
718714
// Override the default timeout here so this test doesn't take 10 minutes.
719-
exactlyOnceDeliveryRetryDeadline = 20 * time.Second
720-
var once sync.Once
721-
err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
722-
once.Do(func() {
723-
ar := msg.NackWithResult()
724-
s, err := ar.Get(ctx)
725-
if s != AcknowledgeStatusOther {
726-
t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther)
727-
}
728-
wantErr := context.DeadlineExceeded
729-
if !errors.Is(err, wantErr) {
730-
t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr)
731-
}
732-
cancel()
733-
})
715+
s.Receive(ctx, func(ctx context.Context, msg *Message) {
716+
t.Fatal("expected message to not have been delivered when exactly once enabled")
734717
})
735-
if err != nil {
736-
t.Fatalf("s.Receive err: %v", err)
737-
}
738718
}

0 commit comments

Comments
 (0)