ESQL: incorrect planning of remote enrich #118531

Open
bpintea opened this issue Dec 12, 2024 · 6 comments
Labels
:Analytics/ES|QL AKA ESQL >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.17.7 v9.1.0

Comments

@bpintea
Contributor

bpintea commented Dec 12, 2024

Description

A remote enrich query using a policy that exports/enriches with a field already present in the query is planned slightly incorrectly -- it works, but fails verification.

Example: the policy hosts matches on ip and has ip and os as enrich_fields:

FROM *:events,events
| EVAL ip= TO_STR(host)
| SORT timestamp, user, ip
| LIMIT 5
| ENRICH  _REMOTE:hosts ON ip
| KEEP host, timestamp, user, os

This produces the optimised physical plan:

ProjectExec[[host{f}#14, timestamp{f}#16, user{f}#15, os{r}#21]]
\_TopNExec[[Order[timestamp{f}#16,ASC,LAST], Order[user{f}#15,ASC,LAST], Order[ip{r}#3,ASC,LAST]],5[INTEGER],null]
  \_ExchangeExec[[host{f}#14, timestamp{f}#16, user{f}#15, os{r}#21, ip{r}#3],false]
    \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
        Project[[host{f}#14, timestamp{f}#16, user{f}#15, os{r}#21, ip{r}#3]]
        \_Enrich[REMOTE,[68 6f 73 74 73][KEYWORD],ip{r}#3,{"match":{"indices":[],"match_field":"ip","enrich_fields":["ip","os"]}},{=.enrich-hosts-1733836249291, c1=.enrich-hosts-1733836248939, c2=.enrich-hosts-1733836249107},[ip{r}#20, os{r}#21]]
          \_TopN[[Order[timestamp{f}#16,ASC,LAST], Order[user{f}#15,ASC,LAST], Order[ip{r}#3,ASC,LAST]],5[INTEGER]]
            \_Eval[[TOSTRING(host{f}#14) AS ip]]
              \_EsRelation[events,c1:events,c2:events][host{f}#14, timestamp{f}#16, user{f}#15]<>]]

Note that in the fragment, Project outputs ip{r}#3 (the node is produced by ProjectAwayColumns based on the TopN below Enrich), but the Enrich below it outputs ip{r}#20 (since it also enriches with its own ip field). So the verification fails later when remapping the fragment plan on ProjectExec, since its inputs don't provide an ip{r}#3. (If we KEEP ip too, the verification would also fail due to attributes with duplicate names.)
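To make the two failure modes concrete, here is a toy Python model of them. It is only a sketch and not Elasticsearch's verifier: the `Attr` class and both helper functions are hypothetical, attributes are reduced to a name plus an id (like `ip{r}#3`), and the Enrich output is assumed to shadow the evaluated `ip` by name.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Attr:
    """Toy stand-in for a plan attribute such as ip{r}#3 (name plus unique id)."""
    name: str
    id: int


def missing_inputs(required, child_output):
    """Attributes a node references that its child does not provide (matched by id)."""
    provided = {a.id for a in child_output}
    return [a for a in required if a.id not in provided]


def duplicate_names(output):
    """Names that appear more than once in a node's output."""
    counts = Counter(a.name for a in output)
    return sorted(name for name, n in counts.items() if n > 1)


host, timestamp, user = Attr("host", 14), Attr("timestamp", 16), Attr("user", 15)
ip_eval, ip_enrich, os_enrich = Attr("ip", 3), Attr("ip", 20), Attr("os", 21)

# Assumed Enrich output: the enrich field `ip` shadows the evaluated `ip` by name.
enrich_output = [host, timestamp, user, ip_enrich, os_enrich]
# What the Project in the fragment asks for (ids copied from the plan above).
project_refs = [host, timestamp, user, os_enrich, ip_eval]

print(missing_inputs(project_refs, enrich_output))   # [Attr(name='ip', id=3)]
print(duplicate_names(project_refs + [ip_enrich]))   # ['ip']
```

The first call mirrors the missing `ip{r}#3` input; the second mirrors what would happen if the query also kept the enriched `ip`.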

Normally we would drop the ip after TopN, but the Enrich remote planning pushes it to the remote cluster and the ip is still needed for the coordinator TopN.

Related: #118307.

@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Dec 12, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@alex-spies
Contributor

Excellent find @bpintea !

This can be reproduced without CCQ by forcing the ENRICH to be executed on the datanodes via the _REMOTE mode.

I played around and it seems that otherwise, our planner really tries to execute ENRICH on the coordinator. That explains why we only see this in CCQ scenarios.

Local reproducer:

$ curl -u elastic:password -H "Content-Type: application/json" "127.0.0.1:9200/languages_lookup" -XPUT -d '{
  "mappings": {
    "properties": {"language_code": {"type": "keyword"}, "language_name": {"type": "keyword"}}
  }
}'

$ curl -u elastic:password -H "Content-Type: application/json" "127.0.0.1:9200/_enrich/policy/my-policy" -XPUT -d'
{
  "match": {
    "indices": "languages_lookup",
    "match_field": "language_code",
    "enrich_fields": ["language_code", "language_name"]
  }
}
'

$ curl -u elastic:password -H "Content-Type: application/json" "127.0.0.1:9200/_enrich/policy/my-policy/_execute?wait_for_completion=true" -XPUT

$ curl -u elastic:password -HContent-Type:application/json 'localhost:9200/test/_doc?refresh' -d'{"x": "1"}'

$ curl -u elastic:password -H "Content-Type: application/json" "127.0.0.1:9200/_query?format=txt" -d '
{
  "query": "FROM test | eval language_code = x | sort language_code | enrich _REMOTE:my-policy"
}
'

{"error":{"root_cause":[{"type":"illegal_state_exception","reason":"Found 3 problems\nline 1:59: Plan [TopNExec[[Order[language_code{r}#135,ASC,LAST]],1000[INTEGER],null]] optimized incorrectly due to duplicate output attribute language_code{r}#135\nline 1:59: Plan [ExchangeExec[[x{f}#138, x.keyword{f}#139, language_code{r}#143, language_name{r}#144, language_code{r}#135],false]] optimized incorrectly due to duplicate output attribute language_code{r}#135\nline -1:-1: Plan [FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>\nProject[[x{f}#138, x.keyword{f}#139, language_code{r}#143, language_name{r}#144, language_code{r}#135]]\n\\_Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#135,{\"match\":{\"indices\":[],\"match_field\":\"language_\ncode\",\"enrich_fields\":[\"language_code\",\"language_name\"]}},{=.enrich-my-policy-1733996215420},[language_code{r}#1\n43, language_name{r}#144]]\n  \\_TopN[[Order[language_code{r}#135,ASC,LAST]],1000[INTEGER]]\n    \\_Eval[[x{f}#138 AS language_code]]\n      \\_EsRelation[test][x{f}#138, x.keyword{f}#139]<>]]] optimized incorrectly due to duplicate output attribute language_code{r}#135"}],"type":"illegal_state_exception","reason":"Found 3 problems\nline 1:59: Plan [TopNExec[[Order[language_code{r}#135,ASC,LAST]],1000[INTEGER],null]] optimized incorrectly due to duplicate output attribute language_code{r}#135\nline 1:59: Plan [ExchangeExec[[x{f}#138, x.keyword{f}#139, language_code{r}#143, language_name{r}#144, language_code{r}#135],false]] optimized incorrectly due to duplicate output attribute language_code{r}#135\nline -1:-1: Plan [FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>\nProject[[x{f}#138, x.keyword{f}#139, language_code{r}#143, language_name{r}#144, language_code{r}#135]]\n\\_Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#135,{\"match\":{\"indices\":[],\"match_field\":\"language_\ncode\",\"enrich_fields\":[\"language_code\",\"language_name\"]}},{=.enrich-my-policy-1733996215420},[language_code{r}#1\n43, language_name{r}#144]]\n  \\_TopN[[Order[language_code{r}#135,ASC,LAST]],1000[INTEGER]]\n    \\_Eval[[x{f}#138 AS language_code]]\n      \\_EsRelation[test][x{f}#138, x.keyword{f}#139]<>]]] optimized incorrectly due to duplicate output attribute language_code{r}#135"},"status":500}

@alex-spies
Contributor

In this case, ProjectAwayColumns looks even more broken, because it tries to get both language_codes:

[2024-12-12T10:50:36,633][INFO ][o.e.x.e.o.PhysicalPlanOptimizer] [runTask-0] Rule physical.ProjectAwayColumns applied
TopNExec[[Order[language_code{r}#12,ASC,LAST]],1000[INTEGER],null]                                                       = TopNExec[[Order[language_code{r}#12,ASC,LAST]],1000[INTEGER],null]
\_ExchangeExec[[],false]                                                                                                 ! \_ExchangeExec[[x{f}#15, x.keyword{f}#16, language_code{r}#20, language_name{r}#21, language_code{r}#12],false]
  \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>                                               =   \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#12,{"match":{"indices":[],"match_field":"language_c ! Project[[x{f}#15, x.keyword{f}#16, language_code{r}#20, language_name{r}#21, language_code{r}#12]]
ode","enrich_fields":["language_code","language_name"]}},{=.enrich-my-policy-1733997034894},[language_code{r}#20         ! \_Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#12,{"match":{"indices":[],"match_field":"language_c
, language_name{r}#21]]                                                                                                  ! ode","enrich_fields":["language_code","language_name"]}},{=.enrich-my-policy-1733997034894},[language_code{r}#20
\_TopN[[Order[language_code{r}#12,ASC,LAST]],1000[INTEGER]]                                                              ! , language_name{r}#21]]
  \_Eval[[x{f}#15 AS language_code]]                                                                                     !   \_TopN[[Order[language_code{r}#12,ASC,LAST]],1000[INTEGER]]
    \_EsRelation[test][x{f}#15, x.keyword{f}#16]<>]]                                                                     !     \_Eval[[x{f}#15 AS language_code]]
                                                                                                                         !       \_EsRelation[test][x{f}#15, x.keyword{f}#16]<>]]

@alex-spies
Contributor

It looks like the bug happens when mapping from logical to physical plan:

[2024-12-12T11:06:51,129][DEBUG][o.e.x.e.s.EsqlSession    ] [runTask-0] Optimized logicalPlan plan:
Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#3,{"match":{"indices":[],"match_field":"language_co
de","enrich_fields":["language_code","language_name"]}},{=.enrich-my-policy-1733998004150},[language_code{r}#12,
 language_name{r}#13]]
\_TopN[[Order[language_code{r}#3,ASC,LAST]],1000[INTEGER]]
  \_Eval[[x{f}#7 AS language_code]]
    \_EsRelation[test][x{f}#7, x.keyword{f}#8]

[2024-12-12T11:06:51,130][DEBUG][o.e.x.e.s.EsqlSession    ] [runTask-0] Physical plan:
TopNExec[[Order[language_code{r}#3,ASC,LAST]],1000[INTEGER],null]
\_ExchangeExec[[],false]
  \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,[6d 79 2d 70 6f 6c 69 63 79][KEYWORD],language_code{r}#3,{"match":{"indices":[],"match_field":"language_co
de","enrich_fields":["language_code","language_name"]}},{=.enrich-my-policy-1733998004150},[language_code{r}#12,
 language_name{r}#13]]
\_TopN[[Order[language_code{r}#3,ASC,LAST]],1000[INTEGER]]
  \_Eval[[x{f}#7 AS language_code]]
    \_EsRelation[test][x{f}#7, x.keyword{f}#8]<>]]

Note the TopNExec that has popped up on top. It's necessary to be there if we want to push the ENRICH to the data nodes.

However, it is actually incorrect on the logical level: the ENRICH shadows the language_code (which is both an enrich field and the match field), so after executing ENRICH, we have no guarantee that the original language_code attribute is still present. It just happens to be present on a physical level because it is never projected away.
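A toy Python sketch of name-based shadowing may help here. It is an illustration under assumptions, not the planner's code: `logical_output` is a hypothetical helper, and attributes are modeled as `(name, id)` tuples with the ids from the logical plan above.

```python
def logical_output(child_output, enrich_fields):
    """Name-based shadowing: an enrich field replaces any child attribute of the same name."""
    shadowed = {name for name, _ in enrich_fields}
    return [a for a in child_output if a[0] not in shadowed] + list(enrich_fields)


# Output of the TopN/Eval below the Enrich, and the fields the Enrich adds.
child = [("x", 7), ("x.keyword", 8), ("language_code", 3)]
enriched = [("language_code", 12), ("language_name", 13)]

out = logical_output(child, enriched)
print(out)
# [('x', 7), ('x.keyword', 8), ('language_code', 12), ('language_name', 13)]
print(("language_code", 3) in out)  # False: the sort key of the upper TopNExec is gone
```

Under this model the TopNExec above the exchange sorts on `language_code{r}#3`, which the Enrich's declared output no longer contains.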

I think this means we can solve this in the following ways:

  • Relax the dependency check on the physical level: allow duplicate attribute names. Then we can change EnrichExec.output to admit that it just attaches the enrich fields and doesn't discard anything, even when shadowed.
  • Do a finicky renaming dance to rename the shadowed language_code before the EnrichExec so we can use it in the TopNExec downstream from it.
  • Don't allow ENRICH to shadow the match field. Might break bwc due to things like ENRICH policy ON match_field WITH match_field = enrich_field.
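The first two options can be sketched on the same toy `(name, id)` model. This is a rough illustration only: both functions and the `$$` prefix are hypothetical and are not taken from the Elasticsearch code base.

```python
def physical_output(child_output, enrich_fields):
    """Option 1: report everything that is physically present, duplicate names included."""
    return list(child_output) + list(enrich_fields)


def rename_before_enrich(child_output, enrich_fields, prefix="$$"):
    """Option 2: give shadowed child attributes a temporary name, keeping their ids."""
    shadowed = {name for name, _ in enrich_fields}
    renamed = [
        (prefix + name if name in shadowed else name, attr_id)
        for name, attr_id in child_output
    ]
    return renamed + list(enrich_fields)


child = [("x", 7), ("x.keyword", 8), ("language_code", 3)]
enriched = [("language_code", 12), ("language_name", 13)]

honest = physical_output(child, enriched)
renamed = rename_before_enrich(child, enriched)

print(honest)
# [('x', 7), ('x.keyword', 8), ('language_code', 3), ('language_code', 12), ('language_name', 13)]
print(renamed)
# [('x', 7), ('x.keyword', 8), ('$$language_code', 3), ('language_code', 12), ('language_name', 13)]
```

In both variants the attribute with id 3 survives for the downstream TopNExec; option 1 needs a verifier that tolerates duplicate names, while option 2 keeps names unique at the cost of an extra rename step.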

@bpintea
Contributor Author

bpintea commented Dec 12, 2024

> It looks like the bug happens when mapping from logical to physical plan:

Indeed, I also think the REMOTE mapping of the Enrich produces the pattern that leads to the bug.

> It just happens to be present on a physical level because it is never projected away.

Well, the sorting should happen on the id that comes from the source (well, from the EVAL on top of it in your example). So in that sense, it's correct. But by the way ENRICH is planned, it shadows this attribute with its own. Ideally we'd plan such that this ENRICH attribute is just dropped -- I think the fix should go in this direction (so maybe along your 2nd point above).

Edit:

> we'd plan such that this ENRICH attribute is just dropped

Even if the query asks to KEEP it, we could still load it from the FROM-source "branch" (as it should be the same in both source and enrich policy/index). But maybe still not the right solution, after all. I guess we'd have to do some renaming dancing and let both attributes through the exchanges.

@alex-spies
Contributor

I'd prefer we attempt to relax the dependency check to allow duplicate names in the physical plans, and make the physical plans' output methods honest about what is actually output, physically.

Shadowing is a headache that logical plans' output methods must account for, both to resolve references correctly and to determine the overall output when no explicit KEEP command is present. We already have to fight shadowing complexities during logical optimizations, which is a source of bugs and a time sink.

If we could stop caring about shadowing in physical plans, that would bring more simplicity into this part of the code base.

bpintea added a commit that referenced this issue Dec 12, 2024
This disables verifying the plans generated for remote ENRICHing.
It also re-enables corresponding failing test.

Related: #118531
Fixes #118307.
bpintea added a commit to bpintea/elasticsearch that referenced this issue Dec 12, 2024
This disables verifying the plans generated for remote ENRICHing.
It also re-enables corresponding failing test.

Related: elastic#118531
Fixes elastic#118307.

(cherry picked from commit e7a4436)
elasticsearchmachine pushed a commit that referenced this issue Dec 12, 2024
)

* ESQL: Enable physical plan verification (#118114)

This enables the physical plan verification. For it, a couple of changes
needed to be applied/corrected:
  • AggregateMapper creates attributes with unique names;
  • AggregateExec's verification needs not consider ordinal attribute(s);
  • LookupJoinExec needs to merge attributes of same name at output, "winning" the right child;
  • ExchangeExec does no input referencing, since it only outputs all synthetic attributes, "sourced" from remote exchanges;
  • FieldExtractExec doesn't reference the attributes it "produces".

* ESQL: Disable remote enrich verification (#118534)

This disables verifying the plans generated for remote ENRICHing.
It also re-enables corresponding failing test.

Related: #118531
Fixes #118307.

(cherry picked from commit e7a4436)
maxhniebergall pushed a commit to maxhniebergall/elasticsearch that referenced this issue Dec 16, 2024
…#118534) (elastic#118302)

* ESQL: Enable physical plan verification (elastic#118114)

This enables the physical plan verification. For it, a couple of changes
needed to be applied/corrected:
  • AggregateMapper creates attributes with unique names;
  • AggregateExec's verification needs not consider ordinal attribute(s);
  • LookupJoinExec needs to merge attributes of same name at output, "winning" the right child;
  • ExchangeExec does no input referencing, since it only outputs all synthetic attributes, "sourced" from remote exchanges;
  • FieldExtractExec doesn't reference the attributes it "produces".

* ESQL: Disable remote enrich verification (elastic#118534)

This disables verifying the plans generated for remote ENRICHing.
It also re-enables corresponding failing test.

Related: elastic#118531
Fixes elastic#118307.

(cherry picked from commit e7a4436)

3 participants