# Stream result pages from sub plans for FORK #126705
Changes from all commits: bfb9dee, c209f02, 07a0b20, 0ff3676, 7b0aaa3, 753632d, 768c581, 0d83c20, b6ecc10, 62902e4
Changes to the `Cap` capabilities enum:

```diff
@@ -983,9 +983,9 @@ public enum Cap {
     MAX_OVER_TIME(Build.current().isSnapshot()),

     /**
-     * Support STATS/EVAL/DISSECT in Fork branches
+     * Support streaming of sub plan results
      */
-    FORK_V2(Build.current().isSnapshot()),
+    FORK_V3(Build.current().isSnapshot()),
```
> **Reviewer:** It seems obvious now, but I like how this capability can be effectively incremented (without affecting previous releases or excessively polluting the enum). This could be a good pattern to socialise.
>
> **Author:** I noticed this approach with lookup join and it made sense to reuse it here.
The hunk's trailing context (unchanged):

```diff
     /**
      * Support for the {@code leading_zeros} named parameter.
```
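The versioned-capability pattern praised in the review above can be sketched as follows. This is a hypothetical, simplified model (`CapabilityDemo`, `supports`, and the boolean flags are illustrative, not the real `EsqlCapabilities` API): a behavior change introduces a new `_Vn` constant rather than mutating the existing one, so a client can probe for exactly the behavior version it needs.

```java
// Hypothetical sketch of the versioned-capability pattern: a behavior change
// adds a new _Vn constant instead of changing the old one, so already-released
// nodes keep advertising exactly the capabilities they implement.
class CapabilityDemo {
    enum Cap {
        FORK_V2(false), // superseded; disabled in this sketch
        FORK_V3(true);  // current behavior: streaming of sub plan results

        private final boolean enabled;

        Cap(boolean enabled) {
            this.enabled = enabled;
        }

        boolean isEnabled() {
            return enabled;
        }
    }

    // A client checks for the specific version it needs before using the feature.
    static boolean supports(String name) {
        for (Cap cap : Cap.values()) {
            if (cap.name().equalsIgnoreCase(name)) {
                return cap.isEnabled();
            }
        }
        return false;
    }
}
```

A node that never shipped `FORK_V3` simply does not advertise it, and callers fall back cleanly rather than hitting behavior the node does not implement.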
Changes to the `PlannerUtils` imports (`Project` and `MergeExec` are the two additions, per the hunk's 13 → 15 line count):

```diff
@@ -36,13 +36,15 @@
 import org.elasticsearch.xpack.esql.plan.QueryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
```
The new method added to `PlannerUtils`:

```diff
@@ -67,6 +69,34 @@
 public class PlannerUtils {

+    /**
+     * When the plan contains children like {@code MergeExec}, resulting from the planning of commands such as FORK,
+     * we need to break the plan into sub plans and a main coordinator plan.
+     * The result pages from each sub plan are funneled to the main coordinator plan.
+     * To achieve this, we wire each sub plan with an {@code ExchangeSinkExec} and add an {@code ExchangeSourceExec}
+     * to the main coordinator plan.
+     * There is an additional split of each sub plan into a data node plan and a coordinator plan.
+     * This split is not done here, but as part of {@code PlannerUtils#breakPlanBetweenCoordinatorAndDataNode}.
+     */
+    public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
```
> **Reviewer:** Can you please add a short javadoc description.
The method body:

```diff
+        var subplans = new Holder<List<PhysicalPlan>>();
+        PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
+            subplans.set(me.children().stream().map(child -> {
+                // TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
+                child = child.transformUp(FragmentExec.class, f -> {
+                    var logicalFragment = f.fragment();
+                    logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
+                    return new FragmentExec(logicalFragment);
+                });
+
+                return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
+            }).toList());
+            return new ExchangeSourceExec(me.source(), me.output(), false);
+        });
+
+        return new Tuple<>(subplans.get(), mainPlan);
+    }
+
     public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode(PhysicalPlan plan, Configuration config) {
         var dataNodePlan = new Holder<PhysicalPlan>();
```
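The shape of the transformation above can be illustrated with a toy model. All names and types here (`Plan`, `Split`, the string node labels) are illustrative, not the real planner API, and unlike the real `transformUp` this simplified version only inspects the root node: the children of a merge node become sub plans, each wired to an exchange sink, while the merge point is replaced by an exchange source in the main coordinator plan.

```java
import java.util.List;

// Toy model of breakPlanIntoSubPlansAndMainPlan: a merge node's children
// become sub plans (each wired to an exchange sink), and the merge point is
// replaced by an exchange source in the main coordinator plan.
class PlanSplitSketch {
    record Plan(String name, List<Plan> children) {
        Plan(String name) {
            this(name, List.of());
        }
    }

    record Split(List<Plan> subPlans, Plan mainPlan) {}

    static Split breakIntoSubPlansAndMainPlan(Plan plan) {
        if (plan.name().equals("Merge")) {
            // Each FORK branch becomes a sub plan pushing pages into a sink...
            List<Plan> subs = plan.children().stream()
                .map(child -> new Plan("ExchangeSink", List.of(child)))
                .toList();
            // ...and the coordinator reads the funneled pages from a source.
            return new Split(subs, new Plan("ExchangeSource"));
        }
        // No merge node: nothing to split off.
        return new Split(List.of(), plan);
    }
}
```

The key design choice this mirrors is that the exchange sink/source pair decouples the sub plans from the coordinator, so each branch can stream its result pages independently instead of materializing them first.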
> **Reviewer:** Related?
>
> **Author:** Yes. When we changed the implementation of FORK, the RRF tests started failing; in the tests, RRF always follows FORK. Example of a failure:
>
> Previously, the pages processed by the `RrfScoreEvalOperator` came from pages stored in `LocalSourceExec`, representing the sub plan results that we set in `EsqlSession`. Now the pages come from an `ExchangeSource`. Once I made this change to release the blocks, the tests started working again.
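The fix the author describes, releasing blocks once an operator is done with a page, can be illustrated with a toy model. Nothing here is the real compute engine Block/Page API; `Page.release()` stands in for decrementing a block ref count. The point is that pages pulled from an exchange source are owned by the consumer, which must release each one after processing so its memory stops being accounted for.

```java
import java.util.Queue;

// Toy model (not the real Block/Page API): pages drained from an exchange
// source must be released by the consuming operator once processed, otherwise
// their memory remains accounted against the circuit breaker.
class ReleaseSketch {
    static class Page {
        final int[] values;
        private boolean released;

        Page(int... values) {
            this.values = values;
        }

        void release() {
            released = true; // stand-in for decrementing the block ref count
        }

        boolean isReleased() {
            return released;
        }
    }

    // The operator does its work on each page, then releases it in a finally
    // block so the page is freed even if processing throws.
    static int sumAndRelease(Queue<Page> exchangeSource) {
        int total = 0;
        Page page;
        while ((page = exchangeSource.poll()) != null) {
            try {
                for (int v : page.values) {
                    total += v;
                }
            } finally {
                page.release();
            }
        }
        return total;
    }
}
```

This is why the switch from `LocalSourceExec` to an `ExchangeSource` surfaced the bug: pages that previously did not need explicit release now do, and an operator that forgets to release them leaks.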