Skip to content

Commit b08db6a

Browse files
sschepensjakubdyszkiewicz
authored andcommitted
server: Refactor stream observer (envoyproxy#95)
Refactor discovery request stream observer Signed-off-by: Sebastian Schepens <[email protected]>
1 parent f127e26 commit b08db6a

File tree

5 files changed

+371
-203
lines changed

5 files changed

+371
-203
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
package io.envoyproxy.controlplane.cache;
22

33
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
4-
import java.util.concurrent.atomic.AtomicBoolean;
4+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55
import java.util.function.Consumer;
66

77
/**
88
* {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by
99
* the xDS server.
1010
*/
1111
public class Watch {
12-
12+
private static final AtomicIntegerFieldUpdater<Watch> isCancelledUpdater =
13+
AtomicIntegerFieldUpdater.newUpdater(Watch.class, "isCancelled");
1314
private final boolean ads;
14-
private final AtomicBoolean isCancelled = new AtomicBoolean();
1515
private final DiscoveryRequest request;
1616
private final Consumer<Response> responseConsumer;
17-
17+
private volatile int isCancelled = 0;
1818
private Runnable stop;
1919

2020
/**
2121
* Construct a watch.
2222
*
23-
* @param ads is this watch for an ADS request?
24-
* @param request the original request for the watch
23+
* @param ads is this watch for an ADS request?
24+
* @param request the original request for the watch
2525
* @param responseConsumer handler for outgoing response messages
2626
*/
2727
public Watch(boolean ads, DiscoveryRequest request, Consumer<Response> responseConsumer) {
@@ -42,7 +42,7 @@ public boolean ads() {
4242
* may be called multiple times, with each subsequent call being a no-op.
4343
*/
4444
public void cancel() {
45-
if (isCancelled.compareAndSet(false, true)) {
45+
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
4646
if (stop != null) {
4747
stop.run();
4848
}
@@ -53,7 +53,7 @@ public void cancel() {
5353
* Returns boolean indicating whether or not the watch has been cancelled.
5454
*/
5555
public boolean isCancelled() {
56-
return isCancelled.get();
56+
return isCancelledUpdater.get(this) == 1;
5757
}
5858

5959
/**
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
import static io.envoyproxy.controlplane.server.DiscoveryServer.ANY_TYPE_URL;
4+
5+
import io.envoyproxy.controlplane.cache.Resources;
6+
import io.envoyproxy.controlplane.cache.Watch;
7+
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
8+
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
9+
import io.grpc.Status;
10+
import io.grpc.stub.StreamObserver;
11+
import java.util.Collections;
12+
import java.util.Set;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.ConcurrentMap;
15+
import java.util.concurrent.Executor;
16+
import java.util.function.Supplier;
17+
18+
/**
19+
* {@code AdsDiscoveryRequestStreamObserver} is an implementation of {@link DiscoveryRequestStreamObserver} tailored for
20+
* ADS streams, which handle multiple watches for all TYPE_URLS.
21+
*/
22+
public class AdsDiscoveryRequestStreamObserver extends DiscoveryRequestStreamObserver {
23+
private final ConcurrentMap<String, Watch> watches;
24+
private final ConcurrentMap<String, DiscoveryResponse> latestResponse;
25+
private final ConcurrentMap<String, Set<String>> ackedResources;
26+
27+
AdsDiscoveryRequestStreamObserver(StreamObserver<DiscoveryResponse> responseObserver,
28+
long streamId,
29+
Executor executor,
30+
DiscoveryServer discoveryServer) {
31+
super(ANY_TYPE_URL, responseObserver, streamId, executor, discoveryServer);
32+
this.watches = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
33+
this.latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
34+
this.ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
35+
}
36+
37+
@Override
38+
public void onNext(DiscoveryRequest request) {
39+
if (request.getTypeUrl().isEmpty()) {
40+
closeWithError(
41+
Status.UNKNOWN
42+
.withDescription(String.format("[%d] type URL is required for ADS", streamId))
43+
.asRuntimeException());
44+
45+
return;
46+
}
47+
48+
super.onNext(request);
49+
}
50+
51+
@Override
52+
void cancel() {
53+
watches.values().forEach(Watch::cancel);
54+
}
55+
56+
@Override
57+
boolean ads() {
58+
return true;
59+
}
60+
61+
@Override
62+
DiscoveryResponse latestResponse(String typeUrl) {
63+
return latestResponse.get(typeUrl);
64+
}
65+
66+
@Override
67+
void setLatestResponse(String typeUrl, DiscoveryResponse response) {
68+
latestResponse.put(typeUrl, response);
69+
}
70+
71+
@Override
72+
Set<String> ackedResources(String typeUrl) {
73+
return ackedResources.getOrDefault(typeUrl, Collections.emptySet());
74+
}
75+
76+
@Override
77+
void setAckedResources(String typeUrl, Set<String> resources) {
78+
ackedResources.put(typeUrl, resources);
79+
}
80+
81+
@Override
82+
void computeWatch(String typeUrl, Supplier<Watch> watchCreator) {
83+
watches.compute(typeUrl, (s, watch) -> {
84+
if (watch != null) {
85+
watch.cancel();
86+
}
87+
88+
return watchCreator.get();
89+
});
90+
}
91+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package io.envoyproxy.controlplane.server;
2+
3+
import static com.google.common.base.Strings.isNullOrEmpty;
4+
5+
import com.google.protobuf.Any;
6+
import io.envoyproxy.controlplane.cache.Resources;
7+
import io.envoyproxy.controlplane.cache.Response;
8+
import io.envoyproxy.controlplane.cache.Watch;
9+
import io.envoyproxy.controlplane.server.exception.RequestException;
10+
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
11+
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
12+
import io.grpc.Status;
13+
import io.grpc.StatusRuntimeException;
14+
import io.grpc.stub.StreamObserver;
15+
import java.util.Collection;
16+
import java.util.Set;
17+
import java.util.concurrent.Executor;
18+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
import java.util.function.Supplier;
20+
import java.util.stream.Collectors;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* {@code DiscoveryRequestStreamObserver} provides the base implementation for XDS stream handling.
26+
*/
27+
public abstract class DiscoveryRequestStreamObserver implements StreamObserver<DiscoveryRequest> {
28+
private static final AtomicLongFieldUpdater<DiscoveryRequestStreamObserver> streamNonceUpdater =
29+
AtomicLongFieldUpdater.newUpdater(DiscoveryRequestStreamObserver.class, "streamNonce");
30+
private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
31+
32+
final long streamId;
33+
private final String defaultTypeUrl;
34+
private final StreamObserver<DiscoveryResponse> responseObserver;
35+
private final Executor executor;
36+
private final DiscoveryServer discoverySever;
37+
private volatile long streamNonce;
38+
private volatile boolean isClosing;
39+
40+
DiscoveryRequestStreamObserver(String defaultTypeUrl,
41+
StreamObserver<DiscoveryResponse> responseObserver,
42+
long streamId,
43+
Executor executor,
44+
DiscoveryServer discoveryServer) {
45+
this.defaultTypeUrl = defaultTypeUrl;
46+
this.responseObserver = responseObserver;
47+
this.streamId = streamId;
48+
this.executor = executor;
49+
this.streamNonce = 0;
50+
this.discoverySever = discoveryServer;
51+
}
52+
53+
@Override
54+
public void onNext(DiscoveryRequest request) {
55+
String requestTypeUrl = request.getTypeUrl().isEmpty() ? defaultTypeUrl : request.getTypeUrl();
56+
String nonce = request.getResponseNonce();
57+
58+
if (LOGGER.isDebugEnabled()) {
59+
LOGGER.debug("[{}] request {}[{}] with nonce {} from version {}",
60+
streamId,
61+
requestTypeUrl,
62+
String.join(", ", request.getResourceNamesList()),
63+
nonce,
64+
request.getVersionInfo());
65+
}
66+
67+
try {
68+
discoverySever.callbacks.forEach(cb -> cb.onStreamRequest(streamId, request));
69+
} catch (RequestException e) {
70+
closeWithError(e);
71+
return;
72+
}
73+
74+
DiscoveryResponse latestResponse = latestResponse(requestTypeUrl);
75+
String resourceNonce = latestResponse == null ? null : latestResponse.getNonce();
76+
77+
if (isNullOrEmpty(resourceNonce) || resourceNonce.equals(nonce)) {
78+
if (!request.hasErrorDetail() && latestResponse != null) {
79+
Set<String> ackedResourcesForType = latestResponse.getResourcesList()
80+
.stream()
81+
.map(Resources::getResourceName)
82+
.collect(Collectors.toSet());
83+
setAckedResources(requestTypeUrl, ackedResourcesForType);
84+
}
85+
86+
computeWatch(requestTypeUrl, () -> discoverySever.configWatcher.createWatch(
87+
ads(),
88+
request,
89+
ackedResources(requestTypeUrl),
90+
r -> executor.execute(() -> send(r, requestTypeUrl))));
91+
}
92+
}
93+
94+
@Override
95+
public void onError(Throwable t) {
96+
if (!Status.fromThrowable(t).getCode().equals(Status.CANCELLED.getCode())) {
97+
LOGGER.error("[{}] stream closed with error", streamId, t);
98+
}
99+
100+
try {
101+
discoverySever.callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t));
102+
closeWithError(Status.fromThrowable(t).asException());
103+
} finally {
104+
cancel();
105+
}
106+
}
107+
108+
@Override
109+
public void onCompleted() {
110+
LOGGER.debug("[{}] stream closed", streamId);
111+
112+
try {
113+
discoverySever.callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl));
114+
synchronized (responseObserver) {
115+
if (!isClosing) {
116+
isClosing = true;
117+
responseObserver.onCompleted();
118+
}
119+
}
120+
} finally {
121+
cancel();
122+
}
123+
}
124+
125+
void onCancelled() {
126+
LOGGER.info("[{}] stream cancelled", streamId);
127+
cancel();
128+
}
129+
130+
void closeWithError(Throwable exception) {
131+
synchronized (responseObserver) {
132+
if (!isClosing) {
133+
isClosing = true;
134+
responseObserver.onError(exception);
135+
}
136+
}
137+
cancel();
138+
}
139+
140+
private void send(Response response, String typeUrl) {
141+
String nonce = Long.toString(streamNonceUpdater.getAndIncrement(this));
142+
143+
Collection<Any> resources = discoverySever.protoResourcesSerializer.serialize(response.resources());
144+
DiscoveryResponse discoveryResponse = DiscoveryResponse.newBuilder()
145+
.setVersionInfo(response.version())
146+
.addAllResources(resources)
147+
.setTypeUrl(typeUrl)
148+
.setNonce(nonce)
149+
.build();
150+
151+
LOGGER.debug("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, response.version());
152+
153+
discoverySever.callbacks.forEach(cb -> cb.onStreamResponse(streamId, response.request(), discoveryResponse));
154+
155+
// Store the latest response *before* we send the response. This ensures that by the time the request
156+
// is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
157+
// which may see the incoming request arrive before the map is updated, failing the nonce check erroneously.
158+
setLatestResponse(typeUrl, discoveryResponse);
159+
synchronized (responseObserver) {
160+
if (!isClosing) {
161+
try {
162+
responseObserver.onNext(discoveryResponse);
163+
} catch (StatusRuntimeException e) {
164+
if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
165+
throw e;
166+
}
167+
}
168+
}
169+
}
170+
}
171+
172+
abstract void cancel();
173+
174+
abstract boolean ads();
175+
176+
abstract DiscoveryResponse latestResponse(String typeUrl);
177+
178+
abstract void setLatestResponse(String typeUrl, DiscoveryResponse response);
179+
180+
abstract Set<String> ackedResources(String typeUrl);
181+
182+
abstract void setAckedResources(String typeUrl, Set<String> resources);
183+
184+
abstract void computeWatch(String typeUrl, Supplier<Watch> watchCreator);
185+
}

0 commit comments

Comments
 (0)