Tracked in #121950.
Currently, FORK uses the same execution model as INLINESTATS, in which each FORK branch is executed one at a time.
The result of each FORK branch is then stored in a LocalSourceExec, which replaces the FORK branch in the main physical plan.
After all the branches have been executed and their results stored in the main plan, the main plan itself is executed.
This model has a few limitations, including a limit on the maximum size of each FORK branch result.
However, the biggest issue is that it does not take advantage of the existing infrastructure in the compute service that allows pages to "flow" between the data node plans and the main coordinator plan.
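To make the current model concrete, here is a minimal Java sketch of branch-at-a-time execution. It is a toy, not the ES|QL compute code: each branch is a hypothetical Supplier that returns its full result as a list of rows, the list of buffered results plays the role of the LocalSourceExec nodes, and MAX_BRANCH_ROWS is a hypothetical stand-in for the per-branch size limit (the actual limit and its unit are not specified here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the current FORK execution: run each branch to completion,
// buffer its whole result, then run the main plan over the buffers.
final class MaterializedFork {
    // Hypothetical cap standing in for the per-branch result size limit.
    static final int MAX_BRANCH_ROWS = 1000;

    // Executes branches one after another; each result is fully materialized,
    // similar in spirit to storing it in a LocalSourceExec.
    static List<List<String>> runBranches(List<Supplier<List<String>>> branches) {
        List<List<String>> localSources = new ArrayList<>();
        for (Supplier<List<String>> branch : branches) {
            List<String> result = branch.get(); // entire branch result held in memory
            if (result.size() > MAX_BRANCH_ROWS) {
                throw new IllegalStateException("branch result exceeds " + MAX_BRANCH_ROWS + " rows");
            }
            localSources.add(result);
        }
        return localSources;
    }

    // The main plan only starts once every branch has finished.
    static List<String> runMainPlan(List<List<String>> localSources) {
        List<String> out = new ArrayList<>();
        for (List<String> rows : localSources) {
            out.addAll(rows);
        }
        return out;
    }
}
```

In this shape no row reaches the main plan until the last branch completes, and every branch result has to fit under the cap, which is what streaming pages between plans is meant to avoid.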
What follows is an illustrative example of how FORK could be modelled to stream pages.
Take a simple FORK query:
```esql
FROM test
| FORK
    ( WHERE content:"fox" ) // sub plan 1
    ( WHERE content:"dog" ) // sub plan 2
| SORT _fork
| KEEP _fork, id, content
```
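As a reference point for what this query returns, independent of how it is executed, here is a small in-memory Java sketch. It assumes hypothetical Row and ForkRow records, approximates the content:"fox" match with a whitespace-token comparison rather than real full-text analysis, and omits the implicit LIMIT 1000 that the physical plans apply to each branch. Each branch tags its rows with a _fork value of fork1, fork2, and so on.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

// Hypothetical in-memory model of FORK semantics (not how ES|QL executes it).
final class ForkSemantics {
    record Row(int id, String content) {}

    // Output row after KEEP _fork, id, content.
    record ForkRow(String fork, int id, String content) {}

    // Rough stand-in for content:"term": true if any whitespace-separated token
    // equals the term. Real full-text matching uses analyzers; this is a simplification.
    static Predicate<Row> matches(String term) {
        return row -> List.of(row.content().toLowerCase(Locale.ROOT).split("\\s+")).contains(term);
    }

    // Runs every branch over the same source rows, tags results with _fork,
    // then applies SORT _fork (List.sort is stable, so per-branch order is kept).
    static List<ForkRow> fork(List<Row> source, List<Predicate<Row>> branches) {
        List<ForkRow> out = new ArrayList<>();
        for (int i = 0; i < branches.size(); i++) {
            String forkName = "fork" + (i + 1);
            for (Row row : source) {
                if (branches.get(i).test(row)) {
                    out.add(new ForkRow(forkName, row.id(), row.content()));
                }
            }
        }
        out.sort(Comparator.comparing(ForkRow::fork));
        return out;
    }
}
```

A document that matches both branches (for example "fox and dog") appears once per branch, each copy carrying its own _fork value.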
The physical plan for this query will be split into a list of sub plans (one per FORK branch) and a main coordinator plan.
Each sub plan will in turn be split into a data node plan and a coordinator plan.

Each plan will have an ExchangeSinkExec when it has to sink data into another plan, and an ExchangeSourceExec when it has to pull pages from another plan.
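The sink/source pairing can be pictured as a bounded buffer shared between plans. The Java sketch below is a hypothetical toy, not the compute service's actual exchange classes: each producing plan pushes pages into a LinkedBlockingQueue and signals completion with a marker page, while the consuming plan handles pages as they arrive and stops once every sink has finished. A Page here is just a list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy exchange: several sinks push pages into one bounded buffer and a single
// source consumes them as they arrive. Hypothetical; not the real exchange classes.
final class ToyExchange {
    record Page(List<String> rows) {}

    // Marker page a sink sends when it is done; compared by identity, not equals.
    private static final Page FINISHED = new Page(List.of());

    private final BlockingQueue<Page> buffer;
    private final int sinkCount;

    ToyExchange(int sinkCount, int capacity) {
        this.sinkCount = sinkCount;
        // A bounded queue makes fast producers block until the consumer catches up.
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Sink side: one call per page produced by an upstream plan.
    void addPage(Page page) throws InterruptedException {
        buffer.put(page);
    }

    // Sink side: called once when an upstream plan has no more pages.
    void finishSink() throws InterruptedException {
        buffer.put(FINISHED);
    }

    // Source side: hands each page downstream as soon as it arrives and
    // returns only after every sink has finished.
    void consume(Consumer<Page> downstream) throws InterruptedException {
        int finished = 0;
        while (finished < sinkCount) {
            Page page = buffer.take();
            if (page == FINISHED) {
                finished++;
            } else {
                downstream.accept(page);
            }
        }
    }

    // Two producer threads (think: two FORK branches) feed one consumer.
    static int demo() {
        ToyExchange exchange = new ToyExchange(2, 4);
        List<Thread> producers = new ArrayList<>();
        for (String fork : List.of("fork1", "fork2")) {
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 3; i++) {
                        exchange.addPage(new Page(List.of(fork + "-row" + i)));
                    }
                    exchange.finishSink();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producers.add(producer);
            producer.start();
        }
        AtomicInteger rows = new AtomicInteger();
        try {
            exchange.consume(page -> rows.addAndGet(page.rows().size()));
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rows.get();
    }
}
```

The consumer starts processing pages while both producers are still running, which is the "flow" the proposal wants FORK branches to use instead of fully buffered results.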
Detailed planning
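The initial physical plan (keyword literals are printed as UTF-8 bytes, so [66 6f 72 6b 31] is "fork1", [66 6f 78] is "fox" and [64 6f 67] is "dog"):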
```
ProjectExec[[_fork{r}#2, id{f}#9, content{f}#10]]
\_TopNExec[[Order[_fork{r}#2,ASC,LAST]],1000[INTEGER],104]
  \_MergeExec[[content{f}#10, id{f}#9, _fork{r}#2]]
    |_EvalExec[[[66 6f 72 6b 31][KEYWORD] AS _fork]]
    | \_LimitExec[1000[INTEGER]]
    |   \_ExchangeExec[[id{f}#9, content{f}#10],false]
    |     \_FragmentExec[filter=null, estimatedRowSize=50, reducer=[], fragment=[<>
    |       \_Limit[1000[INTEGER],false]
    |         \_Filter[:(content{f}#10,[66 6f 78][KEYWORD])]
    |           \_EsRelation[test][content{f}#10, id{f}#9]<>]]
    \_EvalExec[[[66 6f 72 6b 32][KEYWORD] AS _fork]]
      \_LimitExec[1000[INTEGER]]
        \_ExchangeExec[[],false]
          \_FragmentExec[filter=null, estimatedRowSize=50, reducer=[], fragment=[<>
            Limit[1000[INTEGER],false]
              \_Filter[:(content{f}#10,[64 6f 67][KEYWORD])]
                \_EsRelation[test][content{f}#10, id{f}#9]<>]]
```
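The main physical plan would be split into sub plans and a main coordinator plan. The MergeExec is the split point: in the main coordinator plan it is replaced by an ExchangeSourceExec, and each of its children becomes a sub plan topped by an ExchangeSinkExec.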
The main coordinator plan:
```
OutputExec
\_ProjectExec[[_fork{r}#2, id{f}#9, content{f}#10]]
  \_TopNExec[[Order[_fork{r}#2,ASC,LAST]],1000[INTEGER],104]
    \_ExchangeSourceExec[[content{f}#10, id{f}#9, _fork{r}#2],false]
```
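For this plan to stream, the TopNExec only needs to accept rows incrementally. As an illustration, here is a generic bounded-heap top-N in Java; it is not the actual TopN operator, and the class name and the page-as-list representation are hypothetical. Rows are folded into a heap holding at most limit entries as each page arrives, so nothing waits for a complete branch result. Note that java.util.PriorityQueue rejects null elements, so this toy does not model NULLS LAST.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Generic bounded top-N that consumes rows page by page (hypothetical sketch).
final class StreamingTopN<T> {
    private final int limit;
    private final Comparator<T> order;
    // Heap ordered in reverse, so its head is the "worst" row currently kept.
    private final PriorityQueue<T> heap;

    StreamingTopN(int limit, Comparator<T> order) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
        this.order = order;
        this.heap = new PriorityQueue<>(order.reversed());
    }

    // Called for every page pulled from the exchange source.
    void addPage(List<T> page) {
        for (T row : page) {
            if (heap.size() < limit) {
                heap.add(row);
            } else if (order.compare(row, heap.peek()) < 0) {
                heap.poll(); // evict the current worst row
                heap.add(row);
            }
        }
    }

    // Called once the exchange source is exhausted; returns rows in sort order.
    List<T> finish() {
        List<T> out = new ArrayList<>(heap);
        out.sort(order);
        return out;
    }
}
```

Memory stays bounded by the limit (1000 in this plan) regardless of how many pages the sub plans send.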
Sub plan 1 (the first FORK branch):
```
ExchangeSinkExec[[id{f}#9, content{f}#10, _fork{r}#2],false]
\_EvalExec[[[66 6f 72 6b 31][KEYWORD] AS _fork]]
  \_LimitExec[1000[INTEGER]]
    \_ExchangeExec[[id{f}#9, content{f}#10],false]
      \_FragmentExec[filter=null, estimatedRowSize=50, reducer=[], fragment=[<>
        \_Limit[1000[INTEGER],false]
          \_Filter[:(content{f}#10,[66 6f 78][KEYWORD])]
            \_EsRelation[test][content{f}#10, id{f}#9]<>]]
```
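The rewrite from the initial plan to these pieces can be sketched as a tree transformation. In this hypothetical Java sketch (not the actual planner code), plan nodes are reduced to a name plus children with no attributes; the MergeExec is replaced by an ExchangeSourceExec in the main plan, each MergeExec child becomes a sub plan under a new ExchangeSinkExec, and the result is wrapped in an OutputExec. It assumes a single MergeExec in the plan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a plan at its MergeExec into a main plan and sub plans.
final class ForkPlanSplitter {
    // Toy plan node; real physical plan nodes also carry attributes, limits, etc.
    record Node(String name, List<Node> children) {
        Node {
            children = List.copyOf(children);
        }

        Node(String name, Node... children) {
            this(name, List.of(children));
        }
    }

    record Split(Node mainPlan, List<Node> subPlans) {}

    static Split split(Node root) {
        List<Node> subPlans = new ArrayList<>();
        Node main = replaceMerge(root, subPlans);
        return new Split(new Node("OutputExec", main), List.copyOf(subPlans));
    }

    // Copies the tree, swapping the MergeExec for an ExchangeSourceExec and
    // collecting each of its children as a sub plan topped by an ExchangeSinkExec.
    private static Node replaceMerge(Node node, List<Node> subPlans) {
        if (node.name().equals("MergeExec")) {
            for (Node branch : node.children()) {
                subPlans.add(new Node("ExchangeSinkExec", branch));
            }
            return new Node("ExchangeSourceExec");
        }
        List<Node> children = new ArrayList<>();
        for (Node child : node.children()) {
            children.add(replaceMerge(child, subPlans));
        }
        return new Node(node.name(), children);
    }
}
```

Applied to ProjectExec > TopNExec > MergeExec(branch 1, branch 2), this yields OutputExec > ProjectExec > TopNExec > ExchangeSourceExec plus two sub plans, ExchangeSinkExec > branch 1 and ExchangeSinkExec > branch 2, which is the same shape as the plans shown for this query.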
Each sub plan will be further split into a data node plan and a coordinator plan, with the ExchangeExec as the split point.
The data node plan for sub plan 1:
```
ExchangeSinkExec[[id{f}#9, content{f}#10],false]
\_FragmentExec[filter=null, estimatedRowSize=50, reducer=[], fragment=[<>
  \_Limit[1000[INTEGER],false]
    \_Filter[:(content{f}#10,[66 6f 78][KEYWORD])]
      \_EsRelation[test][content{f}#10, id{f}#9]<>]]
```
The coordinator plan for sub plan 1:
```
ExchangeSinkExec[[id{f}#9, content{f}#10, _fork{r}#2],false]
\_EvalExec[[[66 6f 72 6b 31][KEYWORD] AS _fork]]
  \_LimitExec[1000[INTEGER]]
    \_ExchangeSourceExec[[id{f}#9, content{f}#10],false]
```
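The second split can be sketched the same way, with the ExchangeExec inside a sub plan as the cut point. In this hypothetical Java sketch (same toy node representation, not planner code), everything below the ExchangeExec becomes the data node plan under an ExchangeSinkExec, and the ExchangeExec itself is replaced by an ExchangeSourceExec in the coordinator plan, which keeps the sub plan's own ExchangeSinkExec on top. It assumes exactly one ExchangeExec per sub plan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: cut a sub plan at its ExchangeExec into a data node plan
// and a coordinator plan.
final class SubPlanSplitter {
    // Toy plan node; real physical plan nodes also carry attributes, limits, etc.
    record Node(String name, List<Node> children) {
        Node {
            children = List.copyOf(children);
        }

        Node(String name, Node... children) {
            this(name, List.of(children));
        }
    }

    record Split(Node dataNodePlan, Node coordinatorPlan) {}

    static Split split(Node subPlan) {
        List<Node> dataNodePlans = new ArrayList<>();
        Node coordinator = replaceExchange(subPlan, dataNodePlans);
        if (dataNodePlans.size() != 1) {
            throw new IllegalArgumentException("expected exactly one ExchangeExec, found " + dataNodePlans.size());
        }
        return new Split(dataNodePlans.get(0), coordinator);
    }

    private static Node replaceExchange(Node node, List<Node> dataNodePlans) {
        if (node.name().equals("ExchangeExec")) {
            // Below the exchange: runs on the data nodes and sinks its pages.
            dataNodePlans.add(new Node("ExchangeSinkExec", node.children()));
            // At the exchange: the coordinator pulls those pages instead.
            return new Node("ExchangeSourceExec");
        }
        List<Node> children = new ArrayList<>();
        for (Node child : node.children()) {
            children.add(replaceExchange(child, dataNodePlans));
        }
        return new Node(node.name(), children);
    }
}
```

Chaining the two splits gives three hops for each branch: data node plan to sub plan coordinator, sub plan coordinator to main coordinator, each joined by a sink/source pair so pages can stream end to end.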