Skip to content

[Transform] Check alias during update #124825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124825.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124825
summary: Check alias during update
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -25,12 +27,16 @@
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.xpack.core.transform.utils.TransformConfigVersionUtils;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
Expand Down Expand Up @@ -253,4 +259,65 @@ public void testStartReplacesDeprecatedTransformSettings() throws Exception {
assertMaxPageSearchSizeInSettings(transformId, expectedMaxPageSearchSize);
}

public void testMigratedTransformIndex() {
// create transform
var sourceIndex = "source-index";
createSourceIndex(sourceIndex);
var transformId = "transform-migrated-system-index";

var sourceConfig = new SourceConfig(sourceIndex);
var destConfig = new DestConfig("some-dest-index", null, null);
var config = new TransformConfig(
transformId,
sourceConfig,
destConfig,
null,
null,
null,
PivotConfigTests.randomPivotConfig(),
null,
null,
null,
null,
null,
null,
null
);
var putTransform = new PutTransformAction.Request(config, true, TimeValue.THIRTY_SECONDS);
assertTrue(client().execute(PutTransformAction.INSTANCE, putTransform).actionGet().isAcknowledged());

// simulate migration by reindexing and aliasing
var newSystemIndex = TransformInternalIndexConstants.LATEST_INDEX_NAME + "-reindexed";
var reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(TransformInternalIndexConstants.LATEST_INDEX_NAME);
reindexRequest.setDestIndex(newSystemIndex);
reindexRequest.setRefresh(true);
client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet();

var aliasesRequest = admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS);
aliasesRequest.removeIndex(TransformInternalIndexConstants.LATEST_INDEX_NAME);
aliasesRequest.addAlias(newSystemIndex, TransformInternalIndexConstants.LATEST_INDEX_NAME);
aliasesRequest.execute().actionGet();

// update should succeed
var updateConfig = new TransformConfigUpdate(
sourceConfig,
new DestConfig("some-new-dest-index", null, null),
null,
null,
null,
null,
null,
null
);
var updateRequest = new UpdateTransformAction.Request(updateConfig, transformId, true, TimeValue.THIRTY_SECONDS);
client().execute(UpdateTransformAction.INSTANCE, updateRequest).actionGet();

// verify update succeeded
var getTransformRequest = new GetTransformAction.Request(transformId);
var getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
var transformConfig = getTransformResponse.getTransformConfigurations().get(0);
assertThat(transformConfig.getDestination().getIndex(), equalTo("some-new-dest-index"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {

@Before
public void createComponents() {
clusterService = mock(ClusterService.class);
clusterService = mock();
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
transformConfigManager = new IndexBasedTransformConfigManager(
clusterService,
TestIndexNameExpressionResolver.newInstance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private static void updateTransformStateAndGetLastCheckpoint(
long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();

// if: the state is stored on the latest index, it does not need an update
if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
if (transformConfigManager.isLatestTransformIndex(currentState.v2().getIndex())) {
listener.onResponse(lastCheckpoint);
return;
}
Expand Down Expand Up @@ -283,8 +283,7 @@ private static void updateTransformCheckpoint(
ActionListener<Boolean> listener
) {
transformConfigManager.getTransformCheckpointForUpdate(transformId, lastCheckpoint, ActionListener.wrap(checkpointAndVersion -> {
if (checkpointAndVersion == null
|| checkpointAndVersion.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
if (checkpointAndVersion == null || transformConfigManager.isLatestTransformIndex(checkpointAndVersion.v2().getIndex())) {
listener.onResponse(true);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void updateTransformConfiguration(
listener.onFailure(conflictStatusException("Cannot update Transform while the Transform feature is upgrading."));
return;
}
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
if (isLatestTransformIndex(seqNoPrimaryTermAndIndex.getIndex())) {
// update the config in the same, current index using optimistic concurrency control
putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermAndIndex, listener);
} else {
Expand All @@ -180,6 +180,21 @@ public void updateTransformConfiguration(
}
}

@Override
public boolean isLatestTransformIndex(String indexName) {
if (TransformInternalIndexConstants.LATEST_INDEX_NAME.equals(indexName)) {
return true;
}

// in some cases, the System Index gets reindexed and LATEST_INDEX_NAME is now an alias pointing to that reindexed index
// this mostly likely happens after the SystemIndexMigrator ran
// we need to check if the LATEST_INDEX_NAME is now an alias and points to the indexName
var metadata = clusterService.state().projectState().metadata();
var indicesForAlias = metadata.aliasedIndices(TransformInternalIndexConstants.LATEST_INDEX_NAME);
var index = metadata.index(indexName);
return index != null && indicesForAlias.contains(index.getIndex());
}

@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
if (isUpgrading()) {
Expand Down Expand Up @@ -697,7 +712,7 @@ public void putOrUpdateTransformStoredDoc(
// could have been called, see gh#80073
indexRequest.opType(DocWriteRequest.OpType.INDEX);
// if on the latest index use optimistic concurrency control in addition
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
if (isLatestTransformIndex(seqNoPrimaryTermAndIndex.getIndex())) {
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -206,4 +207,8 @@ void getTransformStoredDoc(
void getTransformStoredDocs(Collection<String> transformIds, TimeValue timeout, ActionListener<List<TransformStoredDoc>> listener);

void refresh(ActionListener<Boolean> listener);

default boolean isLatestTransformIndex(String indexName) {
return TransformInternalIndexConstants.LATEST_INDEX_NAME.equals(indexName);
}
}
Loading