Skip to content

POC: Add support for per project repo client #126584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService(boolean isServerless) {
protected GoogleCloudStorageService createStorageService(
boolean isServerless,
GcsPerProjectClientManager gcsPerProjectClientManager
) {
return new GoogleCloudStorageService() {
@Override
StorageOptions createStorageOptions(
Expand Down Expand Up @@ -279,7 +282,7 @@ public Map<String, Repository.Factory> getRepositories(
metadata -> new GoogleCloudStorageRepository(
metadata,
registry,
this.storageService,
this.storageService.get(),
clusterService,
bigArrays,
recoverySettings,
Expand All @@ -291,7 +294,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
metadata.settings().get("bucket"),
"test",
metadata.name(),
storageService,
storageService.get(),
bigArrays,
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.gcs;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.ProjectSecrets;
import org.elasticsearch.common.settings.Settings;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class GcsPerProjectClientManager implements ClusterStateListener {

private final Settings settings;
private final BiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage> clientBuilder;
private final Map<ProjectId, ClientsHolder> perProjectClientsCache;

public GcsPerProjectClientManager(
Settings settings,
BiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage> clientBuilder
) {
this.settings = settings;
this.clientBuilder = clientBuilder;
this.perProjectClientsCache = new ConcurrentHashMap<>();
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();

final var updatedPerProjectClients = new HashMap<ProjectId, ClientsHolder>();
for (var project : currentProjects.values()) {
final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
if (projectSecrets == null) {
// This can only happen when a node restarts, it will be processed again when file settings are loaded
continue;
}
final Settings currentSettings = Settings.builder()
// merge with static settings such as max retries etc, exclude secure settings
// TODO: We may need to update this if per-project settings decide to support hierarchical overrides
.put(settings, false)
.setSecureSettings(projectSecrets.getSettings())
.build();
final Map<String, GoogleCloudStorageClientSettings> clientSettings = GoogleCloudStorageClientSettings.load(currentSettings);

// TODO: clientSettings should not be empty, i.e. there should be at least one client configured
// Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok.

// TODO: Building and comparing the whole GoogleCloudStorageClientSettings may be insufficient, we could just compare the
// relevant secrets
if (newOrUpdated(project.id(), clientSettings)) {
updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings));
}
}

// Updated projects
perProjectClientsCache.putAll(updatedPerProjectClients);

// removed projects
for (var projectId : perProjectClientsCache.keySet()) {
if (currentProjects.containsKey(projectId) == false) {
perProjectClientsCache.remove(projectId);
}
}
}

public MeteredStorage client(
ProjectId projectId,
String clientName,
String repositoryName,
GcsRepositoryStatsCollector statsCollector
) {
final var clientsHolder = perProjectClientsCache.get(projectId);
if (clientsHolder == null) {
throw new IllegalArgumentException("No project found for [" + projectId + "]");
}
return clientsHolder.client(clientName, repositoryName, statsCollector);
}

public void closeRepositoryClients(ProjectId projectId, String repositoryName) {
final var clientsHolder = perProjectClientsCache.get(projectId);
if (clientsHolder != null) {
clientsHolder.closeRepositoryClients(repositoryName);
}
}

private boolean newOrUpdated(ProjectId projectId, Map<String, GoogleCloudStorageClientSettings> currentClientSettings) {
final ClientsHolder old = perProjectClientsCache.get(projectId);
if (old == null) {
return true;
}
return currentClientSettings.equals(old.clientSettings()) == false;
}

private final class ClientsHolder {
// clientName -> client settings
private final Map<String, GoogleCloudStorageClientSettings> clientSettings;
// repositoryName -> client
private final Map<String, MeteredStorage> clientCache = new ConcurrentHashMap<>();

ClientsHolder(Map<String, GoogleCloudStorageClientSettings> clientSettings) {
this.clientSettings = clientSettings;
}

Map<String, GoogleCloudStorageClientSettings> clientSettings() {
return clientSettings;
}

MeteredStorage client(String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector) {
return clientCache.computeIfAbsent(repositoryName, ignored -> {
final var settings = clientSettings.get(clientName);
if (settings == null) {
throw new IllegalArgumentException("No client settings found for [" + clientName + "]");
}
return clientBuilder.apply(settings, statsCollector);
});
}

void closeRepositoryClients(String repositoryName) {
clientCache.remove(repositoryName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

package org.elasticsearch.repositories.gcs;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -23,27 +25,48 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

// package-private for tests
final GoogleCloudStorageService storageService;
final SetOnce<GoogleCloudStorageService> storageService = new SetOnce<>();

@SuppressWarnings("this-escape")
public GoogleCloudStoragePlugin(final Settings settings) {
public GoogleCloudStoragePlugin(final Settings settings) {}

@Override
public Collection<?> createComponents(PluginServices services) {
final Settings settings = services.clusterService().getSettings();
GcsPerProjectClientManager gcsPerProjectClientManager = null;
if (services.projectResolver().supportsMultipleProjects()) {
gcsPerProjectClientManager = new GcsPerProjectClientManager(settings, (gcsClientSettings, statsCollector) -> {
try {
return storageService.get().createClient(gcsClientSettings, statsCollector);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
services.clusterService().addListener(gcsPerProjectClientManager);
}
var isServerless = DiscoveryNode.isStateless(settings);
this.storageService = createStorageService(isServerless);
this.storageService.set(createStorageService(isServerless, gcsPerProjectClientManager));
// eagerly load client settings so that secure settings are readable (not closed)
reload(settings);
return List.of();
}

// overridable for tests
protected GoogleCloudStorageService createStorageService(boolean isServerless) {
return new GoogleCloudStorageService(isServerless);
protected GoogleCloudStorageService createStorageService(
boolean isServerless,
@Nullable GcsPerProjectClientManager gcsPerProjectClientManager
) {
return new GoogleCloudStorageService(isServerless, gcsPerProjectClientManager);
}

@Override
Expand All @@ -60,7 +83,7 @@ public Map<String, Repository.Factory> getRepositories(
metadata -> new GoogleCloudStorageRepository(
metadata,
namedXContentRegistry,
this.storageService,
this.storageService.get(),
clusterService,
bigArrays,
recoverySettings,
Expand Down Expand Up @@ -93,6 +116,6 @@ public void reload(Settings settings) {
// `GoogleCloudStorageClientSettings` instance) instead of the `Settings`
// instance.
final Map<String, GoogleCloudStorageClientSettings> clientsSettings = GoogleCloudStorageClientSettings.load(settings);
this.storageService.refreshAndClearCache(clientsSettings);
this.storageService.get().refreshAndClearCache(clientsSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -56,13 +57,16 @@ public class GoogleCloudStorageService {
private volatile Map<String, GoogleCloudStorageClientSettings> clientSettings = emptyMap();

private final boolean isServerless;
@Nullable
private GcsPerProjectClientManager gcsPerProjectClientManager;

public GoogleCloudStorageService() {
this.isServerless = false;
this(false, null);
}

public GoogleCloudStorageService(boolean isServerless) {
public GoogleCloudStorageService(boolean isServerless, @Nullable GcsPerProjectClientManager gcsPerProjectClientManager) {
this.isServerless = isServerless;
this.gcsPerProjectClientManager = gcsPerProjectClientManager;
}

public boolean isServerless() {
Expand Down Expand Up @@ -132,21 +136,54 @@ public MeteredStorage client(final String clientName, final String repositoryNam
}
}

public MeteredStorage client(
@Nullable final ProjectId projectId,
final String clientName,
final String repositoryName,
final GcsRepositoryStatsCollector statsCollector
) throws IOException {
if (gcsPerProjectClientManager == null) {
// single default project mode
assert ProjectId.DEFAULT.equals(projectId) : projectId;
return client(clientName, repositoryName, statsCollector);
} else if (projectId == null) {
// MP mode for cluster level client
return client(clientName, repositoryName, statsCollector);
} else {
// MP mode for per-project client
return gcsPerProjectClientManager.client(projectId, clientName, repositoryName, statsCollector);
}
}

synchronized void closeRepositoryClients(String repositoryName) {
clientCache = clientCache.entrySet()
.stream()
.filter(entry -> entry.getKey().equals(repositoryName) == false)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

void closeRepositoryClients(@Nullable ProjectId projectId, String repositoryName) {
if (gcsPerProjectClientManager == null) {
// single default project mode
assert ProjectId.DEFAULT.equals(projectId) : projectId;
closeRepositoryClients(repositoryName);
} else if (projectId == null) {
// MP mode for cluster level client
closeRepositoryClients(repositoryName);
} else {
// MP mode for per-project client
gcsPerProjectClientManager.closeRepositoryClients(projectId, repositoryName);
}
}

/**
* Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
*
* @param gcsClientSettings client settings to use, including secure settings
* @return a new client storage instance that can be used to manage objects
* (blobs)
*/
private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector)
MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector)
throws IOException {
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
final NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public void testReinitClientSettings() throws Exception {
secureSettings2.setFile("gcs.client.gcs3.credentials_file", serviceAccountFileContent("project_gcs23"));
final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build();
try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) {
final GoogleCloudStorageService storageService = plugin.storageService;
plugin.storageService.set(plugin.createStorageService(randomBoolean(), null));
plugin.reload(settings1);
final GoogleCloudStorageService storageService = plugin.storageService.get();
var statsCollector = new GcsRepositoryStatsCollector();
final var client11 = storageService.client("gcs1", "repo1", statsCollector);
assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11"));
Expand Down Expand Up @@ -151,7 +153,9 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception {
secureSettings1.setFile("gcs.client.gcs1.credentials_file", serviceAccountFileContent("test_project"));
final Settings settings = Settings.builder().setSecureSettings(secureSettings1).build();
try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) {
final GoogleCloudStorageService storageService = plugin.storageService;
plugin.storageService.set(plugin.createStorageService(randomBoolean(), null));
plugin.reload(settings);
final GoogleCloudStorageService storageService = plugin.storageService.get();

final MeteredStorage repo1Client = storageService.client("gcs1", "repo1", new GcsRepositoryStatsCollector());
final MeteredStorage repo2Client = storageService.client("gcs1", "repo2", new GcsRepositoryStatsCollector());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,12 @@ public ProxyS3RepositoryPlugin(Settings settings) {
}

@Override
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
S3Service s3Service(
Environment environment,
Settings nodeSettings,
ResourceWatcherService resourceWatcherService,
S3PerProjectClientManager s3PerProjectClientManager
) {
return new ProxyS3Service(environment, nodeSettings, resourceWatcherService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class S3BlobStore implements BlobStore {
S3RepositoriesMetrics s3RepositoriesMetrics,
BackoffPolicy retryThrottledDeleteBackoffPolicy
) {
// TODO: add a projectId field, maybe null for cluster level blobstore
this.service = service;
this.bigArrays = bigArrays;
this.bucket = bucket;
Expand Down Expand Up @@ -310,6 +311,7 @@ public String toString() {
}

public AmazonS3Reference clientReference() {
// TODO: change to service.client(projectId, repositoryMetadata)
return service.client(repositoryMetadata);
}

Expand Down Expand Up @@ -490,6 +492,7 @@ private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobS

@Override
public void close() throws IOException {
// TODO: change to use service.onBlobStoreClose(projectId)
service.onBlobStoreClose();
}

Expand Down
Loading