Skip to content

Ensure config reload on ..data symlink switch for CSI driver support #127628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/127628.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 127628
summary: Ensure config reload on ..data symlink switch for CSI driver support
area: Infra/Settings
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static org.elasticsearch.health.HealthStatus.YELLOW;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.NodeRoles.dataNode;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.elasticsearch.test.NodeRoles.masterNode;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -139,6 +138,11 @@ private void assertMasterNode(Client client, String node) {

/**
 * Convenience overload that writes the JSON settings to the file watched by the
 * {@link FileSettingsService} instance of the given node.
 *
 * @param node    name of the cluster node whose {@link FileSettingsService} is looked up
 * @param json    JSON content, forwarded unchanged to the five-argument overload
 * @param logger  logger forwarded unchanged to the five-argument overload
 * @param version version value forwarded unchanged to the five-argument overload
 * @throws Exception if the five-argument overload fails
 */
public static void writeJSONFile(String node, String json, Logger logger, Long version) throws Exception {
    Path watchedSettingsFile = internalCluster().getInstance(FileSettingsService.class, node).watchedFile();
    writeJSONFile(node, json, logger, version, watchedSettingsFile);
}

public static void writeJSONFile(String node, String json, Logger logger, Long version, Path targetPath) throws Exception {
FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);

Files.createDirectories(fileSettingsService.watchedFileDir());
Path tempFilePath = createTempFile();
Expand All @@ -152,8 +156,8 @@ public static void writeJSONFile(String node, String json, Logger logger, Long v
do {
try {
// this can fail on Windows because of timing
Files.move(tempFilePath, fileSettingsService.watchedFile(), StandardCopyOption.ATOMIC_MOVE);
logger.info("--> after writing JSON config to node {} with path {}", node, tempFilePath);
Files.move(tempFilePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
logger.info("--> after writing JSON config to node {} with path {}", node, targetPath);
return;
} catch (IOException e) {
logger.info("--> retrying writing a settings file [{}]", retryCount);
Expand Down Expand Up @@ -503,6 +507,35 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
}

/**
 * Checks that the file settings are applied again when the {@code ..data} directory symlink inside the watched
 * directory is re-pointed to a different folder, while the {@code settings.json} symlink itself stays untouched.
 * The first phase builds the symlink layout and expects the initial settings to be saved; the second phase
 * switches {@code ..data} to a new folder and expects the updated settings to be saved.
 */
public void testSymlinkUpdateTriggerReload() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails on linux without the changes to AbstractFileWatchingService.

internalCluster().setBootstrapMasterNodeIndex(0);
final String masterNode = internalCluster().startMasterOnlyNode();
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
Path baseDir = masterFileSettingsService.watchedFileDir();
// wait until the file settings service on the master reports that it is watching
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));

{
var savedClusterState = setupClusterStateListener(masterNode);
// Create the settings.json as a symlink to simulate a Kubernetes setup
// settings.json -> ..data/settings.json
// ..data -> ..TIMESTAMP_TEMP_FOLDER_1
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1"));
// the real file is written into the timestamped folder, not directly into the watched directory
writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
// both link targets are relative names, so they are resolved relative to baseDir where the links live
var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json"));
// NOTE(review): "50mb" is presumably the value carried by testJSON, which is defined outside this view
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
}
{
var savedClusterState = setupClusterStateListener(masterNode);
// Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate a Kubernetes secret update
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2"));
writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
// the link is swapped in two steps: remove the old ..data link, then create it again with the new target
Files.deleteIfExists(baseDir.resolve("..data"));
Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
}
}

public void testHealthIndicatorWithSingleNode() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start the node");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -237,14 +237,16 @@ protected final void watcherThread() {
key.reset();

if (key == settingsDirWatchKey) {
// there may be multiple events for the same file - we only want to re-read once
Set<Path> processedFiles = new HashSet<>();
for (WatchEvent<?> e : events) {
Path fullFile = settingsDir.resolve(e.context().toString());
if (processedFiles.add(fullFile)) {
if (fileChanged(fullFile)) {
process(fullFile);
}
Set<Path> changedPaths = events.stream()
.map(event -> settingsDir.resolve(event.context().toString()))
.collect(Collectors.toSet());
for (var changedPath : changedPaths) {
// If a symlinked dir changed in the settings dir, it could be linked to other symlinks, so reprocess all files
if (filesIsDirectory(changedPath) && filesIsSymbolicLink(changedPath)) {
reprocessAllChangedFilesInSettingsDir();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to reprocess the entire settings dir, instead of just what the symbolic link points at?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if we only want to reprocess what's behind the ..data directory symlink (or any other directory symlink present in the settings dir), we need to iterate over all files in the directory that ..data points to and then resolve the file names against the settings directory to find all the symlinks that changed (and also filter out any files that are not present).

This works under the assumption that the target file and the symlink have the same name, which I think is acceptable in this case since it's a directory symlink. In practice (at least in serverless), all files in the settings directory (config/operator) are managed through the ..data symlink, so when it's switched we always need to reprocess all of them. For that reason, adding additional logic (resolving the file names against the settings dir) would add complexity for little benefit. For the general case (ECK?), it might make sense to only try to resolve the files in ..data against the settings directory, though. WDYT?

break;
} else if (fileChanged(changedPath)) {
process(changedPath);
}
}
} else if (key == configDirWatchKey) {
Expand All @@ -257,14 +259,7 @@ protected final void watcherThread() {
settingsDirWatchKey = enableDirectoryWatcher(settingsDirWatchKey, settingsDir);

// re-read the settings directory, and ping for any changes
try (Stream<Path> files = filesList(settingsDir)) {
for (var f = files.iterator(); f.hasNext();) {
Path file = f.next();
if (fileChanged(file)) {
process(file);
}
}
}
reprocessAllChangedFilesInSettingsDir();
} else if (settingsDirWatchKey != null) {
settingsDirWatchKey.cancel();
}
Expand All @@ -279,6 +274,17 @@ protected final void watcherThread() {
}
}

/**
 * Lists the entries of the settings directory and passes each one that {@code fileChanged} reports as
 * changed to {@code process}. The listing stream is closed when the method returns or throws.
 *
 * @throws IOException          declared by this method; see the called helpers for the originating operation
 * @throws InterruptedException declared by this method; see the called helpers for the originating operation
 */
private void reprocessAllChangedFilesInSettingsDir() throws IOException, InterruptedException {
    try (Stream<Path> entries = filesList(settingsDir)) {
        // an explicit iterator is used so that checked exceptions from the helpers can propagate
        var remaining = entries.iterator();
        while (remaining.hasNext()) {
            Path candidate = remaining.next();
            if (fileChanged(candidate) == false) {
                continue;
            }
            process(candidate);
        }
    }
}

protected final synchronized void stopWatcher() {
if (watching()) {
logger.debug("stopping watcher ...");
Expand Down Expand Up @@ -378,6 +384,8 @@ private record FileUpdateState(long timestamp, String path, Object fileKey) {}

/**
 * Filesystem hook supplied by subclasses: tells whether {@code path} is a directory.
 * The watcher thread combines it with {@link #filesIsSymbolicLink(Path)} to detect a changed symlinked directory.
 */
protected abstract boolean filesIsDirectory(Path path);

/**
 * Filesystem hook supplied by subclasses: tells whether {@code path} is a symbolic link.
 * The watcher thread uses it to decide whether a changed entry of the settings directory is a symlinked
 * directory, in which case all files of the settings directory are reprocessed.
 */
protected abstract boolean filesIsSymbolicLink(Path path);

/**
 * Filesystem hook supplied by subclasses: reads the attributes of {@code path} as an instance of {@code clazz}.
 *
 * @throws IOException if the attributes cannot be read
 */
protected abstract <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException;

/**
 * Filesystem hook supplied by subclasses: returns the entries of {@code dir} as a stream.
 * Callers in this class consume the stream inside try-with-resources so that it gets closed.
 *
 * @throws IOException if the directory cannot be listed
 */
protected abstract Stream<Path> filesList(Path dir) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ protected boolean filesIsDirectory(Path path) {
return Files.isDirectory(path);
}

/**
 * Tells whether {@code path} is a symbolic link by delegating directly to {@link Files#isSymbolicLink(Path)}.
 */
@Override
protected boolean filesIsSymbolicLink(Path path) {
return Files.isSymbolicLink(path);
}

@Override
protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
return Files.readAttributes(path, clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ protected boolean filesIsDirectory(Path path) {
return Files.isDirectory(path);
}

/**
 * Tells whether {@code path} is a symbolic link by delegating directly to {@link Files#isSymbolicLink(Path)}.
 */
@Override
protected boolean filesIsSymbolicLink(Path path) {
return Files.isSymbolicLink(path);
}

@Override
protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
return Files.readAttributes(path, clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ protected boolean filesIsDirectory(Path path) {
return Files.isDirectory(path);
}

/**
 * Tells whether {@code path} is a symbolic link by delegating directly to {@link Files#isSymbolicLink(Path)}.
 */
@Override
protected boolean filesIsSymbolicLink(Path path) {
return Files.isSymbolicLink(path);
}

@Override
protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
return Files.readAttributes(path, clazz);
Expand Down
Loading