Skip to content

Add min_* conditions to rollover #83345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 59 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
e6fe054
min conditions
probakowski Jan 20, 2022
12879c4
min conditions
probakowski Jan 31, 2022
2dfe23a
tests
probakowski Jan 31, 2022
a46dc91
Merge remote-tracking branch 'origin/master' into min-age
probakowski Jan 31, 2022
431f1a9
fix
probakowski Feb 1, 2022
d6a7569
remove unused method
probakowski Feb 1, 2022
4446623
tests
probakowski Feb 1, 2022
16b459f
import
probakowski Feb 1, 2022
c3438d1
spotless
probakowski Feb 1, 2022
84a89cd
Merge remote-tracking branch 'origin/master' into min-age
probakowski Feb 1, 2022
31c286b
fix test
probakowski Feb 1, 2022
3568399
fix imports
probakowski Feb 1, 2022
ac7a903
spotless
probakowski Feb 1, 2022
2aae5c8
spotless
probakowski Feb 1, 2022
6502e7d
fix compilation
probakowski Feb 1, 2022
3cf3dc9
fix tests
probakowski Feb 1, 2022
f74a590
spotless
probakowski Feb 1, 2022
b2821fc
Merge remote-tracking branch 'origin/master' into min-age
probakowski Feb 1, 2022
2d797c5
Merge remote-tracking branch 'origin/master' into min-age
probakowski Feb 24, 2022
721ab16
fix tests
probakowski Feb 24, 2022
6170581
fix tests
probakowski Feb 24, 2022
e036e58
spotless
probakowski Feb 24, 2022
39cf0bf
spotless
probakowski Feb 24, 2022
f7f377a
fix rolloveraction tests
probakowski Feb 24, 2022
a4ccd7a
Update docs/changelog/83345.yaml
probakowski Feb 24, 2022
c6ca948
Merge branch 'master' into min-age
joegallo Jun 29, 2022
aff95df
Bump min_* BWC version to 8.4.0
joegallo Jun 29, 2022
577e98d
Change visibility
joegallo Jun 29, 2022
b588c21
Add/rewrite these javadocs
joegallo Jun 29, 2022
7945472
Merge branch 'master' into min-age
joegallo Jul 22, 2022
9f54660
Typo, fix a comment
joegallo Jul 21, 2022
a50b1c7
Add min_size and min_primary_shard_docs
joegallo Jul 22, 2022
b6cc438
Actually, these tests were wrong as written
joegallo Jul 21, 2022
3ba144c
Test that we won't rollover with only min_* conditions
joegallo Jul 21, 2022
6d034f6
Add yaml rest tests
joegallo Jul 21, 2022
591de5c
Typo, fix variable case
joegallo Jul 22, 2022
dabb4d8
Fix a minor buglet
joegallo Jul 22, 2022
929077e
Pull out a randomByteSize test utility
joegallo Jul 22, 2022
1a3ee5f
Add min_* to ilm usage
joegallo Jul 22, 2022
7ee742d
Add some missing asserts in RolloverActionTests
joegallo Jul 22, 2022
bd91ba4
Fix these switch betweens
joegallo Jul 22, 2022
22b03f4
Oops, fix deserialization else branch
joegallo Jul 22, 2022
ebda57e
Nit: follow builder pattern
joegallo Jul 22, 2022
6e0d386
Add min_size, min_primary_shard_docs to NamedXContents
joegallo Jul 22, 2022
b4248ea
Don't leak a mutable map
joegallo Jul 22, 2022
573b384
Rewrite areConditionsMet slightly
joegallo Jul 22, 2022
f5bdfbe
Use areConditionsMet in the task executor, too
joegallo Jul 22, 2022
9c2c03d
Merge branch 'master' into min-age
joegallo Jul 22, 2022
e346d57
Whitespace
joegallo Jul 22, 2022
6658822
Add a validation rule to _rollover requests
joegallo Jul 22, 2022
7927ec9
Merge branch 'main' into min-age
joegallo Jul 25, 2022
3c990a8
Add/fix includedInVersion for min_* conditions
joegallo Jul 25, 2022
76249a4
Tidier RolloverAction Writeable implementation
joegallo Jul 25, 2022
becd6af
Nicer fromXContent error message for conditions
joegallo Jul 25, 2022
8a60331
Add a condition Type (MIN/MAX) and drop isRequired
joegallo Jul 25, 2022
4d1f3c1
Add javadoc for areConditionsMet
joegallo Jul 25, 2022
912120a
Simpler serialization for min_age and min_size
joegallo Jul 25, 2022
99769ed
Merge branch 'main' into min-age
joegallo Jul 25, 2022
bcfd250
Spotless
joegallo Jul 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
min conditions
  • Loading branch information
probakowski committed Jan 31, 2022
commit 12879c40d27313fc30db5958ccaead8a197a53cd
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.action.ValidateActions.addValidationError;

/**
* Request class to swap index under an alias or increment data stream generation upon satisfying conditions
*
* <p>
* Note: there is a new class with the same name for the Java HLRC that uses a typeless format.
* Any changes done to this class should also go to that client class.
*/
Expand Down Expand Up @@ -88,8 +90,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
if (includeTypeName) {
// expecting one type only
for (Map.Entry<String, Object> mappingsEntry : parser.map().entrySet()) {
@SuppressWarnings("unchecked")
final Map<String, Object> value = (Map<String, Object>) mappingsEntry.getValue();
@SuppressWarnings("unchecked") final Map<String, Object> value = (Map<String, Object>) mappingsEntry.getValue();
request.createIndexRequest.mapping(value);
}
} else {
Expand Down Expand Up @@ -143,7 +144,8 @@ public RolloverRequest(StreamInput in) throws IOException {
createIndexRequest = new CreateIndexRequest(in);
}

RolloverRequest() {}
RolloverRequest() {
}

public RolloverRequest(String rolloverTarget, String newIndexName) {
this.rolloverTarget = rolloverTarget;
Expand Down Expand Up @@ -174,7 +176,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String[] indices() {
return new String[] { rolloverTarget };
return new String[]{rolloverTarget};
}

@Override
Expand Down Expand Up @@ -263,33 +265,33 @@ public void addMaxPrimaryShardSizeCondition(ByteSizeValue size) {
* Adds required condition to check if the index is at least <code>age</code> old
*/
public void addMinIndexAgeCondition(TimeValue age) {
MaxAgeCondition maxAgeCondition = new MaxAgeCondition(age);
if (this.conditions.containsKey(maxAgeCondition.name)) {
throw new IllegalArgumentException(maxAgeCondition.name + " condition is already set");
MinAgeCondition minAgeCondition = new MinAgeCondition(age);
if (this.conditions.containsKey(minAgeCondition.name)) {
throw new IllegalArgumentException(minAgeCondition.name + " condition is already set");
}
this.conditions.put(maxAgeCondition.name, maxAgeCondition);
this.conditions.put(minAgeCondition.name, minAgeCondition);
}

/**
* Adds required condition to check if the index has at least <code>numDocs</code>
*/
public void addMinIndexDocsCondition(long numDocs) {
MaxDocsCondition maxDocsCondition = new MaxDocsCondition(numDocs);
if (this.conditions.containsKey(maxDocsCondition.name)) {
throw new IllegalArgumentException(maxDocsCondition.name + " condition is already set");
MinDocsCondition minDocsCondition = new MinDocsCondition(numDocs);
if (this.conditions.containsKey(minDocsCondition.name)) {
throw new IllegalArgumentException(minDocsCondition.name + " condition is already set");
}
this.conditions.put(maxDocsCondition.name, maxDocsCondition);
this.conditions.put(minDocsCondition.name, minDocsCondition);
}

/**
* Adds a size-based required condition to check if the size of the largest primary shard is at least <code>size</code>.
*/
public void addMinPrimaryShardSizeCondition(ByteSizeValue size) {
MaxPrimaryShardSizeCondition maxPrimaryShardSizeCondition = new MaxPrimaryShardSizeCondition(size);
if (this.conditions.containsKey(maxPrimaryShardSizeCondition.name)) {
throw new IllegalArgumentException(maxPrimaryShardSizeCondition + " condition is already set");
MinPrimaryShardSizeCondition minPrimaryShardSizeCondition = new MinPrimaryShardSizeCondition(size);
if (this.conditions.containsKey(minPrimaryShardSizeCondition.name)) {
throw new IllegalArgumentException(minPrimaryShardSizeCondition + " condition is already set");
}
this.conditions.put(maxPrimaryShardSizeCondition.name, maxPrimaryShardSizeCondition);
this.conditions.put(minPrimaryShardSizeCondition.name, minPrimaryShardSizeCondition);
}

public boolean isDryRun() {
Expand All @@ -308,6 +310,19 @@ public String getNewIndexName() {
return newIndexName;
}

public boolean areConditionsMet(Map<String, Boolean> trialConditionResults) {
Collection<Condition<?>> conditions = getConditions().values();
boolean allRequiredMet = conditions.stream()
.filter(Condition::isRequired)
.allMatch(c -> trialConditionResults.get(c.toString()));

boolean anyNonRequiredMet = conditions.stream()
.filter(Predicate.not(Condition::isRequired))
.anyMatch(c -> trialConditionResults.get(c.toString()));

return trialConditionResults.size() == 0 || (allRequiredMet && anyNonRequiredMet);
}

/**
* Returns the inner {@link CreateIndexRequest}. Allows to configure mappings, settings and aliases for the new index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,24 @@ protected void masterOperation(
buildStats(metadata.index(trialSourceIndexName), statsResponse)
);

// If this is a dry run, return with the results without invoking a cluster state update
if (rolloverRequest.isDryRun()) {
listener.onResponse(
new RolloverResponse(trialSourceIndexName, trialRolloverIndexName, trialConditionResults, true, false, false, false)
);
return;
}

boolean allRequiredMet = rolloverRequest.getConditions()
.values()
.stream()
.filter(Condition::isRequired)
.allMatch(c -> trialConditionResults.get(c.toString()));

boolean anyNonRequiredMet = rolloverRequest.getConditions()
.values()
.stream()
.filter(Predicate.not(Condition::isRequired))
.anyMatch(c -> trialConditionResults.get(c.toString()));

final RolloverResponse trialRolloverResponse = new RolloverResponse(
trialSourceIndexName,
trialRolloverIndexName,
trialConditionResults,
false,
rolloverRequest.isDryRun(),
false,
false,
false
);

// If this is a dry run, return with the results without invoking a cluster state update
if (rolloverRequest.isDryRun()) {
listener.onResponse(trialRolloverResponse);
return;
}

// Pre-check the conditions to see whether we should submit a new cluster state task
if (trialConditionResults.size() == 0 || (allRequiredMet && anyNonRequiredMet)) {
if (rolloverRequest.areConditionsMet(trialConditionResults)) {
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
Expand All @@ -197,6 +183,22 @@ protected void masterOperation(
);
}

private boolean areConditionsMet(RolloverRequest rolloverRequest, Map<String, Boolean> trialConditionResults) {
boolean allRequiredMet = rolloverRequest.getConditions()
.values()
.stream()
.filter(Condition::isRequired)
.allMatch(c -> trialConditionResults.get(c.toString()));

boolean anyNonRequiredMet = rolloverRequest.getConditions()
.values()
.stream()
.filter(Predicate.not(Condition::isRequired))
.anyMatch(c -> trialConditionResults.get(c.toString()));

return trialConditionResults.size() == 0 || (allRequiredMet && anyNonRequiredMet);
}

static Map<String, Boolean> evaluateConditions(final Collection<Condition<?>> conditions, @Nullable final Condition.Stats stats) {
Objects.requireNonNull(conditions, "conditions must not be null");

Expand Down Expand Up @@ -337,7 +339,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover";

activeShardsObserver.waitForActiveShards(
new String[] { rolloverIndex.get() },
new String[]{rolloverIndex.get()},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public RolloverAction(StreamInput in) throws IOException {
}
maxAge = in.readOptionalTimeValue();
maxDocs = in.readOptionalVLong();
if (in.getVersion().after(Version.V_8_1_0)) {
if (in.getVersion().onOrAfter(Version.V_8_1_0)) {
if (in.readBoolean()) {
minPrimaryShardSize = new ByteSizeValue(in);
} else {
Expand Down Expand Up @@ -166,7 +166,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeOptionalTimeValue(maxAge);
out.writeOptionalVLong(maxDocs);
if (out.getVersion().after(Version.V_8_1_0)) {
if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
out.writeBoolean(minPrimaryShardSize != null);
if (minPrimaryShardSize != null) {
minPrimaryShardSize.writeTo(out);
Expand Down Expand Up @@ -257,7 +257,10 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
maxSize,
maxPrimaryShardSize,
maxAge,
maxDocs
maxDocs,
minPrimaryShardSize,
minAge,
minDocs
);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
Expand All @@ -277,7 +280,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)

@Override
public int hashCode() {
return Objects.hash(maxSize, maxPrimaryShardSize, maxAge, maxDocs);
return Objects.hash(maxSize, maxPrimaryShardSize, maxAge, maxDocs, minPrimaryShardSize, minAge, minDocs);
}

@Override
Expand All @@ -292,7 +295,10 @@ public boolean equals(Object obj) {
return Objects.equals(maxSize, other.maxSize)
&& Objects.equals(maxPrimaryShardSize, other.maxPrimaryShardSize)
&& Objects.equals(maxAge, other.maxAge)
&& Objects.equals(maxDocs, other.maxDocs);
&& Objects.equals(maxDocs, other.maxDocs)
&& Objects.equals(minPrimaryShardSize, other.minPrimaryShardSize)
&& Objects.equals(minAge, other.minAge)
&& Objects.equals(minDocs, other.minDocs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
.rolloverIndex(
rolloverRequest,
ActionListener.wrap(
response -> listener.onResponse(response.getConditionStatus().values().stream().anyMatch(i -> i), EmptyInfo.INSTANCE),
response -> listener.onResponse(rolloverRequest.areConditionsMet(response.getConditionStatus()), EmptyInfo.INSTANCE),
listener::onFailure
)
);
Expand All @@ -237,9 +237,21 @@ Long getMaxDocs() {
return maxDocs;
}

ByteSizeValue getMinPrimaryShardSize() {
return minPrimaryShardSize;
}

TimeValue getMinAge() {
return minAge;
}

Long getMinDocs() {
return minDocs;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), maxSize, maxPrimaryShardSize, maxAge, maxDocs);
return Objects.hash(super.hashCode(), maxSize, maxPrimaryShardSize, maxAge, maxDocs, minPrimaryShardSize, minAge, minDocs);
}

@Override
Expand All @@ -255,7 +267,10 @@ public boolean equals(Object obj) {
&& Objects.equals(maxSize, other.maxSize)
&& Objects.equals(maxPrimaryShardSize, other.maxPrimaryShardSize)
&& Objects.equals(maxAge, other.maxAge)
&& Objects.equals(maxDocs, other.maxDocs);
&& Objects.equals(maxDocs, other.maxDocs)
&& Objects.equals(minPrimaryShardSize, other.minPrimaryShardSize)
&& Objects.equals(minAge, other.minAge)
&& Objects.equals(minDocs, other.minDocs);
}

// We currently have no information to provide for this AsyncWaitStep, so this is an empty object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testRefreshPhaseJson() throws IOException {
String indexName = meta.getIndex().getName();

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 1L));
actions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
actions.put("set_priority", new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
Expand Down Expand Up @@ -316,7 +316,7 @@ public void testIndexCanBeSafelyUpdated() {
IndexMetadata meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()).build();

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 1L));
actions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
actions.put("set_priority", new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
Expand Down Expand Up @@ -389,7 +389,7 @@ public void testIndexCanBeSafelyUpdated() {
IndexMetadata meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()).build();

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, TimeValue.timeValueSeconds(5), null));
actions.put("rollover", new RolloverAction(null, null, TimeValue.timeValueSeconds(5), null, null, null, null));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
Expand Down Expand Up @@ -420,7 +420,7 @@ public void testIndexCanBeSafelyUpdated() {
IndexMetadata meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()).build();

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 1L));
actions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
actions.put("set_priority", new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
Expand All @@ -441,7 +441,7 @@ public void testIndexCanBeSafelyUpdated() {
IndexMetadata meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap()).build();

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 1L));
actions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
actions.put("set_priority", new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
Expand Down Expand Up @@ -480,14 +480,14 @@ public void testUpdateIndicesForPolicy() throws IOException {
assertTrue(eligibleToCheckForRefresh(meta));

Map<String, LifecycleAction> oldActions = new HashMap<>();
oldActions.put("rollover", new RolloverAction(null, null, null, 1L));
oldActions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
oldActions.put("set_priority", new SetPriorityAction(100));
Phase oldHotPhase = new Phase("hot", TimeValue.ZERO, oldActions);
Map<String, Phase> oldPhases = Collections.singletonMap("hot", oldHotPhase);
LifecyclePolicy oldPolicy = new LifecyclePolicy("my-policy", oldPhases);

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 1L));
actions.put("rollover", new RolloverAction(null, null, null, 1L, null, null, null));
actions.put("set_priority", new SetPriorityAction(100));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
Expand All @@ -507,7 +507,7 @@ public void testUpdateIndicesForPolicy() throws IOException {
assertThat(updatedState, equalTo(existingState));

actions = new HashMap<>();
actions.put("rollover", new RolloverAction(null, null, null, 2L));
actions.put("rollover", new RolloverAction(null, null, null, 2L, null, null, null));
actions.put("set_priority", new SetPriorityAction(150));
hotPhase = new Phase("hot", TimeValue.ZERO, actions);
phases = Collections.singletonMap("hot", hotPhase);
Expand Down
Loading