17
17
package com .google .cloud .spanner ;
18
18
19
19
import static com .google .cloud .spanner .SpannerExceptionFactory .newSpannerException ;
20
+ import static com .google .cloud .spanner .SpannerExceptionFactory .newSpannerExceptionForCancellation ;
20
21
import static com .google .common .base .Preconditions .checkArgument ;
21
22
import static com .google .common .base .Preconditions .checkNotNull ;
22
23
import static com .google .common .base .Preconditions .checkState ;
23
24
24
25
import com .google .api .client .util .BackOff ;
26
+ import com .google .api .client .util .ExponentialBackOff ;
27
+ import com .google .api .gax .retrying .RetrySettings ;
25
28
import com .google .cloud .ByteArray ;
26
29
import com .google .cloud .Date ;
27
30
import com .google .cloud .Timestamp ;
28
31
import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
32
+ import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
29
33
import com .google .common .annotations .VisibleForTesting ;
30
34
import com .google .common .base .Function ;
31
35
import com .google .common .collect .AbstractIterator ;
46
50
import io .opencensus .trace .Span ;
47
51
import io .opencensus .trace .Tracer ;
48
52
import io .opencensus .trace .Tracing ;
53
+ import java .io .IOException ;
49
54
import java .io .Serializable ;
50
55
import java .util .AbstractList ;
51
56
import java .util .ArrayList ;
55
60
import java .util .LinkedList ;
56
61
import java .util .List ;
57
62
import java .util .concurrent .BlockingQueue ;
63
+ import java .util .concurrent .CountDownLatch ;
64
+ import java .util .concurrent .Executor ;
58
65
import java .util .concurrent .LinkedBlockingQueue ;
66
+ import java .util .concurrent .TimeUnit ;
59
67
import java .util .logging .Level ;
60
68
import java .util .logging .Logger ;
61
69
import javax .annotation .Nullable ;
@@ -820,8 +828,10 @@ void setCall(SpannerRpc.StreamingCall call) {
820
828
@ VisibleForTesting
821
829
abstract static class ResumableStreamIterator extends AbstractIterator <PartialResultSet >
822
830
implements CloseableIterator <PartialResultSet > {
831
+ private static final RetrySettings STREAMING_RETRY_SETTINGS =
832
+ SpannerStubSettings .newBuilder ().executeStreamingSqlSettings ().getRetrySettings ();
823
833
private static final Logger logger = Logger .getLogger (ResumableStreamIterator .class .getName ());
824
- private final BackOff backOff = SpannerImpl . newBackOff ();
834
+ private final BackOff backOff = newBackOff ();
825
835
private final LinkedList <PartialResultSet > buffer = new LinkedList <>();
826
836
private final int maxBufferSize ;
827
837
private final Span span ;
@@ -841,6 +851,70 @@ protected ResumableStreamIterator(int maxBufferSize, String streamName, Span par
841
851
this .span = tracer .spanBuilderWithExplicitParent (streamName , parent ).startSpan ();
842
852
}
843
853
854
+ private static ExponentialBackOff newBackOff () {
855
+ return new ExponentialBackOff .Builder ()
856
+ .setMultiplier (STREAMING_RETRY_SETTINGS .getRetryDelayMultiplier ())
857
+ .setInitialIntervalMillis (
858
+ (int ) STREAMING_RETRY_SETTINGS .getInitialRetryDelay ().toMillis ())
859
+ .setMaxIntervalMillis ((int ) STREAMING_RETRY_SETTINGS .getMaxRetryDelay ().toMillis ())
860
+ .setMaxElapsedTimeMillis (Integer .MAX_VALUE ) // Prevent Backoff.STOP from getting returned.
861
+ .build ();
862
+ }
863
+
864
+ private static void backoffSleep (Context context , BackOff backoff ) throws SpannerException {
865
+ backoffSleep (context , nextBackOffMillis (backoff ));
866
+ }
867
+
868
+ private static long nextBackOffMillis (BackOff backoff ) throws SpannerException {
869
+ try {
870
+ return backoff .nextBackOffMillis ();
871
+ } catch (IOException e ) {
872
+ throw newSpannerException (ErrorCode .INTERNAL , e .getMessage (), e );
873
+ }
874
+ }
875
+
876
+ private static void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
877
+ tracer
878
+ .getCurrentSpan ()
879
+ .addAnnotation (
880
+ "Backing off" ,
881
+ ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (backoffMillis )));
882
+ final CountDownLatch latch = new CountDownLatch (1 );
883
+ final Context .CancellationListener listener =
884
+ new Context .CancellationListener () {
885
+ @ Override
886
+ public void cancelled (Context context ) {
887
+ // Wakeup on cancellation / DEADLINE_EXCEEDED.
888
+ latch .countDown ();
889
+ }
890
+ };
891
+
892
+ context .addListener (listener , DirectExecutor .INSTANCE );
893
+ try {
894
+ if (backoffMillis == BackOff .STOP ) {
895
+ // Highly unlikely but we handle it just in case.
896
+ backoffMillis = STREAMING_RETRY_SETTINGS .getMaxRetryDelay ().toMillis ();
897
+ }
898
+ if (latch .await (backoffMillis , TimeUnit .MILLISECONDS )) {
899
+ // Woken by context cancellation.
900
+ throw newSpannerExceptionForCancellation (context , null );
901
+ }
902
+ } catch (InterruptedException interruptExcept ) {
903
+ throw newSpannerExceptionForCancellation (context , interruptExcept );
904
+ } finally {
905
+ context .removeListener (listener );
906
+ }
907
+ }
908
+
909
+ private enum DirectExecutor implements Executor {
910
+ INSTANCE ;
911
+
912
+ @ Override
913
+ public void execute (Runnable command ) {
914
+ command .run ();
915
+ }
916
+ }
917
+
844
918
abstract CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken );
845
919
846
920
@ Override
@@ -915,9 +989,9 @@ protected PartialResultSet computeNext() {
915
989
try (Scope s = tracer .withSpan (span )) {
916
990
long delay = e .getRetryDelayInMillis ();
917
991
if (delay != -1 ) {
918
- SpannerImpl . backoffSleep (context , delay );
992
+ backoffSleep (context , delay );
919
993
} else {
920
- SpannerImpl . backoffSleep (context , backOff );
994
+ backoffSleep (context , backOff );
921
995
}
922
996
}
923
997
continue ;
0 commit comments