Skip to content

Commit e61a080

Browse files
authored
Merge branch 'main' into markjhoy/add_MV_INTERSECT_function_esql
2 parents 91129af + 3c7ff2e commit e61a080

File tree

133 files changed

+1347
-939
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

133 files changed

+1347
-939
lines changed

docs/reference/query-languages/esql/_snippets/lists/time-series-aggregation-functions.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
* [`AVG_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-avg_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
33
* [`COUNT_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-count_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
44
* [`COUNT_DISTINCT_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-count_distinct_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
5-
* [`DELTA`](../../functions-operators/time-series-aggregation-functions.md#esql-rate) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
5+
* [`DELTA`](../../functions-operators/time-series-aggregation-functions.md#esql-delta) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
66
* [`FIRST_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-first_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
7-
* [`IDELTA`](../../functions-operators/time-series-aggregation-functions.md#esql-rate) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
8-
* [`INCREASE`](../../functions-operators/time-series-aggregation-functions.md#esql-rate) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
9-
* [`IRATE`](../../functions-operators/time-series-aggregation-functions.md#esql-rate) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
7+
* [`IDELTA`](../../functions-operators/time-series-aggregation-functions.md#esql-idelta) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
8+
* [`INCREASE`](../../functions-operators/time-series-aggregation-functions.md#esql-increase) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
9+
* [`IRATE`](../../functions-operators/time-series-aggregation-functions.md#esql-irate) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
1010
* [`LAST_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-last_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
1111
* [`MAX_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-max_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`
1212
* [`MIN_OVER_TIME`](../../functions-operators/time-series-aggregation-functions.md#esql-min_over_time) {applies_to}`stack: preview 9.2` {applies_to}`serverless: preview`

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.ingest.geoip;
1111

1212
import org.elasticsearch.TransportVersion;
13-
import org.elasticsearch.TransportVersions;
1413
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1514
import org.elasticsearch.common.io.stream.StreamInput;
1615
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -44,10 +43,6 @@
4443

4544
public class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
4645

47-
private static boolean includeSha256(TransportVersion version) {
48-
return version.onOrAfter(TransportVersions.V_8_15_0);
49-
}
50-
5146
private static final ParseField DATABASES = new ParseField("databases");
5247

5348
static final GeoIpTaskState EMPTY = new GeoIpTaskState(Map.of());
@@ -78,14 +73,7 @@ public static GeoIpTaskState fromXContent(XContentParser parser) throws IOExcept
7873

7974
GeoIpTaskState(StreamInput input) throws IOException {
8075
databases = input.readImmutableMap(
81-
in -> new Metadata(
82-
in.readLong(),
83-
in.readVInt(),
84-
in.readVInt(),
85-
in.readString(),
86-
in.readLong(),
87-
includeSha256(in.getTransportVersion()) ? input.readOptionalString() : null
88-
)
76+
in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString(), in.readLong(), input.readOptionalString())
8977
);
9078
}
9179

@@ -144,9 +132,7 @@ public void writeTo(StreamOutput out) throws IOException {
144132
o.writeVInt(v.lastChunk);
145133
o.writeString(v.md5);
146134
o.writeLong(v.lastCheck);
147-
if (includeSha256(o.getTransportVersion())) {
148-
o.writeOptionalString(v.sha256);
149-
}
135+
o.writeOptionalString(v.sha256);
150136
});
151137
}
152138

modules/percolator/src/test/java/org/elasticsearch/percolator/QueryBuilderStoreTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.index.mapper.TestDocumentParserContext;
3333
import org.elasticsearch.index.query.SearchExecutionContext;
3434
import org.elasticsearch.index.query.TermQueryBuilder;
35+
import org.elasticsearch.script.field.BinaryDocValuesField;
3536
import org.elasticsearch.search.SearchModule;
3637
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
3738
import org.elasticsearch.test.ESTestCase;
@@ -88,7 +89,7 @@ public void testStoringQueryBuilders() throws IOException {
8889
when(searchExecutionContext.getWriteableRegistry()).thenReturn(writableRegistry());
8990
when(searchExecutionContext.getParserConfig()).thenReturn(parserConfig());
9091
when(searchExecutionContext.getForField(fieldMapper.fieldType(), fielddataOperation)).thenReturn(
91-
new BytesBinaryIndexFieldData(fieldMapper.fullPath(), CoreValuesSourceType.KEYWORD)
92+
new BytesBinaryIndexFieldData(fieldMapper.fullPath(), CoreValuesSourceType.KEYWORD, BinaryDocValuesField::new)
9293
);
9394
when(searchExecutionContext.getFieldType(Mockito.anyString())).thenAnswer(invocation -> {
9495
final String fieldName = (String) invocation.getArguments()[0];

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.util.List;
8585
import java.util.Map;
8686
import java.util.concurrent.atomic.AtomicLong;
87+
import java.util.function.Supplier;
8788
import java.util.stream.Collectors;
8889

8990
import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
@@ -144,10 +145,11 @@ public void writeBlob(OperationPurpose purpose, String blobName, InputStream inp
144145
throws IOException {
145146
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
146147
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
148+
final var condition = failIfAlreadyExists ? ConditionalOperation.IF_NONE_MATCH : ConditionalOperation.NONE;
147149
if (blobSize <= getLargeBlobThresholdInBytes()) {
148-
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
150+
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
149151
} else {
150-
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
152+
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
151153
}
152154
}
153155

@@ -536,6 +538,59 @@ String buildKey(String blobName) {
536538
return keyPath + blobName;
537539
}
538540

541+
/**
542+
* Enumeration of mutually exlusive conditional operations supported by S3.
543+
*
544+
* @see <a href=https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-requests.html>S3-conditional-requests</a>
545+
*/
546+
sealed interface ConditionalOperation permits ConditionalOperation.IfMatch, ConditionalOperation.IfNoneMatch,
547+
ConditionalOperation.None {
548+
ConditionalOperation NONE = new None();
549+
ConditionalOperation IF_NONE_MATCH = new IfNoneMatch();
550+
551+
static ConditionalOperation ifMatch(String etag) {
552+
return new IfMatch(etag);
553+
}
554+
555+
record None() implements ConditionalOperation {}
556+
557+
record IfNoneMatch() implements ConditionalOperation {}
558+
559+
record IfMatch(String etag) implements ConditionalOperation {}
560+
}
561+
562+
static void putObject(
563+
OperationPurpose purpose,
564+
S3BlobStore s3BlobStore,
565+
String blobName,
566+
long contentLength,
567+
Supplier<RequestBody> body,
568+
ConditionalOperation condition
569+
) {
570+
final var putRequestBuilder = PutObjectRequest.builder()
571+
.bucket(s3BlobStore.bucket())
572+
.key(blobName)
573+
.contentLength(contentLength)
574+
.storageClass(s3BlobStore.getStorageClass())
575+
.acl(s3BlobStore.getCannedACL());
576+
if (s3BlobStore.serverSideEncryption()) {
577+
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
578+
}
579+
if (s3BlobStore.supportsConditionalWrites(purpose)) {
580+
switch (condition) {
581+
case ConditionalOperation.IfMatch ifMatch -> putRequestBuilder.ifMatch(ifMatch.etag);
582+
case ConditionalOperation.IfNoneMatch ignored -> putRequestBuilder.ifNoneMatch("*");
583+
case ConditionalOperation.None ignored -> {
584+
}
585+
}
586+
}
587+
S3BlobStore.configureRequestForMetrics(putRequestBuilder, s3BlobStore, Operation.PUT_OBJECT, purpose);
588+
final var putRequest = putRequestBuilder.build();
589+
try (var client = s3BlobStore.clientReference()) {
590+
client.client().putObject(putRequest, body.get());
591+
}
592+
}
593+
539594
/**
540595
* Uploads a blob using a single upload request
541596
*/
@@ -545,33 +600,17 @@ void executeSingleUpload(
545600
final String blobName,
546601
final InputStream input,
547602
final long blobSize,
548-
final boolean failIfAlreadyExists
603+
final ConditionalOperation condition
549604
) throws IOException {
550-
try (var clientReference = s3BlobStore.clientReference()) {
605+
try {
551606
// Extra safety checks
552607
if (blobSize > MAX_FILE_SIZE.getBytes()) {
553608
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
554609
}
555610
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
556611
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
557612
}
558-
559-
final var putRequestBuilder = PutObjectRequest.builder()
560-
.bucket(s3BlobStore.bucket())
561-
.key(blobName)
562-
.contentLength(blobSize)
563-
.storageClass(s3BlobStore.getStorageClass())
564-
.acl(s3BlobStore.getCannedACL());
565-
if (s3BlobStore.serverSideEncryption()) {
566-
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
567-
}
568-
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
569-
putRequestBuilder.ifNoneMatch("*");
570-
}
571-
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
572-
573-
final var putRequest = putRequestBuilder.build();
574-
clientReference.client().putObject(putRequest, RequestBody.fromInputStream(input, blobSize));
613+
putObject(purpose, s3BlobStore, blobName, blobSize, () -> RequestBody.fromInputStream(input, blobSize), condition);
575614
} catch (final SdkException e) {
576615
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
577616
}
@@ -590,7 +629,7 @@ private void executeMultipart(
590629
final long partSize,
591630
final long blobSize,
592631
final PartOperation partOperation,
593-
final boolean failIfAlreadyExists
632+
final ConditionalOperation condition
594633
) throws IOException {
595634

596635
ensureMultiPartUploadSize(blobSize);
@@ -661,8 +700,13 @@ private void executeMultipart(
661700
.uploadId(uploadId)
662701
.multipartUpload(b -> b.parts(parts));
663702

664-
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
665-
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
703+
if (s3BlobStore.supportsConditionalWrites(purpose)) {
704+
switch (condition) {
705+
case ConditionalOperation.IfMatch ifMatch -> completeMultipartUploadRequestBuilder.ifMatch(ifMatch.etag);
706+
case ConditionalOperation.IfNoneMatch ignored -> completeMultipartUploadRequestBuilder.ifNoneMatch("*");
707+
case ConditionalOperation.None ignored -> {
708+
}
709+
}
666710
}
667711

668712
S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
@@ -690,7 +734,7 @@ void executeMultipartUpload(
690734
final String blobName,
691735
final InputStream input,
692736
final long blobSize,
693-
final boolean failIfAlreadyExists
737+
final ConditionalOperation condition
694738
) throws IOException {
695739
executeMultipart(
696740
purpose,
@@ -708,7 +752,7 @@ void executeMultipartUpload(
708752
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
709753
}
710754
},
711-
failIfAlreadyExists
755+
condition
712756
);
713757
}
714758

@@ -756,7 +800,7 @@ void executeMultipartCopy(
756800
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
757801
}
758802
}),
759-
false
803+
ConditionalOperation.NONE
760804
);
761805
}
762806

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.stream.IntStream;
5353

5454
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
55+
import static org.elasticsearch.repositories.s3.S3BlobContainer.ConditionalOperation;
5556
import static org.hamcrest.Matchers.equalTo;
5657
import static org.hamcrest.Matchers.instanceOf;
5758
import static org.mockito.ArgumentMatchers.any;
@@ -75,7 +76,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
7576
randomAlphaOfLengthBetween(1, 10),
7677
null,
7778
blobSize,
78-
randomBoolean()
79+
randomCondition()
7980
)
8081
);
8182
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
@@ -96,7 +97,7 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
9697
blobName,
9798
new ByteArrayInputStream(new byte[0]),
9899
ByteSizeUnit.MB.toBytes(2),
99-
randomBoolean()
100+
randomCondition()
100101
)
101102
);
102103
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -132,7 +133,7 @@ public void testExecuteSingleUpload() throws IOException {
132133
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
133134
}
134135

135-
final boolean failIfAlreadyExists = randomBoolean();
136+
final ConditionalOperation conditionalOperation = randomCondition();
136137

137138
final S3Client client = configureMockClient(blobStore);
138139

@@ -142,7 +143,7 @@ public void testExecuteSingleUpload() throws IOException {
142143
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
143144

144145
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
145-
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
146+
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);
146147

147148
final PutObjectRequest request = requestCaptor.getValue();
148149
assertEquals(bucketName, request.bucket());
@@ -158,8 +159,19 @@ public void testExecuteSingleUpload() throws IOException {
158159
);
159160
}
160161

161-
if (failIfAlreadyExists) {
162-
assertEquals("*", request.ifNoneMatch());
162+
switch (conditionalOperation) {
163+
case ConditionalOperation.IfMatch ifMatch -> {
164+
assertEquals(ifMatch.etag(), request.ifMatch());
165+
assertNull(request.ifNoneMatch());
166+
}
167+
case ConditionalOperation.IfNoneMatch ignored -> {
168+
assertNull(request.ifMatch());
169+
assertEquals("*", request.ifNoneMatch());
170+
}
171+
case ConditionalOperation.None ignored -> {
172+
assertNull(request.ifMatch());
173+
assertNull(request.ifNoneMatch());
174+
}
163175
}
164176

165177
final RequestBody requestBody = bodyCaptor.getValue();
@@ -185,7 +197,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
185197
randomAlphaOfLengthBetween(1, 10),
186198
null,
187199
blobSize,
188-
randomBoolean()
200+
randomCondition()
189201
)
190202
);
191203
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
@@ -204,7 +216,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
204216
randomAlphaOfLengthBetween(1, 10),
205217
null,
206218
blobSize,
207-
randomBoolean()
219+
randomCondition()
208220
)
209221
);
210222
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
@@ -255,7 +267,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
255267
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
256268
}
257269

258-
final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
270+
final ConditionalOperation conditionalOperation = doCopy ? ConditionalOperation.NONE : randomCondition();
259271

260272
final S3Client client = configureMockClient(blobStore);
261273

@@ -305,7 +317,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
305317
if (doCopy) {
306318
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
307319
} else {
308-
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
320+
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);
309321
}
310322

311323
final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -372,8 +384,19 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
372384
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
373385
assertEquals(uploadId, compRequest.uploadId());
374386

375-
if (failIfAlreadyExists) {
376-
assertEquals("*", compRequest.ifNoneMatch());
387+
switch (conditionalOperation) {
388+
case ConditionalOperation.IfMatch ifMatch -> {
389+
assertEquals(ifMatch.etag(), compRequest.ifMatch());
390+
assertNull(compRequest.ifNoneMatch());
391+
}
392+
case ConditionalOperation.IfNoneMatch ignored -> {
393+
assertNull(compRequest.ifMatch());
394+
assertEquals("*", compRequest.ifNoneMatch());
395+
}
396+
case ConditionalOperation.None ignored -> {
397+
assertNull(compRequest.ifMatch());
398+
assertNull(compRequest.ifNoneMatch());
399+
}
377400
}
378401

379402
final List<String> actualETags = compRequest.multipartUpload()
@@ -461,7 +484,7 @@ public void close() {}
461484
blobName,
462485
new ByteArrayInputStream(new byte[0]),
463486
blobSize,
464-
randomBoolean()
487+
randomCondition()
465488
);
466489
});
467490

@@ -557,6 +580,14 @@ public void close() {}
557580
return client;
558581
}
559582

583+
private ConditionalOperation randomCondition() {
584+
return switch (between(0, 2)) {
585+
case 0 -> ConditionalOperation.NONE;
586+
case 1 -> ConditionalOperation.IF_NONE_MATCH;
587+
default -> ConditionalOperation.ifMatch(randomAlphanumericOfLength(128));
588+
};
589+
}
590+
560591
private static void closeMockClient(S3BlobStore blobStore) {
561592
final var finalClientReference = blobStore.clientReference();
562593
assertFalse(finalClientReference.decRef());

0 commit comments

Comments
 (0)