Skip to content

xds:Update logic to match A57 #9745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
Expand Down Expand Up @@ -71,6 +73,7 @@ final class AbstractXdsClient {
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
private final Node bootstrapNode;
private final XdsClient.TimerLaunch timerLaunch;

// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
Expand Down Expand Up @@ -98,7 +101,8 @@ final class AbstractXdsClient {
timeService,
SynchronizationContext syncContext,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
Supplier<Stopwatch> stopwatchSupplier,
XdsClient.TimerLaunch timerLaunch) {
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
Expand All @@ -108,6 +112,7 @@ final class AbstractXdsClient {
this.timeService = checkNotNull(timeService, "timeService");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch");
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
logId = InternalLogId.allocate("xds-client", serverInfo.target());
logger = XdsLogger.withLogId(logId);
Expand Down Expand Up @@ -199,6 +204,22 @@ boolean isInBackoff() {
return rpcRetryTimer != null && rpcRetryTimer.isPending();
}

boolean isReady() {
return adsStream != null && adsStream.isReady();
}

/**
* Starts a timer for each requested resource that hasn't been responded to and
* has been waiting for the channel to get ready.
*/
void readyHandler() {
if (!isReady()) {
return;
}

timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
}

/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.
Expand Down Expand Up @@ -262,6 +283,8 @@ private abstract class AbstractAdsStream {

abstract void sendError(Exception error);

abstract boolean isReady();

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand Down Expand Up @@ -344,13 +367,26 @@ private void cleanUp() {
private final class AdsStreamV2 extends AbstractAdsStream {
private StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
void start() {
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc
.AggregatedDiscoveryServiceStub stub =
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
new ClientResponseObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest,
io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {

@Override
public void beforeStart(
ClientCallStreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryRequest> reqStream) {
reqStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler);
}

@Override
public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
syncContext.execute(new Runnable() {
Expand Down Expand Up @@ -427,11 +463,23 @@ void sendError(Exception error) {
private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<DiscoveryResponse> responseReader = new StreamObserver<DiscoveryResponse>() {
StreamObserver<DiscoveryResponse> responseReader =
new ClientResponseObserver<DiscoveryRequest,DiscoveryResponse>() {

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.setOnReadyHandler(AbstractXdsClient.this::readyHandler);
}

@Override
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {
Expand Down
8 changes: 8 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,12 @@ Collection<String> getSubscribedResources(ServerInfo serverInfo,

Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
}

interface TimerLaunch {
/**
* For all subscriber's for the specified server, if the resource hasn't yet been
* resolved then start a timer for it.
*/
void startSubscriberTimersIfNeeded(ServerInfo serverInfo);
}
}
46 changes: 42 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.TimerLaunch;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
Expand All @@ -65,9 +66,10 @@
import javax.annotation.Nullable;

/**
* XdsClient implementation for client side usages.
* XdsClient implementation.
*/
final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
final class XdsClientImpl extends XdsClient
implements XdsResponseHandler, ResourceStore, TimerLaunch {

// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
Expand Down Expand Up @@ -146,7 +148,8 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
timeService,
syncContext,
backoffPolicyProvider,
stopwatchSupplier);
stopwatchSupplier,
this);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
Expand Down Expand Up @@ -182,7 +185,9 @@ public void handleStreamClosed(Status error) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
subscriber.onError(error);
if (!subscriber.hasResult()) {
subscriber.onError(error);
}
}
}
}
Expand Down Expand Up @@ -380,6 +385,30 @@ public String toString() {
return logId.toString();
}

@Override
public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
if (isShutDown()) {
return;
}

syncContext.execute(new Runnable() {
@Override
public void run() {
if (isShutDown()) {
return;
}

for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
subscriber.restartTimer();
}
}
}
}
});
}

private void cleanUpResourceTimers() {
for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
Expand Down Expand Up @@ -577,6 +606,10 @@ void restartTimer() {
if (data != null || absent) { // resource already resolved
return;
}
if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer
return;
}

class ResourceNotFound implements Runnable {
@Override
public void run() {
Expand All @@ -594,6 +627,7 @@ public String toString() {

// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
metadata = ResourceMetadata.newResourceMetadataRequested();

respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
Expand Down Expand Up @@ -625,6 +659,10 @@ boolean isWatched() {
return !watchers.isEmpty();
}

boolean hasResult() {
return data != null || absent;
}

void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
Expand Down
Loading