Skip to content

xds: Add counter and gauge metrics #11661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move interfaces for reporting gauge values to XdsClient and update un…
…it tests
  • Loading branch information
DNVindhya committed Nov 12, 2024
commit 5881cb6676a8840bcd5254a602253c1bb9608a4a
44 changes: 19 additions & 25 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceCallback;
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
import io.grpc.xds.client.XdsClientMetricReporter;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -125,52 +127,44 @@
}

void reportCallbackMetrics(BatchRecorder recorder) {
CallbackMetricReporter callbackMetricReporter = createCallbackMetricReporter(recorder);
MetricReporterCallback callback = new MetricReporterCallback(recorder);
try {
SettableFuture<Void> ret = this.xdsClient.reportResourceCounts(
callbackMetricReporter);
SettableFuture<Void> reportResourceCountsCompleted = this.xdsClient.reportResourceCounts(
callback);
SettableFuture<Void> reportServerConnectionsCompleted =
this.xdsClient.reportServerConnections(callback);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = ret.get(5, TimeUnit.SECONDS);
SettableFuture<Void> ret1 = this.xdsClient.reportServerConnections(callbackMetricReporter);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused1 = ret1.get(5, TimeUnit.SECONDS);
Void unused1 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
Void unused2 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt(); // re-set the current thread's interruption state

Check warning on line 141 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java#L141

Added line #L141 was not covered by tests
}
logger.log(Level.WARNING, "Failed to report gauge metrics", e);
}
}

/**
* Allows injecting a custom {@link CallbackMetricReporter} for testing purposes.
*/
@VisibleForTesting
CallbackMetricReporter createCallbackMetricReporter(BatchRecorder recorder) {
return new CallbackMetricReporterImpl(recorder);
}

@VisibleForTesting
static final class CallbackMetricReporterImpl implements
XdsClientMetricReporter.CallbackMetricReporter {
static final class MetricReporterCallback implements ResourceCallback,
ServerConnectionCallback {
private final BatchRecorder recorder;

CallbackMetricReporterImpl(BatchRecorder recorder) {
MetricReporterCallback(BatchRecorder recorder) {
this.recorder = recorder;
}

@Override
public void reportServerConnections(int isConnected, String target, String xdsServer) {
recorder.recordLongGauge(CONNECTED_GAUGE, isConnected, Arrays.asList(target, xdsServer),
Collections.emptyList());
}

// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
@Override
public void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
public void reportResourceCountGauge(long resourceCount, String cacheState, String resourceType,
String target) {
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
Arrays.asList(target, cacheState, resourceType), Collections.emptyList());
}

@Override
public void reportServerConnectionGauge(int isConnected, String target, String xdsServer) {
recorder.recordLongGauge(CONNECTED_GAUGE, isConnected, Arrays.asList(target, xdsServer),
Collections.emptyList());
}
}
}
23 changes: 16 additions & 7 deletions xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.grpc.ExperimentalApi;
import io.grpc.Status;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -380,8 +379,20 @@
throw new UnsupportedOperationException();
}

/** Callback used to report gauge metric value for resources. */
public interface ResourceCallback {
// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
void reportResourceCountGauge(long resourceCount, String cacheState, String resourceType,
String target);
}

/** Callback used to report a gauge metric value for server connections. */
public interface ServerConnectionCallback {
void reportServerConnectionGauge(int isConnected, String target, String xdsServer);
}

/**
* Reports the number of resources in each cache state through {@link CallbackMetricReporter}.
* Reports the number of resources in each cache state.
*
* <p>Cache state is determined by two factors:
* <ul>
Expand All @@ -390,17 +401,15 @@
* resource.
* </ul>
*/
public SettableFuture<Void> reportResourceCounts(CallbackMetricReporter callbackMetricReporter) {
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
throw new UnsupportedOperationException();

Check warning on line 405 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L405

Added line #L405 was not covered by tests
}

/**
* Reports whether xDS client has a working ADS stream to xDS server. Reporting is done through
* {@link CallbackMetricReporter}.
* Reports whether xDS client has a working ADS stream to xDS server.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this working or non-errored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used working as it was used in metric description of grpc.xds_client.connected.
Also, I think it is more nuanced than non-errored, because if there is an error on ADS stream and / or close before receiving a response, it is an error and value will remain false until ADS stream receives a response to say it has a working stream to communicate with server.
Added link to A78 which has definition for working stream. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't the dictionary definition of "working", but you're right that it is the proposal's definition. I'd be happier if you put quotes around working, but it's up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added quotes around working.

*/
public SettableFuture<Void> reportServerConnections(
CallbackMetricReporter callbackMetricReporter) {
public SettableFuture<Void> reportServerConnections(ServerConnectionCallback callback) {
throw new UnsupportedOperationException();

Check warning on line 412 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L412

Added line #L412 was not covered by tests
}

static final class ProcessingTracker {
Expand Down
30 changes: 9 additions & 21 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceStore;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.Collection;
Expand Down Expand Up @@ -536,12 +534,11 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
}

@Override
public SettableFuture<Void> reportServerConnections(
CallbackMetricReporter callbackMetricReporter) {
public SettableFuture<Void> reportServerConnections(ServerConnectionCallback callback) {
SettableFuture<Void> future = SettableFuture.create();
syncContext.execute(() -> {
serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->
callbackMetricReporter.reportServerConnections(
callback.reportServerConnectionGauge(
controlPlaneClient.hasWorkingAdsStream() ? 1 : 0,
target,
serverInfo.target()));
Expand All @@ -551,12 +548,12 @@ public SettableFuture<Void> reportServerConnections(
}

@Override
public SettableFuture<Void> reportResourceCounts(CallbackMetricReporter callbackMetricReporter) {
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
SettableFuture<Void> future = SettableFuture.create();
syncContext.execute(() -> {
Map<XdsResourceType<?>, Map<String, Long>> resourceCountsByType =
getResourceCountsByType();
reportResourceCountsToCallback(callbackMetricReporter, resourceCountsByType);
reportResourceCountsToCallback(callback, resourceCountsByType);
});
future.set(null);
return future;
Expand All @@ -568,9 +565,7 @@ private String cacheStateFromResourceStatus(ResourceMetadata metadata, boolean i
? status + "_but_cached" : status;
}

/**
* Calculates number of resources by ResourceType and ResourceSubscriber.metadata.status.
*/
/** Calculates number of resources by ResourceType and ResourceSubscriber.metadata.status. */
Map<XdsResourceType<?>, Map<String, Long>> getResourceCountsByType() {
Map<XdsResourceType<?>, Map<String, Long>> resourceCountsByType = new HashMap<>();
for (XdsResourceType<? extends ResourceUpdate> resourceType : resourceSubscribers.keySet()) {
Expand All @@ -586,28 +581,21 @@ Map<XdsResourceType<?>, Map<String, Long>> getResourceCountsByType() {
return resourceCountsByType;
}

/**
* Reports resource counts using the provided callbackMetricReporter.
*/
void reportResourceCountsToCallback(CallbackMetricReporter callbackMetricReporter,
/** Reports resource counts using the provided ResourceCallback. */
void reportResourceCountsToCallback(ResourceCallback callback,
Map<XdsResourceType<?>, Map<String, Long>> resourceCountsByType) {
for (Map.Entry<XdsResourceType<?>, Map<String, Long>> entry :
resourceCountsByType.entrySet()) {
XdsResourceType<?> resourceType = entry.getKey();
Map<String, Long> resourceCountsByState = entry.getValue();
// TODO(@dnvindhya): include the "authority" label once authority is available here.
resourceCountsByState.forEach((cacheState, count) ->
callbackMetricReporter.reportResourceCounts(
count,
cacheState, resourceType.typeUrl(), target
));
callback.reportResourceCountGauge(count, cacheState, resourceType.typeUrl(), target));
}
}


/**
* Tracks a single subscribed resource.
*/
/** Tracks a single subscribed resource. */
private final class ResourceSubscriber<T extends ResourceUpdate> {
@Nullable private final ServerInfo serverInfo;
@Nullable private final ControlPlaneClient controlPlaneClient;
Expand Down
16 changes: 0 additions & 16 deletions xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String target, String xdsServer, String resourceType) {
}

Check warning on line 38 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L38

Added line #L38 was not covered by tests

/**
* Reports number of xDS servers going from healthy to unhealthy.
Expand All @@ -45,22 +45,6 @@
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
}

Check warning on line 48 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L48

Added line #L48 was not covered by tests

/**
* Interface for reporting metrics through callback.
*
*/
interface CallbackMetricReporter {


// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
String target) {
}


default void reportServerConnections(int isConnected, String target, String xdsServer) {
}
}
}
Loading
Loading