Skip to content

Commit 369c8a7

Browse files
authored
feat: support setting an async executor provider (#1263)
* feat: support setting an async executor provider * fix: set defaults in builder
1 parent 3f0fa63 commit 369c8a7

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

+41-6
Original file line numberDiff line numberDiff line change
@@ -471,13 +471,17 @@ public ServiceRpc create(SpannerOptions options) {
471471
private static final AtomicInteger DEFAULT_POOL_COUNT = new AtomicInteger();
472472

473473
/** {@link ExecutorProvider} that is used for {@link AsyncResultSet}. */
474-
interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
474+
public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
475475
/** Overridden to suppress the throws declaration of the super interface. */
476476
@Override
477477
void close();
478478
}
479479

480-
static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
480+
/**
481+
* Implementation of {@link CloseableExecutorProvider} that uses a fixed single {@link
482+
* ScheduledExecutorService}.
483+
*/
484+
public static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
481485
private final ScheduledExecutorService executor;
482486

483487
private FixedCloseableExecutorProvider(ScheduledExecutorService executor) {
@@ -500,7 +504,7 @@ public boolean shouldAutoClose() {
500504
}
501505

502506
/** Creates a FixedCloseableExecutorProvider. */
503-
static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
507+
public static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
504508
return new FixedCloseableExecutorProvider(executor);
505509
}
506510
}
@@ -516,8 +520,19 @@ static CloseableExecutorProvider createDefaultAsyncExecutorProvider() {
516520
return createAsyncExecutorProvider(8, 60L, TimeUnit.SECONDS);
517521
}
518522

519-
@VisibleForTesting
520-
static CloseableExecutorProvider createAsyncExecutorProvider(
523+
/**
524+
* Creates a {@link CloseableExecutorProvider} that can be used as an {@link ExecutorProvider} for
525+
* the async API. The {@link ExecutorProvider} will lazily create up to poolSize threads. The
526+
* backing threads will automatically be shutdown if they have not been used during the keep-alive
527+
* time. The backing threads are created as daemon threads.
528+
*
529+
* @param poolSize the maximum number of threads to create in the pool
530+
* @param keepAliveTime the time that an unused thread in the pool should be kept alive
531+
* @param unit the time unit used for the keepAliveTime
532+
* @return a {@link CloseableExecutorProvider} that can be used for {@link
533+
* SpannerOptions.Builder#setAsyncExecutorProvider(CloseableExecutorProvider)}
534+
*/
535+
public static CloseableExecutorProvider createAsyncExecutorProvider(
521536
int poolSize, long keepAliveTime, TimeUnit unit) {
522537
String format =
523538
String.format("spanner-async-pool-%d-thread-%%d", DEFAULT_POOL_COUNT.incrementAndGet());
@@ -1018,6 +1033,26 @@ public Builder setCompressorName(@Nullable String compressorName) {
10181033
return this;
10191034
}
10201035

1036+
/**
1037+
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
1038+
* such as fetching results for an {@link AsyncResultSet}.
1039+
*
1040+
* <p>Async methods will use a sensible default if no custom {@link ExecutorProvider} has been
1041+
* set. The default {@link ExecutorProvider} uses a cached thread pool containing a maximum of 8
1042+
* threads. The pool is lazily initialized and will not create any threads if the user
1043+
* application does not use any async methods. It will also scale down the thread usage if the
1044+
* async load allows for that.
1045+
*
1046+
* <p>Call {@link SpannerOptions#createAsyncExecutorProvider(int, long, TimeUnit)} to create a
1047+
* provider with a custom pool size or call {@link
1048+
* FixedCloseableExecutorProvider#create(ScheduledExecutorService)} to create a {@link
1049+
* CloseableExecutorProvider} from a standard Java {@link ScheduledExecutorService}.
1050+
*/
1051+
public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
1052+
this.asyncExecutorProvider = provider;
1053+
return this;
1054+
}
1055+
10211056
/**
10221057
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
10231058
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
@@ -1198,7 +1233,7 @@ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
11981233
return options;
11991234
}
12001235

1201-
CloseableExecutorProvider getAsyncExecutorProvider() {
1236+
public CloseableExecutorProvider getAsyncExecutorProvider() {
12021237
return asyncExecutorProvider;
12031238
}
12041239

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import static com.google.common.truth.Truth.assertThat;
2020
import static org.hamcrest.CoreMatchers.is;
2121
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.Assert.assertSame;
2223
import static org.junit.Assert.assertThrows;
24+
import static org.mockito.Mockito.mock;
2325

2426
import com.google.api.gax.grpc.GrpcCallContext;
2527
import com.google.api.gax.retrying.RetrySettings;
@@ -29,6 +31,7 @@
2931
import com.google.cloud.NoCredentials;
3032
import com.google.cloud.ServiceOptions;
3133
import com.google.cloud.TransportOptions;
34+
import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider;
3235
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
3336
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
3437
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
@@ -54,6 +57,7 @@
5457
import java.util.HashMap;
5558
import java.util.List;
5659
import java.util.Map;
60+
import java.util.concurrent.ScheduledExecutorService;
5761
import javax.annotation.Nonnull;
5862
import org.junit.Test;
5963
import org.junit.runner.RunWith;
@@ -892,4 +896,16 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() {
892896
.getTimeout())
893897
.isEqualTo(Duration.ofSeconds(6L));
894898
}
899+
900+
@Test
901+
public void testCustomAsyncExecutorProvider() {
902+
ScheduledExecutorService service = mock(ScheduledExecutorService.class);
903+
SpannerOptions options =
904+
SpannerOptions.newBuilder()
905+
.setProjectId("test-project")
906+
.setCredentials(NoCredentials.getInstance())
907+
.setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service))
908+
.build();
909+
assertSame(service, options.getAsyncExecutorProvider().getExecutor());
910+
}
895911
}

0 commit comments

Comments
 (0)