Skip to content

Commit 448ec4f

Browse files
xds: XdsClient should unsubscribe on last resource (#11264)
Otherwise, the server will continue sending updates and if we re-subscribe to the last resource, the server won't re-send it. Also completely remove the per-type state, as it could only add confusion.
1 parent 96a788a commit 448ec4f

File tree

4 files changed

+57
-6
lines changed

4 files changed

+57
-6
lines changed

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,14 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
152152
startRpcStream();
153153
}
154154
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
155-
if (resources != null) {
156-
adsStream.sendDiscoveryRequest(resourceType, resources);
155+
if (resources == null) {
156+
resources = Collections.emptyList();
157+
}
158+
adsStream.sendDiscoveryRequest(resourceType, resources);
159+
if (resources.isEmpty()) {
160+
// The resource type no longer has subscribing resources; clean up references to it
161+
versions.remove(resourceType);
162+
adsStream.respNonces.remove(resourceType);
157163
}
158164
}
159165

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T>
281281
@SuppressWarnings("unchecked")
282282
public void run() {
283283
ResourceSubscriber<T> subscriber =
284-
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
284+
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
285285
subscriber.removeWatcher(watcher);
286286
if (!subscriber.isWatched()) {
287287
subscriber.cancelResourceWatch();

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
import org.mockito.junit.MockitoJUnit;
134134
import org.mockito.junit.MockitoRule;
135135
import org.mockito.stubbing.Answer;
136+
import org.mockito.verification.VerificationMode;
136137

137138
/**
138139
* Tests for {@link XdsClientImpl}.
@@ -2757,6 +2758,37 @@ public void edsResourceNotFound() {
27572758
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
27582759
}
27592760

2761+
@Test
2762+
public void edsCleanupNonceAfterUnsubscription() {
2763+
Assume.assumeFalse(ignoreResourceDeletion());
2764+
2765+
// Suppose we have an EDS subscription A.1
2766+
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
2767+
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
2768+
assertThat(call).isNotNull();
2769+
call.verifyRequest(EDS, "A.1", "", "", NODE);
2770+
2771+
// EDS -> {A.1}, version 1
2772+
List<Message> dropOverloads = ImmutableList.of();
2773+
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
2774+
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
2775+
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
2776+
call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
2777+
// {A.1} -> ACK, version 1
2778+
call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE);
2779+
verify(edsResourceWatcher, times(1)).onChanged(any());
2780+
2781+
// trigger an EDS resource unsubscription.
2782+
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
2783+
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
2784+
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);
2785+
2786+
// When re-subscribing, the version and nonce were properly forgotten, so the request is the
2787+
// same as the initial request
2788+
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
2789+
call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2));
2790+
}
2791+
27602792
@Test
27612793
public void edsResponseErrorHandling_allResourcesFailedUnpack() {
27622794
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
@@ -3787,10 +3819,22 @@ protected abstract static class DiscoveryRpcCall {
37873819

37883820
protected void verifyRequest(
37893821
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
3790-
Node node) {
3822+
Node node, VerificationMode verificationMode) {
37913823
throw new UnsupportedOperationException();
37923824
}
37933825

3826+
protected void verifyRequest(
3827+
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
3828+
Node node) {
3829+
verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000));
3830+
}
3831+
3832+
protected void verifyRequest(
3833+
XdsResourceType<?> type, String resource, String versionInfo, String nonce,
3834+
Node node, VerificationMode verificationMode) {
3835+
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode);
3836+
}
3837+
37943838
protected void verifyRequest(
37953839
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
37963840
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import org.mockito.ArgumentMatcher;
119119
import org.mockito.InOrder;
120120
import org.mockito.Mockito;
121+
import org.mockito.verification.VerificationMode;
121122

122123
/**
123124
* Tests for {@link XdsClientImpl} with protocol version v3.
@@ -205,8 +206,8 @@ private DiscoveryRpcCallV3(StreamObserver<DiscoveryRequest> requestObserver,
205206
@Override
206207
protected void verifyRequest(
207208
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
208-
EnvoyProtoData.Node node) {
209-
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
209+
EnvoyProtoData.Node node, VerificationMode verificationMode) {
210+
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
210211
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
211212
}
212213

0 commit comments

Comments
 (0)