
Commit d06a890

Merge branch 'master' into feature/seq_no
* master: (22 commits)
  Add proper toString() method to UpdateTask (#21582)
  Fix `InternalEngine#isThrottled` to not always return `false`. (#21592)
  add `ignore_missing` option to SplitProcessor (#20982)
  fix trace_match behavior for when there is only one grok pattern (#21413)
  Remove dead code from GetResponse.java
  Fixes date range query using epoch with timezone (#21542)
  Do not cache term queries. (#21566)
  Updated dynamic mapper section
  Docs: Clarify date_histogram bucket sizes for DST time zones
  Handle release of 5.0.1
  Fix skip reason for stats API parameters test
  Reduce skip version for stats API parameter tests
  Strict level parsing for indices stats
  Remove cluster update task when task times out (#21578)
  [DOCS] Mention "all-fields" mode doesn't search across nested documents
  InternalTestCluster: when restarting a node we should validate the cluster is formed via the node we just restarted
  Fixed bad asciidoc in boolean mapping docs
  Fixed bad asciidoc ID in node stats
  Be strict when parsing values searching for booleans (#21555)
  Fix time zone rounding edge case for DST overlaps
  ...
2 parents: b2b7595 + aa73a76

55 files changed, +1139 −222 lines changed


core/src/main/java/org/elasticsearch/Version.java

Lines changed: 4 additions & 1 deletion

@@ -19,7 +19,6 @@
 
 package org.elasticsearch;
 
-import org.apache.lucene.util.MathUtil;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
@@ -92,6 +91,8 @@ public class Version {
     public static final Version V_5_0_0_rc1 = new Version(V_5_0_0_rc1_ID, org.apache.lucene.util.Version.LUCENE_6_2_0);
     public static final int V_5_0_0_ID = 5000099;
     public static final Version V_5_0_0 = new Version(V_5_0_0_ID, org.apache.lucene.util.Version.LUCENE_6_2_0);
+    public static final int V_5_0_1_ID = 5000199;
+    public static final Version V_5_0_1 = new Version(V_5_0_1_ID, org.apache.lucene.util.Version.LUCENE_6_2_1);
     public static final int V_6_0_0_alpha1_ID = 6000001;
     public static final Version V_6_0_0_alpha1 = new Version(V_6_0_0_alpha1_ID, org.apache.lucene.util.Version.LUCENE_6_3_0);
     public static final Version CURRENT = V_6_0_0_alpha1;
@@ -118,6 +119,8 @@ public static Version fromId(int id) {
         switch (id) {
             case V_6_0_0_alpha1_ID:
                 return V_6_0_0_alpha1;
+            case V_5_0_1_ID:
+                return V_5_0_1;
             case V_5_0_0_ID:
                 return V_5_0_0;
             case V_5_0_0_rc1_ID:
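The new V_5_0_1 constants follow the numeric ID scheme already used by the surrounding constants, which packs major, minor, revision, and build into a single integer. A minimal decoding sketch, inferred from those constants rather than taken from this commit:

    // Sketch only: how an ID such as V_5_0_1_ID = 5000199 breaks down (inferred, not part of the diff).
    int id = 5000199;
    int major    = (id / 1000000) % 100;  // 5
    int minor    = (id / 10000) % 100;    // 0
    int revision = (id / 100) % 100;      // 1
    int build    = id % 100;              // 99 appears to mark a GA release; alphas/RCs use lower values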

core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java

Lines changed: 5 additions & 3 deletions

@@ -152,12 +152,14 @@ public void writeTo(StreamOutput out) throws IOException {
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        String level = params.param("level", "indices");
-        boolean isLevelValid = "indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level) || "cluster".equalsIgnoreCase(level);
+        final String level = params.param("level", "indices");
+        final boolean isLevelValid =
+            "cluster".equalsIgnoreCase(level) || "indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level);
         if (!isLevelValid) {
-            return builder;
+            throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]");
         }
 
+
         builder.startObject("_all");
 
         builder.startObject("primaries");
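The practical effect: an unrecognized ?level= value on the indices stats API used to be silently swallowed, producing an empty body, and now fails fast with a clear message. A hedged sketch of what a caller observes (the indicesStatsResponse and builder objects here are hypothetical):

    // Hypothetical caller of the method changed above.
    ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", "nodes"));
    try {
        indicesStatsResponse.toXContent(builder, params);  // "nodes" is not a valid level here
    } catch (IllegalArgumentException e) {
        // "level parameter must be one of [cluster] or [indices] or [shards] but was [nodes]"
    }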

core/src/main/java/org/elasticsearch/action/get/GetResponse.java

Lines changed: 0 additions & 6 deletions

@@ -164,12 +164,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         return getResult.toXContent(builder, params);
     }
 
-    public static GetResponse readGetResponse(StreamInput in) throws IOException {
-        GetResponse result = new GetResponse();
-        result.readFrom(in);
-        return result;
-    }
-
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 37 additions & 20 deletions

@@ -70,6 +70,7 @@
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -114,7 +115,7 @@ public class ClusterService extends AbstractLifecycleComponent {
     private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
     private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
     private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
-    private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
+    final Map<ClusterStateTaskExecutor, LinkedHashSet<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
     // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
     private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
     private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners,
@@ -450,11 +451,12 @@ public <T> void submitStateUpdateTasks(final String source,
         // convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
         final IdentityHashMap<T, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks);
         final List<UpdateTask<T>> updateTasks = tasksIdentity.entrySet().stream().map(
-            entry -> new UpdateTask<>(source, entry.getKey(), config, executor, safe(entry.getValue(), logger))
+            entry -> new UpdateTask<>(source, entry.getKey(), config.priority(), executor, safe(entry.getValue(), logger))
         ).collect(Collectors.toList());
 
         synchronized (updateTasksPerExecutor) {
-            List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
+            LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor,
+                k -> new LinkedHashSet<>(updateTasks.size()));
             for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
                 if (tasksIdentity.containsKey(existing.task)) {
                     throw new IllegalStateException("task [" + executor.describeTasks(Collections.singletonList(existing.task)) +
@@ -466,12 +468,29 @@ public <T> void submitStateUpdateTasks(final String source,
 
         final UpdateTask<T> firstTask = updateTasks.get(0);
 
-        if (config.timeout() != null) {
-            updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
+        final TimeValue timeout = config.timeout();
+        if (timeout != null) {
+            updateTasksExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> threadPool.generic().execute(() -> {
+                final ArrayList<UpdateTask<T>> toRemove = new ArrayList<>();
                 for (UpdateTask<T> task : updateTasks) {
                     if (task.processed.getAndSet(true) == false) {
-                        logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
-                        task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
+                        logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout);
+                        toRemove.add(task);
+                    }
+                }
+                if (toRemove.isEmpty() == false) {
+                    ClusterStateTaskExecutor<T> clusterStateTaskExecutor = toRemove.get(0).executor;
+                    synchronized (updateTasksPerExecutor) {
+                        LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor);
+                        if (existingTasks != null) {
+                            existingTasks.removeAll(toRemove);
+                            if (existingTasks.isEmpty()) {
+                                updateTasksPerExecutor.remove(clusterStateTaskExecutor);
+                            }
+                        }
+                    }
+                    for (UpdateTask<T> task : toRemove) {
+                        task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
                     }
                 }
             }));
@@ -567,15 +586,15 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
         final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
         final Map<String, ArrayList<T>> processTasksBySource = new HashMap<>();
         synchronized (updateTasksPerExecutor) {
-            List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
+            LinkedHashSet<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
             if (pending != null) {
                 for (UpdateTask<T> task : pending) {
                     if (task.processed.getAndSet(true) == false) {
-                        logger.trace("will process {}", task.toString(executor));
+                        logger.trace("will process {}", task);
                         toExecute.add(task);
                         processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
                     } else {
-                        logger.trace("skipping {}, already processed", task.toString(executor));
+                        logger.trace("skipping {}, already processed", task);
                     }
                 }
             }
@@ -633,23 +652,23 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
         if (assertsEnabled) {
             for (UpdateTask<T> updateTask : toExecute) {
                 assert batchResult.executionResults.containsKey(updateTask.task) :
-                    "missing task result for " + updateTask.toString(executor);
+                    "missing task result for " + updateTask;
             }
         }
 
         ClusterState newClusterState = batchResult.resultingState;
         final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
         // fail all tasks that have failed and extract those that are waiting for results
         for (UpdateTask<T> updateTask : toExecute) {
-            assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.toString(executor);
+            assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
             final ClusterStateTaskExecutor.TaskResult executionResult =
                 batchResult.executionResults.get(updateTask.task);
             executionResult.handle(
                 () -> proccessedListeners.add(updateTask),
                 ex -> {
                     logger.debug(
                         (Supplier<?>)
-                            () -> new ParameterizedMessage("cluster state update task {} failed", updateTask.toString(executor)), ex);
+                            () -> new ParameterizedMessage("cluster state update task {} failed", updateTask), ex);
                     updateTask.listener.onFailure(updateTask.source, ex);
                 }
             );
@@ -927,16 +946,13 @@ public TimeValue ackTimeout() {
     class UpdateTask<T> extends SourcePrioritizedRunnable {
 
         public final T task;
-        public final ClusterStateTaskConfig config;
-        public final ClusterStateTaskExecutor<T> executor;
         public final ClusterStateTaskListener listener;
+        private final ClusterStateTaskExecutor<T> executor;
         public final AtomicBoolean processed = new AtomicBoolean();
 
-        UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor,
-                   ClusterStateTaskListener listener) {
-            super(config.priority(), source);
+        UpdateTask(String source, T task, Priority priority, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
+            super(priority, source);
             this.task = task;
-            this.config = config;
             this.executor = executor;
             this.listener = listener;
         }
@@ -950,7 +966,8 @@ public void run() {
             }
         }
 
-        public String toString(ClusterStateTaskExecutor<T> executor) {
+        @Override
+        public String toString() {
             String taskDescription = executor.describeTasks(Collections.singletonList(task));
             if (taskDescription.isEmpty()) {
                 return "[" + source + "]";

core/src/main/java/org/elasticsearch/common/rounding/Rounding.java

Lines changed: 32 additions & 9 deletions

@@ -128,15 +128,38 @@ public byte id() {
     @Override
     public long round(long utcMillis) {
         long rounded = field.roundFloor(utcMillis);
-        if (timeZone.isFixed() == false && timeZone.getOffset(utcMillis) != timeZone.getOffset(rounded)) {
-            // in this case, we crossed a time zone transition. In some edge
-            // cases this will
-            // result in a value that is not a rounded value itself. We need
-            // to round again
-            // to make sure. This will have no affect in cases where
-            // 'rounded' was already a proper
-            // rounded value
-            rounded = field.roundFloor(rounded);
+        if (timeZone.isFixed() == false) {
+            // special cases for non-fixed time zones with dst transitions
+            if (timeZone.getOffset(utcMillis) != timeZone.getOffset(rounded)) {
+                /*
+                 * the offset change indicates a dst transition. In some
+                 * edge cases this will result in a value that is not a
+                 * rounded value before the transition. We round again to
+                 * make sure we really return a rounded value. This will
+                 * have no effect in cases where we already had a valid
+                 * rounded value
+                 */
+                rounded = field.roundFloor(rounded);
+            } else {
+                /*
+                 * check if the current time instant is at a start of a DST
+                 * overlap by comparing the offset of the instant and the
+                 * previous millisecond. We want to detect negative offset
+                 * changes that result in an overlap
+                 */
+                if (timeZone.getOffset(rounded) < timeZone.getOffset(rounded - 1)) {
+                    /*
+                     * we are rounding a date just after a DST overlap. if
+                     * the overlap is smaller than the time unit we are
+                     * rounding to, we want to add the overlapping part to
+                     * the following rounding interval
+                     */
+                    long previousRounded = field.roundFloor(rounded - 1);
+                    if (rounded - previousRounded < field.getDurationField().getUnitMillis()) {
+                        rounded = previousRounded;
+                    }
+                }
+            }
         }
         assert rounded == field.roundFloor(rounded);
         return rounded;
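The new else-branch detects the start of a DST overlap by comparing an instant's offset with the offset one millisecond earlier; a drop means clocks were set back and local times repeat. A worked sketch with assumed values for the Europe/Berlin fall-back on 2016-10-30, where 02:00-03:00 local time occurs twice:

    // Illustration only; epoch values computed for this example, not taken from the patch.
    DateTimeZone tz = DateTimeZone.forID("Europe/Berlin");
    long secondTwoAm = 1477789200000L;   // 2016-10-30T01:00Z = 02:00 CET, the repeated local hour
    tz.getOffset(secondTwoAm);           // 3600000  (+01:00)
    tz.getOffset(secondTwoAm - 1);       // 7200000  (+02:00) -> negative offset change: an overlap
    // If the one-hour overlap is smaller than the rounding unit (e.g. a day), the
    // rounded value now folds back into the interval that began before the overlap.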

core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions

@@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
     IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
     IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
     IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
+    IndexModule.INDEX_QUERY_CACHE_TERM_QUERIES_SETTING,
     PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
     FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
     EngineConfig.INDEX_CODEC_SETTING,

core/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 5 additions & 0 deletions

@@ -102,6 +102,11 @@ public final class IndexModule {
     public static final Setting<Boolean> INDEX_QUERY_CACHE_EVERYTHING_SETTING =
         Setting.boolSetting("index.queries.cache.everything", false, Property.IndexScope);
 
+    // This setting is an escape hatch in case not caching term queries would slow some users down
+    // Do not document.
+    public static final Setting<Boolean> INDEX_QUERY_CACHE_TERM_QUERIES_SETTING =
+        Setting.boolSetting("index.queries.cache.term_queries", false, Property.IndexScope);
+
     private final IndexSettings indexSettings;
     private final IndexStoreConfig indexStoreConfig;
     private final AnalysisRegistry analysisRegistry;
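Because the setting is deliberately left undocumented, the only way to restore the pre-#21566 behavior is to set it explicitly. A sketch of flipping the escape hatch at index creation (hypothetical usage, not part of the diff):

    Settings indexSettings = Settings.builder()
            .put("index.queries.cache.term_queries", true)  // re-enable caching of term queries
            .build();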

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 6 deletions

@@ -237,17 +237,13 @@ boolean isThrottled() {
     /**
      * Returns the number of milliseconds this engine was under index throttling.
      */
-    public long getIndexThrottleTimeInMillis() {
-        return 0;
-    }
+    public abstract long getIndexThrottleTimeInMillis();
 
     /**
      * Returns the <code>true</code> iff this engine is currently under index throttling.
      * @see #getIndexThrottleTimeInMillis()
      */
-    public boolean isThrottled() {
-        return false;
-    }
+    public abstract boolean isThrottled();
 
     /** A Lock implementation that always allows the lock to be acquired */
     protected static final class NoOpLock implements Lock {
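Making both methods abstract rather than leaving defaults in the base class is the heart of #21592: a concrete default meant a subclass could forget to override isThrottled() and silently report false even while actively throttling, with no help from the compiler. The overrides added below in InternalEngine and ShadowEngine show each engine now answering explicitly.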

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 0 deletions

@@ -1321,6 +1321,12 @@ public void deactivateThrottling() {
         }
     }
 
+    @Override
+    public boolean isThrottled() {
+        return throttle.isThrottled();
+    }
+
+    @Override
     public long getIndexThrottleTimeInMillis() {
         return throttle.getThrottleTimeInMillis();
     }

core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

Lines changed: 10 additions & 0 deletions

@@ -262,6 +262,16 @@ public SequenceNumbersService seqNoService() {
         throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers");
     }
 
+    @Override
+    public boolean isThrottled() {
+        return false;
+    }
+
+    @Override
+    public long getIndexThrottleTimeInMillis() {
+        return 0L;
+    }
+
     @Override
     public Engine recoverFromTranslog() throws IOException {
         throw new UnsupportedOperationException("can't recover on a shadow engine");
