Remote ENRICH + MV_EXPAND + LIMIT produces incorrect data #130153

Open
@smalyshev

Description

Elasticsearch Version

9.1

Installed Plugins

No response

Java Version

bundled

OS Version

Problem Description

There is a semantic problem when LIMIT is combined with remotely executed operations. If LIMIT is semantically applied before an operation that changes cardinality (such as MV_EXPAND) and that operation is executed on the remote side, the cardinality is not preserved correctly because of how the planner now handles limits. Most plans keep the operations that follow LIMIT on the coordinator side, but remote ENRICH now makes it possible to force them onto the remote side.
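To illustrate why the order matters, here is a minimal Python sketch of a toy row model. It is not ES|QL or planner code; the `limit` and `mv_expand` helpers and the sample rows are assumptions made up for this illustration. It only shows that limiting before an expansion and limiting after it generally return different row counts.

```python
from typing import Dict, List

Row = Dict[str, object]


def limit(rows: List[Row], n: int) -> List[Row]:
    """Toy LIMIT: keep the first n rows."""
    return rows[:n]


def mv_expand(rows: List[Row], field: str) -> List[Row]:
    """Toy MV_EXPAND: emit one row per value of a multivalued field."""
    out: List[Row] = []
    for row in rows:
        value = row[field]
        values = value if isinstance(value, list) else [value]
        for v in values:
            out.append({**row, field: v})
    return out


# Hypothetical sample data, not taken from the employees index.
rows = [
    {"id": 1, "tags": ["a", "b", "c"]},
    {"id": 2, "tags": ["d"]},
    {"id": 3, "tags": ["e", "f"]},
]

# LIMIT 2 | MV_EXPAND tags: two input rows, expanded afterwards.
limit_then_expand = mv_expand(limit(rows, 2), "tags")
# MV_EXPAND tags | LIMIT 2: the limit now cuts the expanded rows.
expand_then_limit = limit(mv_expand(rows, "tags"), 2)

print(len(limit_then_expand))  # 4
print(len(expand_then_limit))  # 2
```

The two pipelines are not interchangeable, so a planner that re-applies the limit after the expansion changes the result.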

Steps to Reproduce

Example query:

FROM employees, remote1:employees | SORT emp_no | LIMIT 2 | EVAL language_code = languages | MV_EXPAND job_positions | ENRICH lang_r | KEEP emp_no, languages, language_name, job_positions

This returns the result:

    emp_no     |   languages   | language_name |     job_positions     
---------------+---------------+---------------+-----------------------
10001          |2              |French         |Accountant             
10001          |2              |French         |Senior Python Developer
10001          |2              |French         |Accountant             
10001          |2              |French         |Senior Python Developer

and the plan is:

[2025-06-26T22:17:53,468][DEBUG][o.e.x.e.s.EsqlSession    ] [node-1] Optimized logicalPlan plan:
EsqlProject[[emp_no{f}#87, languages{f}#83, language_name{r}#92, job_positions{r}#93]]
\_Enrich[ANY,lang_r[KEYWORD],language_code{r}#74,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#92]]
  \_Limit[10000[INTEGER],true]
    \_MvExpand[job_positions{f}#88,job_positions{r}#93]
      \_Project[[emp_no{f}#87, job_positions{f}#88, languages{f}#83, languages.byte{f}#84, languages.long{f}#86, languages.short{f}#85, languages{f}#83 AS language_code#74]]
        \_TopN[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER]]
          \_EsRelation[employees,remote1:employees][emp_no{f}#87, job_positions{f}#88, languages{f}#83, ..]
[2025-06-26T22:17:53,469][DEBUG][o.e.x.e.s.EsqlSession    ] [node-1] Physical plan:
ProjectExec[[emp_no{f}#87, languages{f}#83, language_name{r}#92, job_positions{r}#93]]
\_EnrichExec[ANY,match,language_code{r}#74,lang_r,language_code,{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#92]]
  \_LimitExec[10000[INTEGER],null]
    \_MvExpandExec[job_positions{f}#88,job_positions{r}#93]
      \_ProjectExec[[emp_no{f}#87, job_positions{f}#88, languages{f}#83, languages.byte{f}#84, languages.long{f}#86, languages.short{f}#85, languages{f}#83 AS language_code#74]]
        \_TopNExec[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER],null]
          \_ExchangeExec[[],false]
            \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
TopN[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER]]
\_EsRelation[employees,remote1:employees][emp_no{f}#87, job_positions{f}#88, languages{f}#83, ..]<>]]

As we can see, the expansion and the limits happen on the coordinator side. Changing the query to use remote ENRICH:

FROM employees, remote1:employees | SORT emp_no | LIMIT 2 | EVAL language_code = languages | MV_EXPAND job_positions | ENRICH _remote:lang_r | KEEP emp_no, languages, language_name, job_positions

produces:

    emp_no     |   languages   | language_name |     job_positions     
---------------+---------------+---------------+-----------------------
10001          |2              |French         |Senior Python Developer
10001          |2              |French         |Accountant             

With the plan:

[2025-06-26T22:16:39,677][DEBUG][o.e.x.e.s.EsqlSession    ] [node-1] Optimized logicalPlan plan:
EsqlProject[[emp_no{f}#41, languages{f}#37, language_name{r}#46, job_positions{r}#47]]
\_Enrich[REMOTE,lang_r[KEYWORD],language_code{r}#28,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#46]]
  \_Limit[10000[INTEGER],true]
    \_MvExpand[job_positions{f}#42,job_positions{r}#47]
      \_Project[[emp_no{f}#41, job_positions{f}#42, languages{f}#37, languages.byte{f}#38, languages.long{f}#40, languages.short{f}#39, languages{f}#37 AS language_code#28]]
        \_TopN[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER]]
          \_EsRelation[employees,remote1:employees][emp_no{f}#41, job_positions{f}#42, languages{f}#37, ..]
[2025-06-26T22:16:39,678][DEBUG][o.e.x.e.s.EsqlSession    ] [node-1] Physical plan:
ProjectExec[[emp_no{f}#41, languages{f}#37, language_name{r}#46, job_positions{r}#47]]
\_LimitExec[10000[INTEGER],null]
  \_TopNExec[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER],null]
    \_ExchangeExec[[],false]
      \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,lang_r[KEYWORD],language_code{r}#28,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#46]]
\_Limit[10000[INTEGER],true]
  \_MvExpand[job_positions{f}#42,job_positions{r}#47]
    \_Project[[emp_no{f}#41, job_positions{f}#42, languages{f}#37, languages.byte{f}#38, languages.long{f}#40, languages.short{f}#39, languages{f}#37 AS language_code#28]]
      \_TopN[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER]]
        \_EsRelation[employees,remote1:employees][emp_no{f}#41, job_positions{f}#42, languages{f}#37, ..]<>]]

We can see that the expansion is moved to the remote side and that TopN is applied on both sides, as it usually is for remote computation. The semantics are wrong here, though: the coordinator-side TopNExec is applied to the result of the MvExpand operation, not its input.
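The difference between the two physical plans can be modelled with a short Python sketch. This is a toy simulation, not planner code: the helper names are invented, the ENRICH step and the `Limit[10000]` node are left out because they do not change the row count here, and employee 10002 with its job position is made-up filler data. Only employee 10001 and its two job positions come from the results above.

```python
from typing import Dict, List

Row = Dict[str, object]

# Both clusters hold employee 10001 (as in the results above).
# Employee 10002 and its job position are hypothetical filler.
LOCAL: List[Row] = [
    {"emp_no": 10001, "job_positions": ["Accountant", "Senior Python Developer"]},
    {"emp_no": 10002, "job_positions": ["Senior Team Lead"]},
]
REMOTE1: List[Row] = [dict(r) for r in LOCAL]


def top_n(rows: List[Row], n: int) -> List[Row]:
    """Toy TopN: SORT emp_no | LIMIT n."""
    return sorted(rows, key=lambda r: r["emp_no"])[:n]


def mv_expand(rows: List[Row], field: str) -> List[Row]:
    """Toy MV_EXPAND: one output row per value of the field."""
    return [{**row, field: v} for row in rows for v in row[field]]


def coordinator_plan(clusters: List[List[Row]], n: int) -> List[Row]:
    """First plan: each cluster sends its top n, the coordinator
    takes the global top n and only then expands."""
    partial = [row for c in clusters for row in top_n(c, n)]
    return mv_expand(top_n(partial, n), "job_positions")


def remote_plan(clusters: List[List[Row]], n: int) -> List[Row]:
    """Second plan: TopN and MvExpand both run inside the remote
    fragment, and the coordinator's TopN sees expanded rows."""
    partial = [
        row
        for c in clusters
        for row in mv_expand(top_n(c, n), "job_positions")
    ]
    return top_n(partial, n)


print(len(coordinator_plan([LOCAL, REMOTE1], 2)))  # 4
print(len(remote_plan([LOCAL, REMOTE1], 2)))       # 2
```

Under these assumptions the first plan yields four rows and the second yields two, which matches the row counts of the two result tables in this report.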

I think this is a general problem with LIMIT and cardinality-changing operations. If we run computations remotely (on another cluster or even on another node), we cannot correctly select the required number of rows: when the query says LIMIT n, the limit is supposed to apply to the aggregate data from all clusters, and if a cardinality-changing operation follows it, the limit cannot be applied after that operation instead. This probably means all such computations need to be forced onto the coordinator and cannot be compatible with things like remote ENRICH. I am not sure yet how best to recognize all such cases, though.
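As a starting point for discussion only, one conceivable check is sketched below in Python. It is not the planner's API: the plan is reduced to a hypothetical list of node names from root to leaf, and the `LIMITING` and `CARDINALITY_CHANGING` sets are assumptions that cover just the nodes seen in this report. It flags a remote Enrich that sits above a cardinality-changing node which itself sits above a limit.

```python
from typing import List

# Hypothetical classification; a real planner has many more node types,
# and which of them change cardinality is exactly the open question.
LIMITING = {"Limit", "TopN"}
CARDINALITY_CHANGING = {"MvExpand"}
REMOTE_ENRICH = "Enrich[REMOTE]"


def unsafe_for_remote(plan_root_to_leaf: List[str]) -> bool:
    """Return True if a remote Enrich is above a cardinality-changing
    node that is itself above a Limit or TopN (linear plans only)."""
    seen_limit = False
    seen_expand_after_limit = False
    # Walk from the data source upwards, i.e. in execution order.
    for node in reversed(plan_root_to_leaf):
        if node in LIMITING:
            seen_limit = True
        elif node in CARDINALITY_CHANGING and seen_limit:
            seen_expand_after_limit = True
        elif node == REMOTE_ENRICH and seen_expand_after_limit:
            return True
    return False


# Shape of the optimized logical plan for the remote ENRICH query above.
failing = ["EsqlProject", "Enrich[REMOTE]", "Limit", "MvExpand",
           "Project", "TopN", "EsRelation"]
# Same shape with coordinator-side ENRICH.
coordinator = ["EsqlProject", "Enrich[ANY]", "Limit", "MvExpand",
               "Project", "TopN", "EsRelation"]

print(unsafe_for_remote(failing))      # True
print(unsafe_for_remote(coordinator))  # False
```

This handles only linear plans and a single known cardinality-changing node, so it does not answer how to recognize all such cases.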
