Skip to content

ESQL: Report original_types #124913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124913.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124913
summary: Report `original_types`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00);
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,14 @@ public String nodeString() {
}

protected abstract String label();

/**
* If this field is unsupported this contains the underlying ES types. If there
* is a type conflict this will have many elements, some or all of which may
* be actually supported types.
*/
@Nullable
public List<String> originalTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attribute != Field
This information makes sense only in the case of unsupported fields (potentially union types in the future) and thus makes sense on FieldAttribute & co (such as UnsupportedAttribute) alone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean.

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
*/
package org.elasticsearch.xpack.esql.core.type;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
Expand All @@ -23,32 +27,39 @@
*/
public class UnsupportedEsField extends EsField {

private final String originalType;
private final List<String> originalTypes;
private final String inherited; // for fields belonging to parents (or grandparents) that have an unsupported type

public UnsupportedEsField(String name, String originalType) {
this(name, originalType, null, new TreeMap<>());
public UnsupportedEsField(String name, List<String> originalTypes) {
this(name, originalTypes, null, new TreeMap<>());
}

public UnsupportedEsField(String name, String originalType, String inherited, Map<String, EsField> properties) {
public UnsupportedEsField(String name, List<String> originalTypes, String inherited, Map<String, EsField> properties) {
super(name, DataType.UNSUPPORTED, properties, false);
this.originalType = originalType;
this.originalTypes = originalTypes;
this.inherited = inherited;
}

public UnsupportedEsField(StreamInput in) throws IOException {
this(
readCachedStringWithVersionCheck(in),
readCachedStringWithVersionCheck(in),
in.readOptionalString(),
in.readImmutableMap(EsField::readFrom)
);
this(readCachedStringWithVersionCheck(in), readOriginalTypes(in), in.readOptionalString(), in.readImmutableMap(EsField::readFrom));
}

private static List<String> readOriginalTypes(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
return in.readCollectionAsList(i -> ((PlanStreamInput) i).readCachedString());
} else {
return List.of(readCachedStringWithVersionCheck(in).split(","));
}
}

@Override
public void writeContent(StreamOutput out) throws IOException {
writeCachedStringWithVersionCheck(out, getName());
writeCachedStringWithVersionCheck(out, getOriginalType());
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
out.writeCollection(getOriginalTypes(), (o, s) -> ((PlanStreamOutput) o).writeCachedString(s));
} else {
writeCachedStringWithVersionCheck(out, String.join(",", getOriginalTypes()));
}
out.writeOptionalString(getInherited());
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
}
Expand All @@ -57,8 +68,8 @@ public String getWriteableName() {
return "UnsupportedEsField";
}

public String getOriginalType() {
return originalType;
public List<String> getOriginalTypes() {
return originalTypes;
}

public String getInherited() {
Expand All @@ -81,11 +92,11 @@ public boolean equals(Object o) {
return false;
}
UnsupportedEsField that = (UnsupportedEsField) o;
return Objects.equals(originalType, that.originalType) && Objects.equals(inherited, that.inherited);
return Objects.equals(originalTypes, that.originalTypes) && Objects.equals(inherited, that.inherited);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), originalType, inherited);
return Objects.hash(super.hashCode(), originalTypes, inherited);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,13 @@ public void testAliasToInt() throws IOException {
}

public void testFlattenedUnsupported() throws IOException {
assumeOriginalTypesReported();
new Test("flattened").createIndex("test", "flattened");
index("test", """
{"flattened": {"a": "foo"}}""");
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");

assertResultMap(result, List.of(columnInfo("flattened", "unsupported")), List.of(matchesList().item(null)));
assertResultMap(result, List.of(unsupportedColumnInfo("flattened", "flattened")), List.of(matchesList().item(null)));
}

public void testEmptyMapping() throws IOException {
Expand Down Expand Up @@ -689,6 +690,7 @@ public void testByteFieldWithIntSubfieldTooBig() throws IOException {
* </pre>.
*/
public void testIncompatibleTypes() throws IOException {
assumeOriginalTypesReported();
keywordTest().createIndex("test1", "f");
index("test1", """
{"f": "f1"}""");
Expand All @@ -697,7 +699,11 @@ public void testIncompatibleTypes() throws IOException {
{"f": 1}""");

Map<String, Object> result = runEsql("FROM test*");
assertResultMap(result, List.of(columnInfo("f", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("f", "keyword", "long")),
List.of(matchesList().item(null), matchesList().item(null))
);
ResponseException e = expectThrows(ResponseException.class, () -> runEsql("FROM test* | SORT f | LIMIT 3"));
String err = EntityUtils.toString(e.getResponse().getEntity());
assertThat(
Expand Down Expand Up @@ -758,10 +764,7 @@ public void testDistinctInEachIndex() throws IOException {
* </pre>.
*/
public void testMergeKeywordAndObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
keywordTest().createIndex("test1", "file");
index("test1", """
{"file": "f1"}""");
Expand Down Expand Up @@ -797,7 +800,7 @@ public void testMergeKeywordAndObject() throws IOException {
Map<String, Object> result = runEsql("FROM test* | SORT file.raw | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("file", "unsupported"), columnInfo("file.raw", "keyword")),
List.of(unsupportedColumnInfo("file", "keyword", "object"), columnInfo("file.raw", "keyword")),
List.of(matchesList().item(null).item("o2"), matchesList().item(null).item(null))
);
}
Expand All @@ -817,6 +820,7 @@ public void testMergeKeywordAndObject() throws IOException {
* </pre>.
*/
public void testPropagateUnsupportedToSubFields() throws IOException {
assumeOriginalTypesReported();
createIndex("test", index -> {
index.startObject("properties");
index.startObject("f");
Expand All @@ -842,7 +846,7 @@ public void testPropagateUnsupportedToSubFields() throws IOException {
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")),
List.of(unsupportedColumnInfo("f", "ip_range"), unsupportedColumnInfo("f.raw", "ip_range")),
List.of(matchesList().item(null).item(null))
);
}
Expand All @@ -867,10 +871,7 @@ public void testPropagateUnsupportedToSubFields() throws IOException {
* </pre>.
*/
public void testMergeUnsupportedAndObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
createIndex("test1", index -> {
index.startObject("properties");
index.startObject("f").field("type", "ip_range").endObject();
Expand Down Expand Up @@ -905,7 +906,7 @@ public void testMergeUnsupportedAndObject() throws IOException {
Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(
result,
List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")),
List.of(unsupportedColumnInfo("f", "ip_range"), unsupportedColumnInfo("f.raw", "ip_range")),
List.of(matchesList().item(null).item(null), matchesList().item(null).item(null))
);
}
Expand Down Expand Up @@ -958,10 +959,7 @@ public void testIntegerDocValuesConflict() throws IOException {
* In an ideal world we'd promote the {@code integer} to an {@code long} and just go.
*/
public void testLongIntegerConflict() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
longTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
index("test1", """
{"emp_no": 1}""");
Expand All @@ -980,7 +978,11 @@ public void testLongIntegerConflict() throws IOException {
);

Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(result, List.of(columnInfo("emp_no", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("emp_no", "integer", "long")),
List.of(matchesList().item(null), matchesList().item(null))
);
}

/**
Expand All @@ -1000,10 +1002,7 @@ public void testLongIntegerConflict() throws IOException {
* In an ideal world we'd promote the {@code short} to an {@code integer} and just go.
*/
public void testIntegerShortConflict() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
intTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
index("test1", """
{"emp_no": 1}""");
Expand All @@ -1022,7 +1021,11 @@ public void testIntegerShortConflict() throws IOException {
);

Map<String, Object> result = runEsql("FROM test* | LIMIT 2");
assertResultMap(result, List.of(columnInfo("emp_no", "unsupported")), List.of(matchesList().item(null), matchesList().item(null)));
assertResultMap(
result,
List.of(unsupportedColumnInfo("emp_no", "integer", "short")),
List.of(matchesList().item(null), matchesList().item(null))
);
}

/**
Expand All @@ -1048,10 +1051,7 @@ public void testIntegerShortConflict() throws IOException {
* </pre>.
*/
public void testTypeConflictInObject() throws IOException {
assumeTrue(
"order of fields in error message inconsistent before 8.14",
getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
);
assumeOriginalTypesReported();
createIndex("test1", empNoInObject("integer"));
index("test1", """
{"foo": {"emp_no": 1}}""");
Expand All @@ -1060,7 +1060,10 @@ public void testTypeConflictInObject() throws IOException {
{"foo": {"emp_no": "cat"}}""");

Map<String, Object> result = runEsql("FROM test* | LIMIT 3");
assertMap(result, getResultMatcher(result).entry("columns", List.of(columnInfo("foo.emp_no", "unsupported"))).extraOk());
assertMap(
result,
getResultMatcher(result).entry("columns", List.of(unsupportedColumnInfo("foo.emp_no", "integer", "keyword"))).extraOk()
);

ResponseException e = expectThrows(ResponseException.class, () -> runEsql("FROM test* | SORT foo.emp_no | LIMIT 3"));
String err = EntityUtils.toString(e.getResponse().getEntity());
Expand Down Expand Up @@ -1370,6 +1373,12 @@ private void assumeIndexResolverNestedFieldsNameClashFixed() throws IOException
);
}

private void assumeOriginalTypesReported() throws IOException {
var capsName = EsqlCapabilities.Cap.REPORT_ORIGINAL_TYPES.name().toLowerCase(Locale.ROOT);
boolean requiredClusterCapability = clusterHasCapability("POST", "/_query", List.of(), List.of(capsName)).orElse(false);
assumeTrue("This test makes sense for versions that report original types", requiredClusterCapability);
}

private CheckedConsumer<XContentBuilder, IOException> empNoInObject(String empNoType) {
return index -> {
index.startObject("properties");
Expand Down Expand Up @@ -1705,6 +1714,10 @@ private static Map<String, Object> columnInfo(String name, String type) {
return Map.of("name", name, "type", type);
}

private static Map<String, Object> unsupportedColumnInfo(String name, String... originalTypes) {
return Map.of("name", name, "type", "unsupported", "original_types", List.of(originalTypes));
}

private static void index(String name, String... docs) throws IOException {
Request request = new Request("POST", "/" + name + "/_bulk");
request.addParameter("refresh", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -110,7 +111,7 @@ private static void walkMapping(String name, Object value, Map<String, EsField>
field = DateEsField.dateEsField(name, properties, docValues);
} else if (esDataType == UNSUPPORTED) {
String type = content.get("type").toString();
field = new UnsupportedEsField(name, type, null, properties);
field = new UnsupportedEsField(name, List.of(type), null, properties);
propagateUnsupportedType(name, type, properties);
} else {
field = new EsField(name, esDataType, properties, docValues);
Expand Down Expand Up @@ -165,9 +166,9 @@ public static void propagateUnsupportedType(String inherited, String originalTyp
UnsupportedEsField u;
if (field instanceof UnsupportedEsField) {
u = (UnsupportedEsField) field;
u = new UnsupportedEsField(u.getName(), originalType, inherited, u.getProperties());
u = new UnsupportedEsField(u.getName(), List.of(originalType), inherited, u.getProperties());
} else {
u = new UnsupportedEsField(field.getName(), originalType, inherited, field.getProperties());
u = new UnsupportedEsField(field.getName(), List.of(originalType), inherited, field.getProperties());
}
entry.setValue(u);
propagateUnsupportedType(inherited, originalType, u.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testBasicAsyncExecution() throws Exception {
try (var finalResponse = future.get()) {
assertThat(finalResponse, notNullValue());
assertThat(finalResponse.isRunning(), is(false));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(finalResponse).size(), equalTo(1));
}

Expand All @@ -103,7 +103,7 @@ public void testBasicAsyncExecution() throws Exception {
try (var finalResponse = again.get()) {
assertThat(finalResponse, notNullValue());
assertThat(finalResponse.isRunning(), is(false));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(finalResponse).size(), equalTo(1));
}

Expand Down Expand Up @@ -231,7 +231,7 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {

try (var response = request.execute().actionGet(60, TimeUnit.SECONDS)) {
assertThat(response.isRunning(), is(false));
assertThat(response.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(response.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(response).size(), equalTo(1));

if (keepOnCompletion) {
Expand All @@ -244,7 +244,7 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
try (var resp = future.actionGet(60, TimeUnit.SECONDS)) {
assertThat(resp.asyncExecutionId().get(), equalTo(id));
assertThat(resp.isRunning(), is(false));
assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long", null))));
assertThat(getValuesList(resp).size(), equalTo(1));
}
} else {
Expand Down
Loading
Loading