Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 20f6ecf

Browse files
authored
feat: dynamic flow control for batcher part 2 (#1310)
* feat: dynamic flow control for batcher part 2 * add test on construtor for code coverage * Update comments based on the review * missed some error messages in test * Fix possible deadlock * fix comment * update comments * update naming * removing setThreshold because there's no use case for it * no need to use 2 locks on semaphore * fix comment * fix test name * make nonblocking semaphore not blocking * move limit to semaphore class * add an addPermits method * fixing naming, don't throw on overflow and add some tests
1 parent 7f7aa25 commit 20f6ecf

10 files changed

+1251
-77
lines changed

Diff for: gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,16 @@ public BatcherImpl(
141141
// to avoid deadlocking
142142
if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
143143
Preconditions.checkArgument(
144-
flowController.getMaxOutstandingElementCount() == null
144+
flowController.getMaxElementCountLimit() == null
145145
|| batchingSettings.getElementCountThreshold() == null
146-
|| flowController.getMaxOutstandingElementCount()
146+
|| flowController.getMaxElementCountLimit()
147147
>= batchingSettings.getElementCountThreshold(),
148148
"If throttling and batching on element count are enabled, FlowController"
149149
+ "#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
150150
Preconditions.checkArgument(
151-
flowController.getMaxOutstandingRequestBytes() == null
151+
flowController.getMaxRequestBytesLimit() == null
152152
|| batchingSettings.getRequestByteThreshold() == null
153-
|| flowController.getMaxOutstandingRequestBytes()
153+
|| flowController.getMaxRequestBytesLimit()
154154
>= batchingSettings.getRequestByteThreshold(),
155155
"If throttling and batching on request bytes are enabled, FlowController"
156156
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");

Diff for: gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java

+56-6
Original file line numberDiff line numberDiff line change
@@ -34,40 +34,90 @@
3434

3535
/** A {@link Semaphore64} that blocks until permits become available. */
3636
class BlockingSemaphore implements Semaphore64 {
37-
private long currentPermits;
37+
private long availablePermits;
38+
private long limit;
3839

3940
private static void checkNotNegative(long l) {
4041
Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
4142
}
4243

4344
BlockingSemaphore(long permits) {
4445
checkNotNegative(permits);
45-
this.currentPermits = permits;
46+
this.availablePermits = permits;
47+
this.limit = permits;
4648
}
4749

50+
@Override
4851
public synchronized void release(long permits) {
4952
checkNotNegative(permits);
50-
51-
currentPermits += permits;
53+
// TODO: throw exceptions when the permits overflow
54+
availablePermits = Math.min(availablePermits + permits, limit);
5255
notifyAll();
5356
}
5457

58+
@Override
5559
public synchronized boolean acquire(long permits) {
5660
checkNotNegative(permits);
5761

5862
boolean interrupted = false;
59-
while (currentPermits < permits) {
63+
while (availablePermits < permits) {
64+
try {
65+
wait();
66+
} catch (InterruptedException e) {
67+
interrupted = true;
68+
}
69+
}
70+
// TODO: if thread is interrupted, we should not grant the permits
71+
availablePermits -= permits;
72+
73+
if (interrupted) {
74+
Thread.currentThread().interrupt();
75+
}
76+
return true;
77+
}
78+
79+
@Override
80+
public synchronized boolean acquirePartial(long permits) {
81+
checkNotNegative(permits);
82+
83+
boolean interrupted = false;
84+
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
85+
// limit. This will allow individual large requests to be sent. Please note that this behavior
86+
// will result in availablePermits going negative.
87+
while (availablePermits < Math.min(limit, permits)) {
6088
try {
6189
wait();
6290
} catch (InterruptedException e) {
6391
interrupted = true;
6492
}
6593
}
66-
currentPermits -= permits;
6794

6895
if (interrupted) {
6996
Thread.currentThread().interrupt();
7097
}
98+
99+
availablePermits -= permits;
71100
return true;
72101
}
102+
103+
@Override
104+
public synchronized void increasePermitLimit(long permits) {
105+
checkNotNegative(permits);
106+
availablePermits += permits;
107+
limit += permits;
108+
notifyAll();
109+
}
110+
111+
@Override
112+
public synchronized void reducePermitLimit(long reduction) {
113+
checkNotNegative(reduction);
114+
Preconditions.checkState(limit - reduction > 0, "permit limit underflow");
115+
availablePermits -= reduction;
116+
limit -= reduction;
117+
}
118+
119+
@Override
120+
public synchronized long getPermitLimit() {
121+
return limit;
122+
}
73123
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.api.core.InternalApi;
33+
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
34+
import com.google.auto.value.AutoValue;
35+
import com.google.common.base.Preconditions;
36+
import javax.annotation.Nullable;
37+
38+
/** Settings for dynamic flow control */
39+
@AutoValue
40+
@InternalApi("For google-cloud-java client use only")
41+
public abstract class DynamicFlowControlSettings {
42+
43+
/** Number of outstanding elements that {@link FlowController} allows when it's initiated. */
44+
@Nullable
45+
public abstract Long getInitialOutstandingElementCount();
46+
47+
/** Number of outstanding bytes that {@link FlowController} allows when it's initiated. */
48+
@Nullable
49+
public abstract Long getInitialOutstandingRequestBytes();
50+
51+
/**
52+
* Maximum number of outstanding elements {@link FlowController} allows before enforcing flow
53+
* control.
54+
*/
55+
@Nullable
56+
public abstract Long getMaxOutstandingElementCount();
57+
58+
/**
59+
* Maximum number of outstanding bytes {@link FlowController} allows before enforcing flow
60+
* control.
61+
*/
62+
@Nullable
63+
public abstract Long getMaxOutstandingRequestBytes();
64+
65+
/**
66+
* Minimum number of outstanding elements {@link FlowController} allows before enforcing flow
67+
* control.
68+
*/
69+
@Nullable
70+
public abstract Long getMinOutstandingElementCount();
71+
72+
/**
73+
* Minimum number of outstanding bytes {@link FlowController} allows before enforcing flow
74+
* control.
75+
*/
76+
@Nullable
77+
public abstract Long getMinOutstandingRequestBytes();
78+
79+
/** @see FlowControlSettings#getLimitExceededBehavior() */
80+
public abstract LimitExceededBehavior getLimitExceededBehavior();
81+
82+
public abstract Builder toBuilder();
83+
84+
public static Builder newBuilder() {
85+
return new AutoValue_DynamicFlowControlSettings.Builder()
86+
.setLimitExceededBehavior(LimitExceededBehavior.Block);
87+
}
88+
89+
@AutoValue.Builder
90+
public abstract static class Builder {
91+
92+
public abstract Builder setInitialOutstandingElementCount(Long value);
93+
94+
public abstract Builder setInitialOutstandingRequestBytes(Long value);
95+
96+
public abstract Builder setMaxOutstandingElementCount(Long value);
97+
98+
public abstract Builder setMaxOutstandingRequestBytes(Long value);
99+
100+
public abstract Builder setMinOutstandingElementCount(Long value);
101+
102+
public abstract Builder setMinOutstandingRequestBytes(Long value);
103+
104+
public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);
105+
106+
abstract DynamicFlowControlSettings autoBuild();
107+
108+
public DynamicFlowControlSettings build() {
109+
DynamicFlowControlSettings settings = autoBuild();
110+
111+
verifyElementCountSettings(settings);
112+
verifyRequestBytesSettings(settings);
113+
114+
return settings;
115+
}
116+
117+
private void verifyElementCountSettings(DynamicFlowControlSettings settings) {
118+
boolean isEnabled =
119+
settings.getInitialOutstandingElementCount() != null
120+
|| settings.getMinOutstandingElementCount() != null
121+
|| settings.getMaxOutstandingElementCount() != null;
122+
if (!isEnabled) {
123+
return;
124+
}
125+
Preconditions.checkState(
126+
settings.getInitialOutstandingElementCount() != null
127+
&& settings.getMinOutstandingElementCount() != null
128+
&& settings.getMaxOutstandingElementCount() != null,
129+
"Throttling on element count is disabled by default. To enable this setting,"
130+
+ " minOutstandingElementCount, initialOutstandingElementCount, and "
131+
+ "maxOutstandingElementCount must all be set.");
132+
Preconditions.checkState(
133+
settings.getMinOutstandingElementCount() > 0
134+
&& settings.getInitialOutstandingElementCount()
135+
<= settings.getMaxOutstandingElementCount()
136+
&& settings.getInitialOutstandingElementCount()
137+
>= settings.getMinOutstandingElementCount(),
138+
"If throttling on element count is set, minOutstandingElementCount must be"
139+
+ " greater than 0, and minOutstandingElementCount <= "
140+
+ "initialOutstandingElementCount <= maxOutstandingElementCount");
141+
}
142+
143+
private void verifyRequestBytesSettings(DynamicFlowControlSettings settings) {
144+
boolean isEnabled =
145+
settings.getInitialOutstandingRequestBytes() != null
146+
|| settings.getMinOutstandingRequestBytes() != null
147+
|| settings.getMaxOutstandingRequestBytes() != null;
148+
if (!isEnabled) {
149+
return;
150+
}
151+
Preconditions.checkState(
152+
settings.getInitialOutstandingRequestBytes() != null
153+
&& settings.getMinOutstandingRequestBytes() != null
154+
&& settings.getMaxOutstandingRequestBytes() != null,
155+
"Throttling on number of bytes is disabled by default. To enable this "
156+
+ "setting, minOutstandingRequestBytes, initialOutstandingRequestBytes, and "
157+
+ "maxOutstandingRequestBytes must all be set");
158+
Preconditions.checkState(
159+
settings.getMinOutstandingRequestBytes() > 0
160+
&& settings.getInitialOutstandingRequestBytes()
161+
<= settings.getMaxOutstandingRequestBytes()
162+
&& settings.getInitialOutstandingRequestBytes()
163+
>= settings.getMinOutstandingRequestBytes(),
164+
"If throttling on number of bytes is set, minOutstandingRequestBytes must "
165+
+ "be greater than 0, and minOutstandingRequestBytes <= "
166+
+ "initialOutstandingRequestBytes <= maxOutstandingRequestBytes");
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)