Add GCS telemetry with ThreadLocal #125452

Merged: 17 commits, Apr 4, 2025

@mhl-b mhl-b commented Mar 22, 2025

A simpler implementation of GCP telemetry, at the cost of more code. All previous attempts to intercept HTTP calls and meaningfully correlate operations with requests were fruitless. This approach stores stats in a ThreadLocal so that the HTTP response interceptor can populate request metrics for the parent operation.

This is achieved with a new abstraction on top of the GCS interface: MeteredStorage. This class provides a family of "MeteredXXX" wrappers for GCS objects, such as the Storage API itself, MeteredObjectsGetRequest, MeteredWriteChannel, MeteredReadChannel, and MeteredBlobPage.

The interesting bits for review are RepositoryStatsCollector, OperationStats, and MeteredStorage.

MeteredStorage executes the GCS implementation and attaches metrics to the ThreadLocal. RepositoryStatsCollector reads the stats from the ThreadLocal, aggregates the collected metrics, and exposes them to REST stats and APM.

Every SDK call is wrapped in one of these helpers, which populate the ThreadLocal, collect results, and clean up. See RepositoryStatsCollector.

    public <T> T collectIOSupplier(OperationPurpose purpose, StorageOperation operation, IOSupplier<T> blobFn) throws IOException {
        var t = timer.absoluteTimeInMillis();
        var stats = initAndGetThreadLocal(purpose, operation);
        try {
            return blobFn.get();
        } finally {
            stats.totalDuration += timer.absoluteTimeInMillis() - t;
            collect(stats);
            clearThreadLocal();
        }
    }
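The lifecycle in the helper above (initialise the ThreadLocal, run the call, aggregate, clear) can be sketched in isolation. This is only a minimal illustration, not the PR's code: `SketchStats`, `SketchCollector`, `recordRequestAttempt`, and the single `totalRequests` counter are hypothetical names, and the real collector aggregates per operation and purpose rather than into one total.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Hypothetical stand-in for the PR's OperationStats; field names are illustrative.
final class SketchStats {
    final String operation;
    int requestAttempts;
    long totalDurationMillis;

    SketchStats(String operation) {
        this.operation = operation;
    }
}

// Hypothetical collector showing the init / run / aggregate / clear cycle.
final class SketchCollector {
    private static final ThreadLocal<SketchStats> CURRENT = new ThreadLocal<>();

    // Assumption: one global request total; the PR aggregates per operation and purpose.
    private final LongAdder totalRequests = new LongAdder();

    // Called from deeper in the stack (for example an HTTP interceptor) on the same thread.
    static void recordRequestAttempt() {
        SketchStats stats = CURRENT.get();
        if (stats != null) {
            stats.requestAttempts += 1;
        }
    }

    <T> T collect(String operation, Supplier<T> call) {
        long start = System.nanoTime();
        SketchStats stats = new SketchStats(operation);
        CURRENT.set(stats);
        try {
            return call.get();
        } finally {
            // Runs on success and on failure, so the thread never keeps stale stats.
            stats.totalDurationMillis += (System.nanoTime() - start) / 1_000_000;
            totalRequests.add(stats.requestAttempts);
            CURRENT.remove();
        }
    }

    long totalRequests() {
        return totalRequests.sum();
    }

    static boolean hasActiveStats() {
        return CURRENT.get() != null;
    }
}
```

Because the cleanup sits in `finally`, a pooled thread that is reused for the next operation starts with an empty ThreadLocal even if the wrapped call throws.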

The response interceptor is simplified to:

    public static final HttpResponseInterceptor METERING_INTERCEPTOR = response -> {
        var stats = getThreadLocal();
        var code = response.getStatusCode();
        stats.requestAttempts += 1;
        stats.isLastRequestSucceed = true;
        if (((code >= 200 && code < 300) || code == 308 || code == 404) == false) {
            stats.requestError += 1;
            stats.isLastRequestSucceed = false;
            switch (code) {
                case 416 -> stats.requestRangeError += 1;
                case 429 -> stats.requestThrottle += 1;
            }
        }
    };
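The status-code rules in the interceptor can also be read as a small pure function. The sketch below restates them with hypothetical names (`RequestOutcome`, `StatusClassifier`). One difference is deliberate: the interceptor increments both the general error counter and the specific counter for 416 and 429, while this enum names only the most specific category. Treating 308 and 404 as non-errors follows the condition in the interceptor; the reasons (presumably resumable-upload progress responses and expected missing blobs) are an assumption, not something the PR description states.

```java
// Hypothetical names; mirrors the status-code condition used by the interceptor.
enum RequestOutcome { SUCCESS, ERROR, RANGE_ERROR, THROTTLED }

final class StatusClassifier {
    static RequestOutcome classify(int code) {
        // Same predicate as the interceptor: 2xx, 308, and 404 are not counted as errors.
        boolean ok = (code >= 200 && code < 300) || code == 308 || code == 404;
        if (ok) {
            return RequestOutcome.SUCCESS;
        }
        return switch (code) {
            case 416 -> RequestOutcome.RANGE_ERROR;
            case 429 -> RequestOutcome.THROTTLED;
            default -> RequestOutcome.ERROR;
        };
    }
}
```

Keeping the classification free of side effects would make the counting rules easy to unit test separately from the ThreadLocal plumbing.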

PS: this approach does not require creating a client for every OperationPurpose; those per-purpose clients have already been removed.

@mhl-b mhl-b added >enhancement :Distributed Coordination/Snapshot/Restore Anything directly related to the `_snapshot/*` APIs Team:Distributed Coordination Meta label for Distributed Coordination team labels Mar 22, 2025
@elasticsearchmachine
Collaborator

Hi @mhl-b, I've created a changelog YAML for you.

@mhl-b mhl-b force-pushed the gcp-metrics-threadlocal branch from 6983065 to b5da931 Compare March 22, 2025 07:30
@mhl-b mhl-b marked this pull request as ready for review March 22, 2025 07:30
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@mhl-b mhl-b requested a review from nicktindall March 23, 2025 05:02
    static OperationStats initAndGet(OperationPurpose purpose, StorageOperation operation) {
        var stats = new OperationStats(purpose, operation);
        OPERATION_STATS.set(stats);
        return stats;
Contributor

Should we assert here that we're not overwriting an existing OperationStats? It seems like, if things are working properly, we should never do that.

Contributor

And below in set perhaps?

    public Map<String, BlobStoreActionStats> operationsStats() {
        return restMetering.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().key, e -> {
            var ops = e.getValue().sum();
            return new BlobStoreActionStats(ops, ops);
Contributor

@nicktindall nicktindall Mar 23, 2025

This should be ops and requests. (ops, ops) was just a stop-gap until we had the ability to count requests.

Contributor Author

@mhl-b mhl-b Mar 24, 2025

I'm thinking BlobStoreActionStats is not practical. REST metering tracks operation counts only, and the APM structure does not fit into it, the error metrics in particular. I think we can remove BlobStoreActionStats.

Contributor Author

For example, record BillableRequests(long count){} and Map<Operation,BillableRequests>. That would be a more accurate representation for the REST API metering endpoint.
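A minimal sketch of that suggestion follows. Only the `BillableRequests(long count)` record and the map shape come from the comment; `SketchOperation`, the `plus` method, and `BillableRequestsDemo.tally` are hypothetical and exist only to make the example self-contained.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical operation enum; the PR's own type is StorageOperation.
enum SketchOperation { GET, LIST, INSERT }

// Record shape taken from the comment; the plus() helper is an illustrative addition.
record BillableRequests(long count) {
    BillableRequests plus(long more) {
        return new BillableRequests(count + more);
    }
}

final class BillableRequestsDemo {
    // Counts one billable request per entry in the argument list.
    static Map<SketchOperation, BillableRequests> tally(SketchOperation... requests) {
        Map<SketchOperation, BillableRequests> totals = new EnumMap<>(SketchOperation.class);
        for (SketchOperation op : requests) {
            totals.merge(op, new BillableRequests(1), (a, b) -> a.plus(b.count()));
        }
        return totals;
    }
}
```

A single-field record keeps the REST representation honest about what it contains: a request count per operation, with no placeholder operation count alongside it.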

     * Continue collecting metrics with given OperationStats. Useful for readers and writers.
     */
    public void continueAndCollect(OperationStats stats, CheckedRunnable<IOException> runnable) throws IOException {
        OperationStats.set(stats);
Contributor

I wonder if we should be more assertive about the current state of OperationStats. In this case it's probably safe to clear it after each action, right? Then we could assert that it was empty each time we went to start a new action, so that it fails noisily if actions somehow start being interleaved.
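One way to read this suggestion is sketched below, with hypothetical names (`GuardedThreadLocal`, `begin`, `end`, `run`). The sketch throws `IllegalStateException` rather than using a Java `assert`, purely so the check is active without the `-ea` flag; the comment itself proposes an assertion.

```java
// Hypothetical holder; the PR keeps an OperationStats object in the ThreadLocal.
final class GuardedThreadLocal {
    private static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    // Fails noisily if a previous action on this thread did not clean up.
    static void begin(Object stats) {
        if (CURRENT.get() != null) {
            throw new IllegalStateException("stats already set on this thread; actions are interleaved");
        }
        CURRENT.set(stats);
    }

    // Clears the slot so the next action starts from an empty state.
    static void end() {
        CURRENT.remove();
    }

    static boolean isEmpty() {
        return CURRENT.get() == null;
    }

    // Runs one action with the given stats attached, always clearing afterwards.
    static void run(Object stats, Runnable action) {
        begin(stats);
        try {
            action.run();
        } finally {
            end();
        }
    }
}
```

With this shape, a reader or writer that resumes collection with previously created stats passes the same object to `run` for each action, and any nested or overlapping use on one thread surfaces immediately.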

     * Create the map of attributes we expect to see on repository metrics
     */
    public static Map<String, Object> createAttributesMap(String repoType, String repoName, OperationPurpose purpose, String operation) {
        return Map.of("repo_type", repoType, "repo_name", repoName, "operation", operation, "purpose", purpose.getKey());
Contributor

do we need the additional layer of indirection?

Contributor Author

not really, will inline

Contributor

@nicktindall nicktindall left a comment

I like this approach much better, a few comments

Contributor Author

mhl-b commented Apr 3, 2025

@nicktindall, it's ready for another review. I haven't done APM testing yet; the PR is already large. I will do that in a follow-up PR.

@mhl-b mhl-b requested a review from nicktindall April 3, 2025 07:14
Contributor

@nicktindall nicktindall left a comment

Looking good, I have some queries around the counting logic and other minor stuff

    @@ -34,14 +35,15 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin

        @SuppressWarnings("this-escape")
        public GoogleCloudStoragePlugin(final Settings settings) {
            this.storageService = createStorageService(settings);
            var isServerless = settings.getAsBoolean(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, false);
Contributor

I think there's a helper for this: org.elasticsearch.cluster.node.DiscoveryNode#isStateless. There is some sentiment that its use shouldn't be expanded, but it just does what you're doing here.

I think we're supposed to prefer plugins/SPI to having explicit flag checks like this (maybe something we could consider later)

            trackRequest(StorageOperation.LIST.key());
        } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=resumable*", request)) {
            trackRequest(StorageOperation.INSERT.key());
        } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request)) {
Contributor Author

count all requests now, not operations

@mhl-b mhl-b requested a review from nicktindall April 4, 2025 05:20
    @Override
    public void testRequestStats() throws Exception {
        super.testRequestStats();
    }
Contributor

redundant?

Contributor Author

helper to run from ide


    /**
     * Continue collecting metrics with given OperationStats. Useful for readers and writers.
     */
Contributor

copy/paste error on the comment here? or just out of date?

Contributor Author

out of date

    }

    private void collect(OperationStats stats) {
        if (stats.reqAtt == 0) {
Contributor

I kinda think we shouldn't use abbreviations here. requestsAttempted reads much nicer than Att, especially because att can mean so many things.

Contributor

@nicktindall nicktindall Apr 4, 2025

And we have attr floating around in the same block of code. It's a bit confusing.

Contributor

@nicktindall nicktindall left a comment

LGTM, with some minor comments 🏆

Contributor Author

mhl-b commented Apr 4, 2025

A follow-up issue on removing the serverless flag from BlobStore#stats(): #126262

@mhl-b mhl-b merged commit 70654a3 into elastic:main Apr 4, 2025
17 checks passed
andreidan pushed a commit to andreidan/elasticsearch that referenced this pull request Apr 9, 2025
Labels
:Distributed Coordination/Snapshot/Restore Anything directly related to the `_snapshot/*` APIs >enhancement Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0