Skip to content

xds: Convert CdsLb to XdsDepManager #12140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
463 changes: 129 additions & 334 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
@Internal
public class CdsLoadBalancerProvider extends LoadBalancerProvider {

private static final String CLUSTER_KEY = "cluster";

@Override
public boolean isAvailable() {
return true;
Expand Down Expand Up @@ -70,9 +68,12 @@
*/
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
try {
String cluster =
JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY);
return ConfigOrError.fromConfig(new CdsConfig(cluster));
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic");
if (isDynamic == null) {
isDynamic = Boolean.FALSE;
}
return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic));
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.UNAVAILABLE.withCause(e).withDescription(
Expand All @@ -89,15 +90,28 @@
* Name of cluster to query CDS for.
*/
final String name;
/**
* Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of
* it without an explicit cluster subscription.
*/
final boolean isDynamic;

CdsConfig(String name) {
this(name, false);
}

CdsConfig(String name, boolean isDynamic) {
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
this.name = name;
this.isDynamic = isDynamic;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("name", name).toString();
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("isDynamic", isDynamic)
.toString();

Check warning on line 114 in xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java#L111-L114

Added lines #L111 - L114 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(
}
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
"Invalid 'mingRingSize'/'maxRingSize'"));
"Invalid 'minRingSize'/'maxRingSize'"));
}
return ConfigOrError.fromConfig(
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
Expand Down
8 changes: 7 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ static CdsUpdate processCluster(Cluster cluster,
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
lbConfig.getRawConfigValue());
if (configOrError.getError() != null) {
throw new ResourceInvalidException(structOrError.getErrorDetail());
throw new ResourceInvalidException(
"Failed to parse lb config for cluster '" + cluster.getName() + "': "
+ configOrError.getError());
}

updateBuilder.lbPolicyConfig(lbPolicyConfig);
Expand Down Expand Up @@ -209,6 +211,10 @@ private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cl
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
}
if (clusterConfig.getClustersList().isEmpty()) {
return StructOrError.fromError("Cluster " + clusterName
+ ": aggregate ClusterConfig.clusters must not be empty");
}
return StructOrError.fromStruct(CdsUpdate.forAggregate(
clusterName, clusterConfig.getClustersList()));
}
Expand Down
8 changes: 7 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ XdsConfig build() {
}

public interface XdsClusterSubscriptionRegistry {
Closeable subscribeToCluster(String clusterName);
Subscription subscribeToCluster(String clusterName);
}

public interface Subscription extends Closeable {
/** Release resources without throwing exceptions. */
@Override
void close();
}
}
47 changes: 31 additions & 16 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.client.XdsClient.ResourceUpdate;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,8 +35,6 @@
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -56,39 +55,43 @@
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
private final String listenerName;
private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher;
private final SynchronizationContext syncContext;
private final String dataPlaneAuthority;
private XdsConfigWatcher xdsConfigWatcher;

private StatusOr<XdsConfig> lastUpdate = null;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
private final Set<ClusterSubscription> subscriptions = new HashSet<>();

XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
XdsDependencyManager(XdsClient xdsClient,
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName, NameResolver.Args nameResolverArgs,
ScheduledExecutorService scheduler) {
this.listenerName = checkNotNull(listenerName, "listenerName");
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
checkNotNull(nameResolverArgs, "nameResolverArgs");
checkNotNull(scheduler, "scheduler");

// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}

public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource " + resourceName;
}

public void start(XdsConfigWatcher xdsConfigWatcher) {
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}

@Override
public Closeable subscribeToCluster(String clusterName) {
public XdsConfig.Subscription subscribeToCluster(String clusterName) {
checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
checkNotNull(clusterName, "clusterName");
ClusterSubscription subscription = new ClusterSubscription(clusterName);

Expand Down Expand Up @@ -291,10 +294,17 @@ private static void addConfigForCluster(
addConfigForCluster(clusters, childCluster, ancestors, tracer);
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
if (!config.hasValue()) {
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
"Unable to get leaves for " + clusterName + ": "
+ config.getStatus().getDescription())));
return;
// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
// watchers reports a transient ADS stream error, the policy should report that it is in
// TRANSIENT_FAILURE if it has never passed a config to its child.
//
// But there's currently disagreement about whether that is actually what we want, and
// that was not originally implemented in gRPC Java. So we're keeping Java's old
// behavior for now and only failing the "leaves" (which is a bit arbitrary for a
// cycle).
leafNames.add(childCluster);
continue;
}
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
if (children instanceof AggregateConfig) {
Expand Down Expand Up @@ -325,6 +335,11 @@ private static void addConfigForCluster(
default:
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
}
if (clusters.containsKey(clusterName)) {
// If a cycle is detected, we'll have detected it while recursing, so now there will be a key
// present. We don't want to overwrite it with a non-error value.
return;
}
clusters.put(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
}
Expand Down Expand Up @@ -406,7 +421,7 @@ public interface XdsConfigWatcher {
void onUpdate(StatusOr<XdsConfig> config);
}

private final class ClusterSubscription implements Closeable {
private final class ClusterSubscription implements XdsConfig.Subscription {
private final String clusterName;
boolean closed; // Accessed from syncContext

Expand All @@ -419,7 +434,7 @@ String getClusterName() {
}

@Override
public void close() throws IOException {
public void close() {
releaseSubscription(this);
}
}
Expand Down
9 changes: 7 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public void start(Listener2 listener) {
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
callCounterProvider = SharedCallCounterMap.getInstance();

resolveState = new ResolveState(ldsResourceName); // auto starts
resolveState = new ResolveState(ldsResourceName);
resolveState.start();
}

private static String expandPercentS(String template, String replacement) {
Expand Down Expand Up @@ -653,10 +654,14 @@ class ResolveState implements XdsDependencyManager.XdsConfigWatcher {
private ResolveState(String ldsResourceName) {
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
xdsDependencyManager =
new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName,
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
nameResolverArgs, scheduler);
}

void start() {
xdsDependencyManager.start(this);
}

private void shutdown() {
if (stopped) {
return;
Expand Down
Loading
Loading