Skip to content

Commit 4d38183

Browse files
authored
Send EDS response after CDS even if EDS hasn't change (envoyproxy#131)
Signed-off-by: Lukasz Dziedziak <[email protected]>
1 parent 2341e43 commit 4d38183

File tree

12 files changed

+359
-27
lines changed

12 files changed

+359
-27
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ public interface ConfigWatcher {
1616
/**
1717
* Returns a new configuration resource {@link Watch} for the given discovery request.
1818
*
19-
* @param ads is the watch for an ADS request?
20-
* @param request the discovery request (node, names, etc.) to use to generate the watch
19+
* @param ads is the watch for an ADS request?
20+
* @param request the discovery request (node, names, etc.) to use to generate the watch
2121
* @param knownResourceNames resources that are already known to the caller
22-
* @param responseConsumer the response handler, used to process outgoing response messages
22+
* @param responseConsumer the response handler, used to process outgoing response messages
23+
* @param hasClusterChanged Indicates if EDS should be sent immediately, even if version has not been changed.
24+
* Supported in ADS mode.
2325
*/
2426
Watch createWatch(
2527
boolean ads,
2628
DiscoveryRequest request,
2729
Set<String> knownResourceNames,
28-
Consumer<Response> responseConsumer);
30+
Consumer<Response> responseConsumer,
31+
boolean hasClusterChanged);
2932
}

cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.envoyproxy.controlplane.cache;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import com.google.common.collect.ImmutableSet;
45
import com.google.common.collect.Sets;
56
import com.google.protobuf.Message;
@@ -81,6 +82,14 @@ public boolean clearSnapshot(T group) {
8182
}
8283
}
8384

85+
public Watch createWatch(
86+
boolean ads,
87+
DiscoveryRequest request,
88+
Set<String> knownResourceNames,
89+
Consumer<Response> responseConsumer) {
90+
return createWatch(ads, request, knownResourceNames, responseConsumer, false);
91+
}
92+
8493
/**
8594
* {@inheritDoc}
8695
*/
@@ -89,7 +98,8 @@ public Watch createWatch(
8998
boolean ads,
9099
DiscoveryRequest request,
91100
Set<String> knownResourceNames,
92-
Consumer<Response> responseConsumer) {
101+
Consumer<Response> responseConsumer,
102+
boolean hasClusterChanged) {
93103

94104
T group = groups.hash(request.getNode());
95105
// even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it
@@ -121,6 +131,10 @@ public Watch createWatch(
121131

122132
return watch;
123133
}
134+
} else if (hasClusterChanged && request.getTypeUrl().equals(Resources.ENDPOINT_TYPE_URL)) {
135+
respond(watch, snapshot, group);
136+
137+
return watch;
124138
}
125139
}
126140

@@ -213,6 +227,25 @@ public synchronized void setSnapshot(T group, Snapshot snapshot) {
213227
}
214228

215229
// Responses should be in specific order and TYPE_URLS has a list of resources in the right order.
230+
respondWithSpecificOrder(group, snapshot, status);
231+
}
232+
233+
/**
234+
* {@inheritDoc}
235+
*/
236+
@Override
237+
public StatusInfo statusInfo(T group) {
238+
readLock.lock();
239+
240+
try {
241+
return statuses.get(group);
242+
} finally {
243+
readLock.unlock();
244+
}
245+
}
246+
247+
@VisibleForTesting
248+
protected void respondWithSpecificOrder(T group, Snapshot snapshot, CacheStatusInfo<T> status) {
216249
for (String typeUrl : Resources.TYPE_URLS) {
217250
status.watchesRemoveIf((id, watch) -> {
218251
if (!watch.request().getTypeUrl().equals(typeUrl)) {
@@ -240,20 +273,6 @@ public synchronized void setSnapshot(T group, Snapshot snapshot) {
240273
}
241274
}
242275

243-
/**
244-
* {@inheritDoc}
245-
*/
246-
@Override
247-
public StatusInfo statusInfo(T group) {
248-
readLock.lock();
249-
250-
try {
251-
return statuses.get(group);
252-
} finally {
253-
readLock.unlock();
254-
}
255-
}
256-
257276
private Response createResponse(DiscoveryRequest request, Map<String, ? extends Message> resources, String version) {
258277
Collection<? extends Message> filtered = request.getResourceNamesList().isEmpty()
259278
? resources.values()

cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,25 @@ public static Cluster createCluster(String clusterName, String address, int port
9696
* @param port port to use for the endpoint
9797
*/
9898
public static ClusterLoadAssignment createEndpoint(String clusterName, int port) {
99+
return createEndpoint(clusterName, LOCALHOST, port);
100+
}
101+
102+
/**
103+
* Returns a new test endpoint for the given cluster.
104+
*
105+
* @param clusterName name of the test cluster that is associated with this endpoint
106+
* @param address ip address to use for the endpoint
107+
* @param port port to use for the endpoint
108+
*/
109+
public static ClusterLoadAssignment createEndpoint(String clusterName, String address, int port) {
99110
return ClusterLoadAssignment.newBuilder()
100111
.setClusterName(clusterName)
101112
.addEndpoints(LocalityLbEndpoints.newBuilder()
102113
.addLbEndpoints(LbEndpoint.newBuilder()
103114
.setEndpoint(Endpoint.newBuilder()
104115
.setAddress(Address.newBuilder()
105116
.setSocketAddress(SocketAddress.newBuilder()
106-
.setAddress(LOCALHOST)
117+
.setAddress(address)
107118
.setPortValue(port)
108119
.setProtocol(Protocol.TCP))))))
109120
.build();

cache/src/test/java/io/envoyproxy/controlplane/cache/SimpleCacheTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.assertj.core.api.Assertions.assertThat;
55

66
import com.google.common.collect.ImmutableList;
7+
import com.google.common.collect.Sets;
78
import com.google.protobuf.Message;
89
import io.envoyproxy.envoy.api.v2.Cluster;
910
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
@@ -130,6 +131,33 @@ public void successfullyWatchAllResourceTypesWithSetBeforeWatch() {
130131
}
131132
}
132133

134+
@Test
135+
public void shouldSendEdsWhenClusterChangedButEdsVersionDidnt() {
136+
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());
137+
138+
cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1);
139+
140+
ResponseTracker responseTracker = new ResponseTracker();
141+
142+
Watch watch = cache.createWatch(
143+
ADS,
144+
DiscoveryRequest.newBuilder()
145+
.setNode(Node.getDefaultInstance())
146+
.setVersionInfo(VERSION1)
147+
.setTypeUrl(Resources.ENDPOINT_TYPE_URL)
148+
.addAllResourceNames(SNAPSHOT1.resources(Resources.ENDPOINT_TYPE_URL).keySet())
149+
.build(),
150+
Sets.newHashSet(""),
151+
responseTracker,
152+
true);
153+
154+
assertThat(watch.request().getTypeUrl()).isEqualTo(Resources.ENDPOINT_TYPE_URL);
155+
assertThat(watch.request().getResourceNamesList()).containsExactlyElementsOf(
156+
SNAPSHOT1.resources(Resources.ENDPOINT_TYPE_URL).keySet());
157+
158+
assertThatWatchReceivesSnapshot(new WatchAndTracker(watch, responseTracker), SNAPSHOT1);
159+
}
160+
133161
@Test
134162
public void successfullyWatchAllResourceTypesWithSetAfterWatch() {
135163
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());

server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ LatestDiscoveryResponse latestResponse(String typeUrl) {
6666
@Override
6767
void setLatestResponse(String typeUrl, LatestDiscoveryResponse response) {
6868
latestResponse.put(typeUrl, response);
69+
if (typeUrl.equals(Resources.CLUSTER_TYPE_URL)) {
70+
hasClusterChanged = true;
71+
} else if (typeUrl.equals(Resources.ENDPOINT_TYPE_URL)) {
72+
hasClusterChanged = false;
73+
}
6974
}
7075

7176
@Override

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public abstract class DiscoveryRequestStreamObserver implements StreamObserver<D
3030
private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
3131

3232
final long streamId;
33+
volatile boolean hasClusterChanged;
3334
private final String defaultTypeUrl;
3435
private final StreamObserver<DiscoveryResponse> responseObserver;
3536
private final Executor executor;
@@ -48,6 +49,7 @@ public abstract class DiscoveryRequestStreamObserver implements StreamObserver<D
4849
this.executor = executor;
4950
this.streamNonce = 0;
5051
this.discoverySever = discoveryServer;
52+
this.hasClusterChanged = false;
5153
}
5254

5355
@Override
@@ -83,7 +85,9 @@ public void onNext(DiscoveryRequest request) {
8385
ads(),
8486
request,
8587
ackedResources(requestTypeUrl),
86-
r -> executor.execute(() -> send(r, requestTypeUrl))));
88+
r -> executor.execute(() -> send(r, requestTypeUrl)),
89+
hasClusterChanged
90+
));
8791
}
8892
}
8993

server/src/test/java/io/envoyproxy/controlplane/server/DiscoveryServerAdsIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,15 @@ public void onStreamResponse(long streamId, DiscoveryRequest request, DiscoveryR
5252

5353
cache.setSnapshot(
5454
GROUP,
55-
createSnapshot(true, "upstream", "upstream", EchoContainer.PORT, "listener0", LISTENER_PORT, "route0", "1"));
55+
createSnapshot(true,
56+
"upstream",
57+
UPSTREAM.ipAddress(),
58+
EchoContainer.PORT,
59+
"listener0",
60+
LISTENER_PORT,
61+
"route0",
62+
"1")
63+
);
5664

5765
DiscoveryServer server = new DiscoveryServer(callbacks, cache);
5866

0 commit comments

Comments
 (0)