
ESQL: Verify PushDownEval/Enrich/RegexExtract logical optimizer rules correctness #105043


Closed
costin opened this issue Feb 2, 2024 · 6 comments
Assignees
Labels
:Analytics/ES|QL AKA ESQL >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo)

Comments

@costin
Member

costin commented Feb 2, 2024

Description

Double-check the correctness of PushDownEnrich when it is pushed down within the plan. Add unit tests at both the logical and physical plan level, along with more CSV tests for various scenarios: with/without filters (some that can be pushed down, some that cannot), projections, and evals.

Relates #104957

@costin costin added >enhancement needs:triage Requires assignment of a team area label labels Feb 2, 2024
@costin costin added >non-issue :Analytics/ES|QL AKA ESQL and removed needs:triage Requires assignment of a team area label labels Feb 2, 2024
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Feb 2, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@luigidellaquila
Contributor

I'm renaming this issue and possibly relabeling it, because PushDownEnrich is conceptually very similar to two other rules (PushDownEval and PushDownRegexExtract) and I suspect all three are affected by the same conceptual problem.
More context in the next comments.

@luigidellaquila luigidellaquila changed the title ESQL: Verify PushDownEnrich correctness ESQL: Verify PushDownEval/Enrich/RegexExtract logical optimizer rules correctness Feb 2, 2024
@luigidellaquila
Contributor

What these rules do

All three rules swap OrderBy with Eval/Enrich/Grok/Dissect respectively, e.g.

if (child instanceof OrderBy orderBy) {
    return orderBy.replaceChild(eval.replaceChild(orderBy.child()));
} 

These are not optimization rules proper; the reason for their existence is described in the PushDownEval javadoc: the goal is to group SORT commands together, e.g. from

... | SORT a | EVAL x = b + 1 | SORT x

to

... | EVAL x = b + 1 | SORT a | SORT x

At this point, PushDownAndCombineOrderBy will merge the two SORT commands into a single one, which will eventually become a TopN.

In the example above, the first SORT has no semantic value: even if the data are sorted by a, they will be re-sorted by x afterwards. In practice, the first SORT could be safely removed even without swapping.
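The redundancy of the first SORT can be seen with a toy example in plain Java (not ESQL code; the class and method names are hypothetical). With distinct keys for the second sort, sorting by a first and then by x yields the same order as sorting by x alone:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy illustration: a SORT followed by another SORT on a different key is
// equivalent to the second SORT alone (here the second key has no duplicates,
// so stability of the first sort cannot influence the final order).
public class RedundantSortDemo {
    // ... | SORT a | SORT x  -- rows are int[]{a, x}
    public static List<int[]> sortTwice(List<int[]> rows) {
        List<int[]> copy = new ArrayList<>(rows);
        copy.sort(Comparator.comparingInt(r -> r[0])); // SORT a
        copy.sort(Comparator.comparingInt(r -> r[1])); // SORT x
        return copy;
    }

    // ... | SORT x  -- the first SORT dropped entirely
    public static List<int[]> sortOnce(List<int[]> rows) {
        List<int[]> copy = new ArrayList<>(rows);
        copy.sort(Comparator.comparingInt(r -> r[1])); // SORT x only
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> rows = List.of(new int[]{3, 9}, new int[]{1, 5}, new int[]{2, 7});
        // Both pipelines produce the same final ordering.
        System.out.println(sortTwice(rows).equals(sortOnce(rows))); // true
    }
}
```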

The problem

The problem arises when Eval/Dissect/... overwrite existing fields that are then used in SORT.
E.g. a plan fragment like

... | SORT emp_no | EVAL emp_no = somethingElse | ...

will be converted to

... | EVAL emp_no = somethingElse | SORT emp_no |  ...

Two problems here:

  • Conceptually, it changes the query semantics: the sort could be performed on different values.
  • Practically, it leads to problems with the layout.
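The semantic change can be demonstrated with a toy example in plain Java (not ESQL internals; the class and method names are hypothetical). Negating emp_no after sorting versus before sorting produces different orders:

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration: when EVAL overwrites the field that SORT uses,
// swapping the two operators changes the result order.
public class SwapSemanticsDemo {
    // ... | SORT emp_no | EVAL emp_no = -emp_no   (original plan)
    public static List<Integer> sortThenEval(List<Integer> empNos) {
        return empNos.stream().sorted().map(n -> -n).collect(Collectors.toList());
    }

    // ... | EVAL emp_no = -emp_no | SORT emp_no   (after the push-down)
    public static List<Integer> evalThenSort(List<Integer> empNos) {
        return empNos.stream().map(n -> -n).sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> empNos = List.of(10002, 10001, 10003);
        System.out.println(sortThenEval(empNos)); // [-10001, -10002, -10003]
        System.out.println(evalThenSort(empNos)); // [-10003, -10002, -10001]
    }
}
```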

Actual failures

Unfortunately, the problem above can also be reproduced with simple, reasonable queries:

EVAL

from employees 
| keep emp_no, first_name 
| sort emp_no asc 
| eval emp_no = -emp_no 
| mv_expand first_name
{
    "error": {
        "root_cause": [
            {
                "type": "null_pointer_exception",
                "reason": "Cannot invoke \"org.elasticsearch.xpack.esql.planner.Layout$ChannelSet.type()\" because the return value of \"java.util.List.get(int)\" is null"
            }
        ],
        "type": "null_pointer_exception",
        "reason": "Cannot invoke \"org.elasticsearch.xpack.esql.planner.Layout$ChannelSet.type()\" because the return value of \"java.util.List.get(int)\" is null"
    },
    "status": 500
}
[2024-02-02T15:03:41,754][TRACE][o.e.x.e.o.LogicalPlanOptimizer] [runTask-0] Rule optimizer.LogicalPlanOptimizer$PushDownEval applied
Limit[500[INTEGER]]                                               = Limit[500[INTEGER]]
\_MvExpand[emp_no{r}#36,emp_no{r}#41]                             = \_MvExpand[emp_no{r}#36,emp_no{r}#41]
  \_Limit[500[INTEGER]]                                           =   \_Limit[500[INTEGER]]
    \_Eval[[NEG(emp_no{f}#39) AS emp_no]]                         !     \_OrderBy[[Order[emp_no{f}#39,ASC,LAST]]]
      \_OrderBy[[Order[emp_no{f}#39,ASC,LAST]]]                   !       \_EsqlProject[[first_name{f}#40, emp_no{r}#36]]
        \_EsqlProject[[emp_no{f}#39, first_name{f}#40]]           !         \_Eval[[NEG(emp_no{f}#39) AS emp_no]]
          \_EsRelation[employees][emp_no{f}#39, first_name{f}#40] =           \_EsRelation[employees][emp_no{f}#39, first_name{f}#40]          

This fragment of the optimizer execution shows the problem:

  • in the original plan, OrderBy uses emp_no{f}#39, which was extracted by the EsRelation;
  • in the transformed plan, OrderBy still tries to use emp_no{f}#39, but it was overwritten with emp_no{r}#36 by the Eval, so it is no longer in the layout, hence the NullPointerException.
Why is MV_EXPAND there?

Just to delay the LIMIT pushdown a bit and prevent the OrderBy from being converted into a TopN before PushDownEval is even executed.
So one of two things happens:

  • the query with MV_EXPAND fails because of the swap of OrderBy and Eval, performed by PushDownEval
  • if we remove MV_EXPAND, the query succeeds because PushDownEval is never executed

As proof that the problem is in PushDownEval: with that rule disabled, the query succeeds even with MV_EXPAND.
So the problem is not MV_EXPAND.

DISSECT/GROK

from employees 
| keep emp_no, first_name 
| sort first_name asc 
| dissect first_name "%{first_name}" 
| mv_expand emp_no
{
    "error": {
        "root_cause": [
            {
                "type": "null_pointer_exception",
                "reason": "Cannot invoke \"org.elasticsearch.xpack.esql.planner.Layout$ChannelSet.type()\" because the return value of \"java.util.List.get(int)\" is null"
            }
        ],
        "type": "null_pointer_exception",
        "reason": "Cannot invoke \"org.elasticsearch.xpack.esql.planner.Layout$ChannelSet.type()\" because the return value of \"java.util.List.get(int)\" is null"
    },
    "status": 500
}
[2024-02-02T15:15:22,602][TRACE][o.e.x.e.o.LogicalPlanOptimizer] [runTask-0] Rule optimizer.LogicalPlanOptimizer$PushDownRegexExtract applied
Limit[500[INTEGER]]                                                                                                           = Limit[500[INTEGER]]
\_MvExpand[emp_no{f}#125,emp_no{r}#127]                                                                                       = \_MvExpand[emp_no{f}#125,emp_no{r}#127]
  \_Limit[500[INTEGER]]                                                                                                       =   \_Limit[500[INTEGER]]
    \_Dissect[first_name{f}#126,Parser[pattern=%{first_name}, appendSeparator=, parser=org.elasticsearch.dissect.DissectParse !     \_OrderBy[[Order[first_name{f}#126,ASC,LAST]]]
r@13d9b617],[first_name{r}#121]]                                                                                              !       \_EsqlProject[[emp_no{f}#125, first_name{r}#121]]
      \_OrderBy[[Order[first_name{f}#126,ASC,LAST]]]                                                                          !         \_Dissect[first_name{f}#126,Parser[pattern=%{first_name}, appendSeparator=, parser=org.elasticsearch.dissect.DissectParse
        \_EsqlProject[[emp_no{f}#125, first_name{f}#126]]                                                                     ! r@13d9b617],[first_name{r}#121]]
          \_EsRelation[employees][emp_no{f}#125, first_name{f}#126]                                                           =           \_EsRelation[employees][emp_no{f}#125, first_name{f}#126]

Exactly the same problem as described for EVAL (see above).

ENRICH

Reproducing the problem with ENRICH seems to be more complicated with our standard test datasets.

The logic of the rule is exactly the same, though, so similar failures seem definitely possible.


In conclusion, the immediate strategy to fix the problem could be to check that Eval/Enrich/Dissect... do not overwrite SORT fields before pushing down.

Something like

if (child instanceof OrderBy orderBy && doNotOverlap(orderBy.order(), eval.fields())) {
    return orderBy.replaceChild(eval.replaceChild(orderBy.child()));
} 
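The guard in the snippet above could be sketched as a simple name-disjointness check. This is a minimal sketch using hypothetical simplified types (the real rule would work on Order/Attribute nodes in the ESQL plan model, and doNotOverlap is a name proposed in this issue, not an existing API):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of the proposed guard: the swap is safe only if no
// field produced by the Eval/Enrich/Dissect shadows a SORT key.
public class PushDownGuard {
    /** True when none of the newly produced field names is used as a sort key. */
    public static boolean doNotOverlap(Collection<String> sortKeys, Collection<String> producedFields) {
        return Collections.disjoint(sortKeys, producedFields);
    }

    public static void main(String[] args) {
        // SORT emp_no | EVAL emp_no = -emp_no  -> overlap, must not push down
        System.out.println(doNotOverlap(Set.of("emp_no"), Set.of("emp_no"))); // false
        // SORT emp_no | EVAL x = b + 1         -> no overlap, safe to push down
        System.out.println(doNotOverlap(Set.of("emp_no"), Set.of("x")));      // true
    }
}
```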

In some cases this will still result in a query failure due to an unbounded sort.

Another strategy could be, instead of pushing down, to analyze the plan and see whether some SORTs are redundant and can be safely removed.

@luigidellaquila
Contributor

Performance considerations

These rules also have a significant impact on the efficiency of the execution plan.

At a very high level, ENRICH, GROK, DISSECT and sometimes EVAL are more expensive than SORT (per record); SORT is typically bounded, so executing it before ENRICH/GROK... results in a much smaller number of records to enrich.

In addition, SORT can be pushed down to Lucene; these rules, by interposing other operations between FROM and SORT, prevent that pushdown.

Practically, these problems can be easily reproduced with the following query:

from employees  | sort emp_no | eval x = to_string(languages) | enrich languages_policy on x  | mv_expand language_name | limit 1

With pushdown rules enabled (i.e. the current behavior)

the physical plan is the following:

LimitExec[1[INTEGER]]
\_MvExpandExec[job_positions{f}#28,job_positions{r}#46]
  \_TopNExec[[Order[emp_no{f}#19,ASC,LAST]],1[INTEGER],null]
    \_ExchangeExec[[],false]
      \_FragmentExec[filter=null, estimatedRowSize=0, fragment=[<>
Project[[avg_worked_seconds{f}#17, birth_date{f}#18, emp_no{f}#19, first_name{f}#20, gender{f}#21, height{f}#22, heigh
t.float{f}#23, height.half_float{f}#24, height.scaled_float{f}#25, hire_date{f}#26, is_rehired{f}#27, job_positions{f}#28, languages{f}#29, languages.byte{f}#30, languages.long{f}#31, languages.short{f}#32, last_name{f}#33, salary{f}#34, salary_change{f}#35, salary_change.int{f}#36, salary_change.keyword{f}#37, salary_change.long{f}#38, still_hired{f}#39, x{r}#13, language_name{r}#42]]
\_TopN[[Order[emp_no{f}#19,ASC,LAST]],1[INTEGER]]
  \_Enrich[ANY,[6c 61 6e 67 75 61 67 65 73 5f 70 6f 6c 69 63 79][KEYWORD],x{r}#13,{"match":{"indices":[],"match_field":"lang
uage_code","enrich_fields":["language_name"]}},{=.enrich-languages_policy-1707124185423},[language_name{r}#42]]
    \_Eval[[TOSTRING(languages{f}#29) AS x]]
      \_EsRelation[employees][avg_worked_seconds{f}#17, birth_date{f}#18, emp_no{..]<>]]

and the local physical plan is the following:

ProjectExec[[avg_worked_seconds{f}#17, birth_date{f}#18, emp_no{f}#19, first_name{f}#20, gender{f}#21, height{f}#22, heigh
t.float{f}#23, height.half_float{f}#24, height.scaled_float{f}#25, hire_date{f}#26, is_rehired{f}#27, job_positions{f}#28, languages{f}#29, languages.byte{f}#30, languages.long{f}#31, languages.short{f}#32, last_name{f}#33, salary{f}#34, salary_change{f}#35, salary_change.int{f}#36, salary_change.keyword{f}#37, salary_change.long{f}#38, still_hired{f}#39, x{r}#13, language_name{r}#42]]
\_FieldExtractExec[avg_worked_seconds{f}#17, birth_date{f}#18, first_n..][]
  \_TopNExec[[Order[emp_no{f}#19,ASC,LAST]],1[INTEGER],null]
    \_FieldExtractExec[emp_no{f}#19][]
      \_EnrichExec[ANY,x{r}#13,languages_policy,language_code,{=.enrich-languages_policy-1707124185423},[language_name{r}#42]]
        \_EvalExec[[TOSTRING(languages{f}#29) AS x]]
          \_FieldExtractExec[languages{f}#29][]
            \_EsQueryExec[employees], query[][_doc{f}#47], limit[], sort[] estimatedRowSize[null]

With pushdown rules disabled

the physical plan is the following:

LimitExec[1[INTEGER]]
\_MvExpandExec[job_positions{f}#58,job_positions{r}#76]
  \_EnrichExec[ANY,x{r}#43,languages_policy,language_code,{=.enrich-languages_policy-1707123581278},[language_name{r}#72]]
    \_EvalExec[[TOSTRING(languages{f}#59) AS x]]
      \_TopNExec[[Order[emp_no{f}#49,ASC,LAST]],1[INTEGER],null]
        \_ExchangeExec[[],false]
          \_FragmentExec[filter=null, estimatedRowSize=0, fragment=[<>
Project[[avg_worked_seconds{f}#47, birth_date{f}#48, emp_no{f}#49, first_name{f}#50, gender{f}#51, height{f}#52, heigh
t.float{f}#53, height.half_float{f}#54, height.scaled_float{f}#55, hire_date{f}#56, is_rehired{f}#57, job_positions{f}#58, languages{f}#59, languages.byte{f}#60, languages.long{f}#61, languages.short{f}#62, last_name{f}#63, salary{f}#64, salary_change{f}#65, salary_change.int{f}#66, salary_change.keyword{f}#67, salary_change.long{f}#68, still_hired{f}#69]]
\_TopN[[Order[emp_no{f}#49,ASC,LAST]],1[INTEGER]]
  \_EsRelation[employees][avg_worked_seconds{f}#47, birth_date{f}#48, emp_no{..]<>]]

and the local physical plan is the following:

ProjectExec[[avg_worked_seconds{f}#47, birth_date{f}#48, emp_no{f}#49, first_name{f}#50, gender{f}#51, height{f}#52, heigh
    t.float{f}#53, height.half_float{f}#54, height.scaled_float{f}#55, hire_date{f}#56, is_rehired{f}#57, job_positions{f}#58, languages{f}#59, languages.byte{f}#60, languages.long{f}#61, languages.short{f}#62, last_name{f}#63, salary{f}#64, salary_change{f}#65, salary_change.int{f}#66, salary_change.keyword{f}#67, salary_change.long{f}#68, still_hired{f}#69]]
\_FieldExtractExec[avg_worked_seconds{f}#47, birth_date{f}#48, emp_no{..][]
  \_EsQueryExec[employees], query[][_doc{f}#77], limit[1], sort[[FieldSort[field=emp_no{f}#49, direction=ASC, nulls=LAST]]] estimatedRowSize[null]

The difference is substantial:

  • with the rules enabled:
    • the EVAL and the ENRICH are executed on the whole dataset
    • the SORT is executed in memory, at the end, as a TopN (together with the LIMIT 1)
  • with the rules disabled:
    • the SORT and the LIMIT are pushed down to Lucene
    • the EVAL and the ENRICH are executed on a single record, on the coordinator node

In a production scenario with a large dataset, this query would fail with a timeout.
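The cost difference can be illustrated with a toy cost model in plain Java (not ESQL code; the class and method names are hypothetical). We count how many times an "expensive" ENRICH-like step runs when it is placed before versus after a TopN/LIMIT 1:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Toy cost model: the sorted() step must consume its whole input, so any
// per-record step upstream of it runs on every row; a step downstream of
// sorted().limit(1) runs on a single row.
public class EnrichCostDemo {
    public static int enrichCalls(int rows, boolean enrichBeforeTopN) {
        AtomicInteger calls = new AtomicInteger();
        IntStream data = IntStream.range(0, rows);
        if (enrichBeforeTopN) {
            // rules enabled: "ENRICH" over the whole dataset, then TopN/LIMIT 1
            data.map(n -> { calls.incrementAndGet(); return n; })
                .sorted().limit(1).toArray();
        } else {
            // rules disabled: sort + limit applied first, "ENRICH" on one row
            data.sorted().limit(1)
                .map(n -> { calls.incrementAndGet(); return n; })
                .toArray();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(enrichCalls(1000, true));  // 1000 expensive calls
        System.out.println(enrichCalls(1000, false)); // 1 expensive call
    }
}
```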

@wchaparro
Member

@luigidellaquila any updates on this one?

@luigidellaquila
Contributor

This logic has undergone significant refactoring in the last 12 months, so this issue is no longer fully relevant.
I checked the failures listed above: all these queries now work fine, and the random query generators are not showing any additional problems.
I'm closing this issue for now.
