201
201
import java .util .concurrent .Callable ;
202
202
import java .util .concurrent .CancellationException ;
203
203
import java .util .concurrent .ConcurrentHashMap ;
204
+ import java .util .concurrent .ConcurrentLinkedDeque ;
204
205
import java .util .concurrent .ConcurrentMap ;
205
206
import java .util .concurrent .ExecutionException ;
206
207
import java .util .concurrent .ExecutorService ;
@@ -262,6 +263,9 @@ public class GapicSpannerRpc implements SpannerRpc {
262
263
263
264
private final ScheduledExecutorService spannerWatchdog ;
264
265
266
+ private final ConcurrentLinkedDeque <SpannerResponseObserver > responseObservers =
267
+ new ConcurrentLinkedDeque <>();
268
+
265
269
private final boolean throttleAdministrativeRequests ;
266
270
private final RetrySettings retryAdministrativeRequestsSettings ;
267
271
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D ;
@@ -2004,9 +2008,29 @@ <ReqT, RespT> GrpcCallContext newCallContext(
2004
2008
return (GrpcCallContext ) context .merge (apiCallContextFromContext );
2005
2009
}
2006
2010
2011
+ void registerResponseObserver (SpannerResponseObserver responseObserver ) {
2012
+ responseObservers .add (responseObserver );
2013
+ }
2014
+
2015
+ void unregisterResponseObserver (SpannerResponseObserver responseObserver ) {
2016
+ responseObservers .remove (responseObserver );
2017
+ }
2018
+
2019
+ void closeResponseObservers () {
2020
+ responseObservers .forEach (SpannerResponseObserver ::close );
2021
+ responseObservers .clear ();
2022
+ }
2023
+
2024
+ @ InternalApi
2025
+ @ VisibleForTesting
2026
+ public int getNumActiveResponseObservers () {
2027
+ return responseObservers .size ();
2028
+ }
2029
+
2007
2030
@ Override
2008
2031
public void shutdown () {
2009
2032
this .rpcIsClosed = true ;
2033
+ closeResponseObservers ();
2010
2034
if (this .spannerStub != null ) {
2011
2035
this .spannerStub .close ();
2012
2036
this .partitionedDmlStub .close ();
@@ -2028,6 +2052,7 @@ public void shutdown() {
2028
2052
2029
2053
public void shutdownNow () {
2030
2054
this .rpcIsClosed = true ;
2055
+ closeResponseObservers ();
2031
2056
this .spannerStub .close ();
2032
2057
this .partitionedDmlStub .close ();
2033
2058
this .instanceAdminStub .close ();
@@ -2085,7 +2110,7 @@ public void cancel(@Nullable String message) {
2085
2110
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
2086
2111
* the {@link ResultStreamConsumer}.
2087
2112
*/
2088
- private static class SpannerResponseObserver implements ResponseObserver <PartialResultSet > {
2113
+ private class SpannerResponseObserver implements ResponseObserver <PartialResultSet > {
2089
2114
2090
2115
private StreamController controller ;
2091
2116
private final ResultStreamConsumer consumer ;
@@ -2094,13 +2119,21 @@ public SpannerResponseObserver(ResultStreamConsumer consumer) {
2094
2119
this .consumer = consumer ;
2095
2120
}
2096
2121
2122
+ void close () {
2123
+ if (this .controller != null ) {
2124
+ this .controller .cancel ();
2125
+ }
2126
+ }
2127
+
2097
2128
@ Override
2098
2129
public void onStart (StreamController controller ) {
2099
-
2100
2130
// Disable the auto flow control to allow client library
2101
2131
// set the number of messages it prefers to request
2102
2132
controller .disableAutoInboundFlowControl ();
2103
2133
this .controller = controller ;
2134
+ if (this .consumer .cancelQueryWhenClientIsClosed ()) {
2135
+ registerResponseObserver (this );
2136
+ }
2104
2137
}
2105
2138
2106
2139
@ Override
@@ -2110,11 +2143,19 @@ public void onResponse(PartialResultSet response) {
2110
2143
2111
2144
@ Override
2112
2145
public void onError (Throwable t ) {
2146
+ // Unregister the response observer when the query has completed with an error.
2147
+ if (this .consumer .cancelQueryWhenClientIsClosed ()) {
2148
+ unregisterResponseObserver (this );
2149
+ }
2113
2150
consumer .onError (newSpannerException (t ));
2114
2151
}
2115
2152
2116
2153
@ Override
2117
2154
public void onComplete () {
2155
+ // Unregister the response observer when the query has completed normally.
2156
+ if (this .consumer .cancelQueryWhenClientIsClosed ()) {
2157
+ unregisterResponseObserver (this );
2158
+ }
2118
2159
consumer .onCompleted ();
2119
2160
}
2120
2161
0 commit comments