@@ -31,8 +31,9 @@ import (
31
31
// the stream on a retryable error.
32
32
type pullStream struct {
33
33
ctx context.Context
34
- open func () (pb.Subscriber_StreamingPullClient , error )
35
- cancel context.CancelFunc
34
+ cancel context.CancelFunc // cancel function of the context above
35
+ open func () (pb.Subscriber_StreamingPullClient , context.CancelFunc , error )
36
+ close context.CancelFunc // cancel function to close down the currently open stream
36
37
37
38
mu sync.Mutex
38
39
spc * pb.Subscriber_StreamingPullClient
@@ -50,8 +51,9 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
50
51
return & pullStream {
51
52
ctx : ctx ,
52
53
cancel : cancel ,
53
- open : func () (pb.Subscriber_StreamingPullClient , error ) {
54
- spc , err := streamingPull (ctx , gax .WithGRPCOptions (grpc .MaxCallRecvMsgSize (maxSendRecvBytes )))
54
+ open : func () (pb.Subscriber_StreamingPullClient , context.CancelFunc , error ) {
55
+ sctx , close := context .WithCancel (ctx )
56
+ spc , err := streamingPull (sctx , gax .WithGRPCOptions (grpc .MaxCallRecvMsgSize (maxSendRecvBytes )))
55
57
if err == nil {
56
58
recordStat (ctx , StreamRequestCount , 1 )
57
59
streamAckDeadline := int32 (maxDurationPerLeaseExtension / time .Second )
@@ -69,9 +71,10 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
69
71
})
70
72
}
71
73
if err != nil {
72
- return nil , err
74
+ close ()
75
+ return nil , nil , err
73
76
}
74
- return spc , nil
77
+ return spc , close , nil
75
78
},
76
79
}
77
80
}
@@ -100,29 +103,33 @@ func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber
100
103
if spc != s .spc {
101
104
return s .spc , nil
102
105
}
106
+ // we are about to open a new stream: if necessary, make sure the previous one is closed
107
+ if s .close != nil {
108
+ s .close ()
109
+ }
103
110
// Either this is the very first call on this stream (s.spc == nil), or we have a valid
104
111
// retry request. Either way, open a new stream.
105
112
// The lock is held here for a long time, but it doesn't matter because no callers could get
106
113
// anything done anyway.
107
114
s .spc = new (pb.Subscriber_StreamingPullClient )
108
- * s .spc , s .err = s .openWithRetry () // Any error from openWithRetry is permanent.
115
+ * s .spc , s .close , s . err = s .openWithRetry () // Any error from openWithRetry is permanent.
109
116
return s .spc , s .err
110
117
}
111
118
112
- func (s * pullStream ) openWithRetry () (pb.Subscriber_StreamingPullClient , error ) {
119
+ func (s * pullStream ) openWithRetry () (pb.Subscriber_StreamingPullClient , context. CancelFunc , error ) {
113
120
r := defaultRetryer {}
114
121
for {
115
122
recordStat (s .ctx , StreamOpenCount , 1 )
116
- spc , err := s .open ()
123
+ spc , close , err := s .open ()
117
124
bo , shouldRetry := r .Retry (err )
118
125
if err != nil && shouldRetry {
119
126
recordStat (s .ctx , StreamRetryCount , 1 )
120
127
if err := gax .Sleep (s .ctx , bo ); err != nil {
121
- return nil , err
128
+ return nil , nil , err
122
129
}
123
130
continue
124
131
}
125
- return spc , err
132
+ return spc , close , err
126
133
}
127
134
}
128
135
0 commit comments