Description
Elasticsearch Version
9.1
Installed Plugins
No response
Java Version
bundled
OS Version
Problem Description
There is a semantic problem when LIMIT is combined with remotely executed operations. If LIMIT is semantically applied before an operation that changes cardinality (such as MV_EXPAND), and that operation is executed on the remote side, the cardinality of the result is wrong because of how the planner currently handles limits. Most plans keep the operations that follow LIMIT on the coordinator, but remote ENRICH can force them to the remote side.
Steps to Reproduce
Example query:
FROM employees, remote1:employees | SORT emp_no | LIMIT 2 | EVAL language_code = languages | MV_EXPAND job_positions | ENRICH lang_r | KEEP emp_no, languages, language_name, job_positions
This returns the result:
    emp_no     |   languages   | language_name |     job_positions
---------------+---------------+---------------+-----------------------
10001          |2              |French         |Accountant
10001          |2              |French         |Senior Python Developer
10001          |2              |French         |Accountant
10001          |2              |French         |Senior Python Developer
The plan is:
[2025-06-26T22:17:53,468][DEBUG][o.e.x.e.s.EsqlSession ] [node-1] Optimized logicalPlan plan:
EsqlProject[[emp_no{f}#87, languages{f}#83, language_name{r}#92, job_positions{r}#93]]
\_Enrich[ANY,lang_r[KEYWORD],language_code{r}#74,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#92]]
  \_Limit[10000[INTEGER],true]
    \_MvExpand[job_positions{f}#88,job_positions{r}#93]
      \_Project[[emp_no{f}#87, job_positions{f}#88, languages{f}#83, languages.byte{f}#84, languages.long{f}#86, languages.short{f}#85, languages{f}#83 AS language_code#74]]
        \_TopN[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER]]
          \_EsRelation[employees,remote1:employees][emp_no{f}#87, job_positions{f}#88, languages{f}#83, ..]
[2025-06-26T22:17:53,469][DEBUG][o.e.x.e.s.EsqlSession ] [node-1] Physical plan:
ProjectExec[[emp_no{f}#87, languages{f}#83, language_name{r}#92, job_positions{r}#93]]
\_EnrichExec[ANY,match,language_code{r}#74,lang_r,language_code,{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#92]]
  \_LimitExec[10000[INTEGER],null]
    \_MvExpandExec[job_positions{f}#88,job_positions{r}#93]
      \_ProjectExec[[emp_no{f}#87, job_positions{f}#88, languages{f}#83, languages.byte{f}#84, languages.long{f}#86, languages.short{f}#85, languages{f}#83 AS language_code#74]]
        \_TopNExec[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER],null]
          \_ExchangeExec[[],false]
            \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
TopN[[Order[emp_no{f}#87,ASC,LAST]],2[INTEGER]]
\_EsRelation[employees,remote1:employees][emp_no{f}#87, job_positions{f}#88, languages{f}#83, ..]<>]]
As we can see, the expansion and the limits are executed on the coordinator. Changing the query to use remote ENRICH:
FROM employees, remote1:employees | SORT emp_no | LIMIT 2 | EVAL language_code = languages | MV_EXPAND job_positions | ENRICH _remote:lang_r | KEEP emp_no, languages, language_name, job_positions
produces:
    emp_no     |   languages   | language_name |     job_positions
---------------+---------------+---------------+-----------------------
10001          |2              |French         |Senior Python Developer
10001          |2              |French         |Accountant
With the plan:
[2025-06-26T22:16:39,677][DEBUG][o.e.x.e.s.EsqlSession ] [node-1] Optimized logicalPlan plan:
EsqlProject[[emp_no{f}#41, languages{f}#37, language_name{r}#46, job_positions{r}#47]]
\_Enrich[REMOTE,lang_r[KEYWORD],language_code{r}#28,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#46]]
  \_Limit[10000[INTEGER],true]
    \_MvExpand[job_positions{f}#42,job_positions{r}#47]
      \_Project[[emp_no{f}#41, job_positions{f}#42, languages{f}#37, languages.byte{f}#38, languages.long{f}#40, languages.short{f}#39, languages{f}#37 AS language_code#28]]
        \_TopN[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER]]
          \_EsRelation[employees,remote1:employees][emp_no{f}#41, job_positions{f}#42, languages{f}#37, ..]
[2025-06-26T22:16:39,678][DEBUG][o.e.x.e.s.EsqlSession ] [node-1] Physical plan:
ProjectExec[[emp_no{f}#41, languages{f}#37, language_name{r}#46, job_positions{r}#47]]
\_LimitExec[10000[INTEGER],null]
  \_TopNExec[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER],null]
    \_ExchangeExec[[],false]
      \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,lang_r[KEYWORD],language_code{r}#28,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-lang_r-1728002375328, remote1=.enrich-lang_r-1727913492133},[language_name{r}#46]]
\_Limit[10000[INTEGER],true]
  \_MvExpand[job_positions{f}#42,job_positions{r}#47]
    \_Project[[emp_no{f}#41, job_positions{f}#42, languages{f}#37, languages.byte{f}#38, languages.long{f}#40, languages.short{f}#39, languages{f}#37 AS language_code#28]]
      \_TopN[[Order[emp_no{f}#41,ASC,LAST]],2[INTEGER]]
        \_EsRelation[employees,remote1:employees][emp_no{f}#41, job_positions{f}#42, languages{f}#37, ..]<>]]
We can see that the expansion has moved to the remote side and that the TopN is applied on both sides, as is usual for remote computation. Here, however, the semantics is wrong: the coordinator-side TopNExec is applied to the output of the MvExpand operation, not to its input, so only two of the four expanded rows are returned.
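To make the cardinality difference concrete, here is a minimal Python sketch that simulates the two physical plans above on toy data. It assumes, as the outputs above suggest, that both `employees` and `remote1:employees` contain emp_no 10001 with the two job positions shown; the 10002 rows are hypothetical filler, and `top_n` / `mv_expand` are illustrative stand-ins for TopN and MV_EXPAND, not Elasticsearch code. EVAL and ENRICH are left out because they do not change the row count.

```python
from itertools import chain

# Toy rows: (emp_no, job_positions). The 10001 rows mirror the query output above;
# the 10002 rows are hypothetical filler so that SORT/LIMIT has something to cut.
CLUSTERS = {
    "local": [(10002, ["placeholder"]), (10001, ["Accountant", "Senior Python Developer"])],
    "remote1": [(10002, ["placeholder"]), (10001, ["Accountant", "Senior Python Developer"])],
}


def top_n(rows, n):
    """SORT emp_no | LIMIT n (Python's sort is stable, so ties keep input order)."""
    return sorted(rows, key=lambda r: r[0])[:n]


def mv_expand(rows):
    """MV_EXPAND job_positions: one output row per value of the multi-valued field."""
    return [(emp_no, job) for emp_no, jobs in rows for job in jobs]


def coordinator_plan(n):
    # Each cluster pre-applies TopN (safe: nothing below it changes cardinality),
    # and the coordinator re-applies TopN to the merged rows *before* expanding.
    partial = chain.from_iterable(top_n(rows, n) for rows in CLUSTERS.values())
    return mv_expand(top_n(partial, n))


def remote_enrich_plan(n):
    # MV_EXPAND now runs inside the remote fragment, so the coordinator's TopN
    # cuts the *expanded* rows instead of the input rows.
    partial = chain.from_iterable(mv_expand(top_n(rows, n)) for rows in CLUSTERS.values())
    return top_n(partial, n)


print(len(coordinator_plan(2)), coordinator_plan(2))      # 4 rows
print(len(remote_enrich_plan(2)), remote_enrich_plan(2))  # 2 rows
```

Pushing TopN down to each cluster is harmless in the first plan because the coordinator re-applies it before the expansion; in the second plan the same re-application lands after the expansion and drops rows.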
I think this is a general problem with LIMIT and cardinality-changing operations. If we run computations remotely (on another cluster, or even on another node), we cannot properly select the required number of rows: LIMIT n is supposed to apply to the aggregated data from all clusters, and if a cardinality-changing operation follows it, the limit cannot be re-applied after that operation. This probably means that all such computations must be forced onto the coordinator, which makes them incompatible with things like remote ENRICH. I am not sure yet how best to recognize all such cases, though.
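As one possible starting point for recognizing such cases, here is a minimal Python sketch of a check over a simplified, linear plan. Everything in it is an assumption for illustration: the plan is modeled as a list of operator names from the source upward, labels such as `Enrich[REMOTE]` are hypothetical, and `CARDINALITY_CHANGING` contains only MV_EXPAND, the one example named in this issue. It is not the ES|QL planner's API or its actual verification logic.

```python
# Hypothetical, simplified plan model: a linear pipeline listed from the source
# upward, with each node reduced to an operator label.
LIMITING = {"Limit", "TopN"}
CARDINALITY_CHANGING = {"MvExpand"}  # assumption: incomplete, MV_EXPAND is the known example
REMOTE_ONLY = {"Enrich[REMOTE]"}     # hypothetical label for a remote-mode ENRICH


def find_conflicts(pipeline):
    """Return ((i, limit), (j, op)) pairs where a cardinality-changing op at index j
    sits above the nearest LIMIT/TopN at index i, and both are below a remote-only
    operator, i.e. they would be pushed into the remote fragment together."""
    cut = next((i for i, op in enumerate(pipeline) if op in REMOTE_ONLY), None)
    if cut is None:
        return []  # nothing forces this pipeline to run remotely
    conflicts = []
    last_limit = None
    for i, op in enumerate(pipeline[:cut]):
        if op in LIMITING:
            last_limit = (i, op)
        elif op in CARDINALITY_CHANGING and last_limit is not None:
            conflicts.append((last_limit, (i, op)))
    return conflicts


remote = ["EsRelation", "TopN", "Project", "MvExpand", "Limit", "Enrich[REMOTE]"]
coordinator = ["EsRelation", "TopN", "Project", "MvExpand", "Limit", "Enrich[ANY]"]
print(find_conflicts(remote))       # [((1, 'TopN'), (3, 'MvExpand'))]
print(find_conflicts(coordinator))  # []
```

Applied to the shape of the remote ENRICH plan above, it flags the TopN/MvExpand pair, while the coordinator-side ENRICH plan produces no conflict. Real plans are trees rather than lists, so an actual check would need to walk the tree and would need a complete notion of which operators change cardinality.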