|
26 | 26 | import com.google.api.gax.core.GaxProperties;
|
27 | 27 | import com.google.api.gax.grpc.GaxGrpcProperties;
|
28 | 28 | import com.google.api.gax.grpc.GrpcCallContext;
|
| 29 | +import com.google.api.gax.grpc.GrpcCallSettings; |
| 30 | +import com.google.api.gax.grpc.GrpcStubCallableFactory; |
29 | 31 | import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
|
30 | 32 | import com.google.api.gax.longrunning.OperationFuture;
|
31 | 33 | import com.google.api.gax.retrying.ResultRetryAlgorithm;
|
|
35 | 37 | import com.google.api.gax.rpc.ApiCallContext;
|
36 | 38 | import com.google.api.gax.rpc.ApiClientHeaderProvider;
|
37 | 39 | import com.google.api.gax.rpc.ApiException;
|
| 40 | +import com.google.api.gax.rpc.ClientContext; |
38 | 41 | import com.google.api.gax.rpc.FixedHeaderProvider;
|
39 | 42 | import com.google.api.gax.rpc.HeaderProvider;
|
40 | 43 | import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
|
|
44 | 47 | import com.google.api.gax.rpc.StatusCode;
|
45 | 48 | import com.google.api.gax.rpc.StreamController;
|
46 | 49 | import com.google.api.gax.rpc.TransportChannelProvider;
|
| 50 | +import com.google.api.gax.rpc.UnaryCallSettings; |
| 51 | +import com.google.api.gax.rpc.UnaryCallable; |
47 | 52 | import com.google.api.gax.rpc.UnavailableException;
|
48 | 53 | import com.google.api.gax.rpc.WatchdogProvider;
|
49 | 54 | import com.google.api.pathtemplate.PathTemplate;
|
|
59 | 64 | import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider;
|
60 | 65 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
|
61 | 66 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
|
| 67 | +import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory; |
62 | 68 | import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
|
63 | 69 | import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
|
64 | 70 | import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
|
|
72 | 78 | import com.google.common.base.Preconditions;
|
73 | 79 | import com.google.common.collect.ImmutableList;
|
74 | 80 | import com.google.common.collect.ImmutableMap;
|
| 81 | +import com.google.common.collect.ImmutableSet; |
75 | 82 | import com.google.common.util.concurrent.RateLimiter;
|
76 | 83 | import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
77 | 84 | import com.google.iam.v1.GetIamPolicyRequest;
|
|
157 | 164 | import java.util.LinkedList;
|
158 | 165 | import java.util.List;
|
159 | 166 | import java.util.Map;
|
| 167 | +import java.util.Set; |
160 | 168 | import java.util.concurrent.Callable;
|
161 | 169 | import java.util.concurrent.CancellationException;
|
162 | 170 | import java.util.concurrent.ConcurrentHashMap;
|
@@ -443,7 +451,45 @@ public GapicSpannerRpc(final SpannerOptions options) {
|
443 | 451 | .setCredentialsProvider(credentialsProvider)
|
444 | 452 | .setStreamWatchdogProvider(watchdogProvider)
|
445 | 453 | .build();
|
446 |
| - this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings); |
| 454 | + |
| 455 | + // Automatically retry RESOURCE_EXHAUSTED for GetOperation if auto-throttling of |
| 456 | + // administrative requests has been set. The GetOperation RPC is called repeatedly by gax |
| 457 | + // while polling long-running operations for their progress and can also cause these errors. |
| 458 | + // The default behavior is not to retry these errors, and this option should normally only be |
| 459 | + // enabled for (integration) testing. |
| 460 | + if (options.isAutoThrottleAdministrativeRequests()) { |
| 461 | + GrpcStubCallableFactory factory = |
| 462 | + new GrpcDatabaseAdminCallableFactory() { |
| 463 | + @Override |
| 464 | + public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCallable( |
| 465 | + GrpcCallSettings<RequestT, ResponseT> grpcCallSettings, |
| 466 | + UnaryCallSettings<RequestT, ResponseT> callSettings, |
| 467 | + ClientContext clientContext) { |
| 468 | + // Make GetOperation retry on RESOURCE_EXHAUSTED to prevent polling operations from |
| 469 | + // failing with an Administrative requests limit exceeded error. |
| 470 | + if (grpcCallSettings |
| 471 | + .getMethodDescriptor() |
| 472 | + .getFullMethodName() |
| 473 | + .equals("google.longrunning.Operations/GetOperation")) { |
| 474 | + Set<StatusCode.Code> codes = |
| 475 | + ImmutableSet.<StatusCode.Code>builderWithExpectedSize( |
| 476 | + callSettings.getRetryableCodes().size() + 1) |
| 477 | + .addAll(callSettings.getRetryableCodes()) |
| 478 | + .add(StatusCode.Code.RESOURCE_EXHAUSTED) |
| 479 | + .build(); |
| 480 | + callSettings = callSettings.toBuilder().setRetryableCodes(codes).build(); |
| 481 | + } |
| 482 | + return super.createUnaryCallable(grpcCallSettings, callSettings, clientContext); |
| 483 | + } |
| 484 | + }; |
| 485 | + this.databaseAdminStub = |
| 486 | + new GrpcDatabaseAdminStubWithCustomCallableFactory( |
| 487 | + databaseAdminStubSettings, |
| 488 | + ClientContext.create(databaseAdminStubSettings), |
| 489 | + factory); |
| 490 | + } else { |
| 491 | + this.databaseAdminStub = GrpcDatabaseAdminStub.create(databaseAdminStubSettings); |
| 492 | + } |
447 | 493 |
|
448 | 494 | // Check whether the SPANNER_EMULATOR_HOST env var has been set, and if so, if the emulator is
|
449 | 495 | // actually running.
|
@@ -504,9 +550,9 @@ private static void checkEmulatorConnection(
|
504 | 550 |
|
505 | 551 | private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
|
506 | 552 | RetrySettings.newBuilder()
|
507 |
| - .setInitialRetryDelay(Duration.ofSeconds(2L)) |
508 |
| - .setRetryDelayMultiplier(1.5) |
509 |
| - .setMaxRetryDelay(Duration.ofSeconds(15L)) |
| 553 | + .setInitialRetryDelay(Duration.ofSeconds(5L)) |
| 554 | + .setRetryDelayMultiplier(2.0) |
| 555 | + .setMaxRetryDelay(Duration.ofSeconds(60L)) |
510 | 556 | .setMaxAttempts(10)
|
511 | 557 | .build();
|
512 | 558 |
|
@@ -1021,6 +1067,11 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> call() throws Exception
|
1021 | 1067 | throw newSpannerException(e);
|
1022 | 1068 | } catch (ExecutionException e) {
|
1023 | 1069 | Throwable t = e.getCause();
|
| 1070 | + SpannerException se = SpannerExceptionFactory.asSpannerException(t); |
| 1071 | + if (se instanceof AdminRequestsPerMinuteExceededException) { |
| 1072 | + // Propagate this to trigger a retry. |
| 1073 | + throw se; |
| 1074 | + } |
1024 | 1075 | if (t instanceof AlreadyExistsException) {
|
1025 | 1076 | String operationName =
|
1026 | 1077 | OPERATION_NAME_TEMPLATE.instantiate(
|
|
0 commit comments