Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 5329ea4

Browse files
feat: dynamic flow control p3: add FlowControllerEventStats (#1332)
* feat: dynamic flow control p3: add FlowControllerEventStats * add some more tests * add more documentation * fix the formatting * fix the failed test * Update comments * updates based on review * missed one review * remove the flaky test * deflake test * add some comments Co-authored-by: Igor Bernstein <[email protected]>
1 parent 51c40ab commit 5329ea4

File tree

5 files changed

+359
-8
lines changed

5 files changed

+359
-8
lines changed

gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,8 @@ public void close() throws InterruptedException {
293293
}
294294
}
295295

296-
/** Package-private for use in testing. */
297-
@VisibleForTesting
298-
FlowController getFlowController() {
296+
@InternalApi("For google-cloud-java client use only")
297+
public FlowController getFlowController() {
299298
return flowController;
300299
}
301300

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import static com.google.api.gax.batching.FlowController.LimitExceededBehavior;
33+
34+
import com.google.api.core.InternalApi;
35+
import com.google.api.gax.batching.FlowController.FlowControlException;
36+
import com.google.common.annotations.VisibleForTesting;
37+
import com.google.common.base.Preconditions;
38+
import java.util.concurrent.TimeUnit;
39+
import javax.annotation.Nullable;
40+
41+
/**
42+
* Record the statistics of flow control events.
43+
*
44+
* <p>This class is populated by FlowController, which will record throttling events. Currently it
45+
* only keeps the last flow control event, but it could be expanded to record more information in
46+
* the future. The events can be used to dynamically adjust concurrency in the client. For example:
47+
*
48+
* <pre>{@code
49+
* // Increase flow control limits if there was throttling in the past 5 minutes and throttled time
50+
* // was longer than 1 minute.
51+
* while(true) {
52+
* FlowControlEvent event = flowControlEventStats.getLastFlowControlEvent();
53+
* if (event != null
54+
* && event.getTimestampMs() > System.currentMillis() - TimeUnit.MINUTES.toMillis(5)
55+
* && event.getThrottledTimeInMs() > TimeUnit.MINUTES.toMillis(1)) {
56+
* flowController.increaseThresholds(elementSteps, byteSteps);
57+
* }
58+
* Thread.sleep(TimeUnit.MINUTE.toMillis(10));
59+
* }
60+
* }</pre>
61+
*/
62+
@InternalApi("For google-cloud-java client use only")
63+
public class FlowControlEventStats {
64+
65+
private volatile FlowControlEvent lastFlowControlEvent;
66+
67+
// We only need the last event to check if there was throttling in the past X minutes so this
68+
// doesn't need to be super accurate.
69+
void recordFlowControlEvent(FlowControlEvent event) {
70+
if (lastFlowControlEvent == null || event.compareTo(lastFlowControlEvent) > 0) {
71+
lastFlowControlEvent = event;
72+
}
73+
}
74+
75+
public FlowControlEvent getLastFlowControlEvent() {
76+
return lastFlowControlEvent;
77+
}
78+
79+
/**
80+
* A flow control event. Record throttled time if {@link LimitExceededBehavior} is {@link
81+
* LimitExceededBehavior#Block}, or the exception if the behavior is {@link
82+
* LimitExceededBehavior#ThrowException}.
83+
*/
84+
public static class FlowControlEvent implements Comparable<FlowControlEvent> {
85+
static FlowControlEvent createReserveDelayed(long throttledTimeInMs) {
86+
return createReserveDelayed(System.currentTimeMillis(), throttledTimeInMs);
87+
}
88+
89+
static FlowControlEvent createReserveDenied(FlowControlException exception) {
90+
return createReserveDenied(System.currentTimeMillis(), exception);
91+
}
92+
93+
/** Package-private for use in testing. */
94+
@VisibleForTesting
95+
static FlowControlEvent createReserveDelayed(long timestampMs, long throttledTimeInMs) {
96+
Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
97+
Preconditions.checkArgument(throttledTimeInMs > 0, "throttled time must be greater than 0");
98+
return new FlowControlEvent(timestampMs, throttledTimeInMs, null);
99+
}
100+
101+
/** Package-private for use in testing. */
102+
@VisibleForTesting
103+
static FlowControlEvent createReserveDenied(long timestampMs, FlowControlException exception) {
104+
Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
105+
Preconditions.checkNotNull(
106+
exception, "FlowControlException can't be null when reserve is denied");
107+
return new FlowControlEvent(timestampMs, null, exception);
108+
}
109+
110+
private long timestampMs;
111+
private Long throttledTimeMs;
112+
private FlowControlException exception;
113+
114+
private FlowControlEvent(
115+
long timestampMs,
116+
@Nullable Long throttledTimeMs,
117+
@Nullable FlowControlException exception) {
118+
this.timestampMs = timestampMs;
119+
this.throttledTimeMs = throttledTimeMs;
120+
this.exception = exception;
121+
}
122+
123+
public long getTimestampMs() {
124+
return timestampMs;
125+
}
126+
127+
@Nullable
128+
public FlowControlException getException() {
129+
return exception;
130+
}
131+
132+
@Nullable
133+
public Long getThrottledTime(TimeUnit timeUnit) {
134+
return throttledTimeMs == null
135+
? null
136+
: timeUnit.convert(throttledTimeMs, TimeUnit.MILLISECONDS);
137+
}
138+
139+
@Override
140+
public int compareTo(FlowControlEvent otherEvent) {
141+
return Long.compare(this.getTimestampMs(), otherEvent.getTimestampMs());
142+
}
143+
}
144+
}

gax/src/main/java/com/google/api/gax/batching/FlowController.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131

3232
import com.google.api.core.BetaApi;
3333
import com.google.api.core.InternalApi;
34+
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
3435
import com.google.common.base.Preconditions;
36+
import com.google.common.base.Stopwatch;
37+
import java.util.concurrent.TimeUnit;
3538
import javax.annotation.Nullable;
3639

3740
/** Provides flow control capability. */
@@ -149,6 +152,11 @@ public enum LimitExceededBehavior {
149152
private final LimitExceededBehavior limitExceededBehavior;
150153
private final Object updateLimitLock;
151154

155+
// Threshold to record throttling events. If reserve() takes longer than this threshold, it will
156+
// be recorded as a throttling event.
157+
private static final long RESERVE_FLOW_CONTROL_THRESHOLD_MS = 1;
158+
private final FlowControlEventStats flowControlEventStats;
159+
152160
public FlowController(FlowControlSettings settings) {
153161
// When the FlowController is initialized with FlowControlSettings, flow control limits can't be
154162
// adjusted. min, current, max element count and request bytes are initialized with the max
@@ -160,6 +168,7 @@ public FlowController(FlowControlSettings settings) {
160168
public FlowController(DynamicFlowControlSettings settings) {
161169
this.limitExceededBehavior = settings.getLimitExceededBehavior();
162170
this.updateLimitLock = new Object();
171+
this.flowControlEventStats = new FlowControlEventStats();
163172
switch (settings.getLimitExceededBehavior()) {
164173
case ThrowException:
165174
case Block:
@@ -204,10 +213,15 @@ public void reserve(long elements, long bytes) throws FlowControlException {
204213
Preconditions.checkArgument(elements >= 0);
205214
Preconditions.checkArgument(bytes >= 0);
206215

216+
Stopwatch stopwatch = Stopwatch.createStarted();
207217
if (outstandingElementCount != null) {
208218
if (!outstandingElementCount.acquire(elements)) {
209-
throw new MaxOutstandingElementCountReachedException(
210-
outstandingElementCount.getPermitLimit());
219+
MaxOutstandingElementCountReachedException exception =
220+
new MaxOutstandingElementCountReachedException(
221+
outstandingElementCount.getPermitLimit());
222+
flowControlEventStats.recordFlowControlEvent(
223+
FlowControlEvent.createReserveDenied(exception));
224+
throw exception;
211225
}
212226
}
213227

@@ -218,9 +232,17 @@ public void reserve(long elements, long bytes) throws FlowControlException {
218232
if (outstandingElementCount != null) {
219233
outstandingElementCount.release(elements);
220234
}
221-
throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
235+
MaxOutstandingRequestBytesReachedException exception =
236+
new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
237+
flowControlEventStats.recordFlowControlEvent(
238+
FlowControlEvent.createReserveDenied(exception));
239+
throw exception;
222240
}
223241
}
242+
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
243+
if (elapsed >= RESERVE_FLOW_CONTROL_THRESHOLD_MS) {
244+
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.createReserveDelayed(elapsed));
245+
}
224246
}
225247

226248
public void release(long elements, long bytes) {
@@ -333,4 +355,9 @@ public Long getCurrentElementCountLimit() {
333355
public Long getCurrentRequestBytesLimit() {
334356
return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit();
335357
}
358+
359+
@InternalApi("For google-cloud-java client use only")
360+
public FlowControlEventStats getFlowControlEventStats() {
361+
return flowControlEventStats;
362+
}
336363
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertNotNull;
34+
import static org.junit.Assert.assertNull;
35+
import static org.junit.Assert.fail;
36+
37+
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
38+
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.concurrent.TimeUnit;
42+
import org.junit.Test;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.JUnit4;
45+
46+
@RunWith(JUnit4.class)
47+
public class FlowControlEventStatsTest {
48+
49+
@Test
50+
public void testCreateEvent() {
51+
long timestamp = 12345, throttledTimeMs = 5000;
52+
FlowControlEvent event = FlowControlEvent.createReserveDelayed(timestamp, throttledTimeMs);
53+
assertEquals(event.getTimestampMs(), event.getTimestampMs());
54+
assertEquals(throttledTimeMs / 1000, event.getThrottledTime(TimeUnit.SECONDS).longValue());
55+
assertNull(event.getException());
56+
57+
MaxOutstandingRequestBytesReachedException exception =
58+
new MaxOutstandingRequestBytesReachedException(100);
59+
event = FlowControlEvent.createReserveDenied(timestamp, exception);
60+
assertEquals(timestamp, event.getTimestampMs());
61+
assertNotNull(event.getException());
62+
assertEquals(exception, event.getException());
63+
assertNull(event.getThrottledTime(TimeUnit.MILLISECONDS));
64+
65+
try {
66+
event = FlowControlEvent.createReserveDenied(null);
67+
fail("FlowControlEvent did not throw exception");
68+
} catch (NullPointerException e) {
69+
// expected, ignore
70+
}
71+
}
72+
73+
@Test
74+
public void testGetLastEvent() throws InterruptedException {
75+
final FlowControlEventStats stats = new FlowControlEventStats();
76+
final long currentTime = System.currentTimeMillis();
77+
78+
List<Thread> threads = new ArrayList<>();
79+
for (int i = 1; i <= 10; i++) {
80+
final int timeElapsed = i;
81+
Thread t =
82+
new Thread() {
83+
@Override
84+
public void run() {
85+
stats.recordFlowControlEvent(
86+
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
87+
}
88+
};
89+
threads.add(t);
90+
t.start();
91+
}
92+
93+
for (Thread t : threads) {
94+
t.join(10);
95+
}
96+
97+
assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());
98+
assertEquals(
99+
10, stats.getLastFlowControlEvent().getThrottledTime(TimeUnit.MILLISECONDS).longValue());
100+
}
101+
}

0 commit comments

Comments
 (0)