Stream result pages from sub plans for FORK #126705

Merged
10 commits merged on Apr 16, 2025

This file was deleted.

@@ -68,8 +68,11 @@ protected Page process(Page page) {
for (int i = 0; i < page.getBlockCount() - 1; i++) {
projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
}

return page.projectBlocks(projections);
try {
return page.projectBlocks(projections);
} finally {
page.releaseBlocks();

Member commented:
Related?

Contributor Author @ioanatia, Apr 15, 2025:

Yes - when we changed the implementation of FORK, the RRF tests started failing.
In those tests, RRF always follows FORK.
Example of a failure:

    java.lang.AssertionError: circuit breakers not reset to 0
    Expected a map containing
    estimated_size_in_bytes: expected <0> but was <8576>
             estimated_size: expected "0b" but was "8.3kb"
                   overhead: <1.0> unexpected but ok
        limit_size_in_bytes: <322122547> unexpected but ok
                 limit_size: "307.1mb" unexpected but ok
                    tripped: <0> unexpected but ok
        at org.elasticsearch.test.MapMatcher.assertMap(MapMatcher.java:85)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.lambda$assertRequestBreakerEmpty$0(EsqlSpecTestCase.java:429)
        at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1519)
        at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1491)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmpty(EsqlSpecTestCase.java:421)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmptyAfterTests(EsqlSpecTestCase.java:417)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)

Previously, the pages processed by the RrfScoreEvalOperator came from the pages stored in LocalSourceExec, which represented the sub plan results we set in EsqlSession.
Now the pages come from an ExchangeSource.
Once I made this change to release the blocks, the tests started passing again.

}
}

@Override
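For readers following the block-accounting discussion above, here is a minimal, self-contained sketch of the release pattern. The class and constructor are illustrative stand-ins (this is not the real RrfScoreEvalOperator); only Page#getBlockCount, Page#projectBlocks and Page#releaseBlocks mirror the diff. The point is that the operator builds a projected output page and then releases the incoming page, so its blocks stop being charged to the request circuit breaker.

    import org.elasticsearch.compute.data.Page;

    // Illustrative only: a trimmed-down operator step showing why the try/finally matters.
    final class ScoreProjectionSketch {

        private final int scorePosition; // index of the score block that is moved to the end

        ScoreProjectionSketch(int scorePosition) {
            this.scorePosition = scorePosition;
        }

        Page process(Page page) {
            // Build a projection that swaps the score block with the last block.
            int[] projections = new int[page.getBlockCount() - 1];
            for (int i = 0; i < projections.length; i++) {
                projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
            }
            try {
                // The projected page is expected to take its own references to the
                // blocks it keeps, which is why the original can be released below.
                return page.projectBlocks(projections);
            } finally {
                // Without this release, the incoming page's memory stays accounted
                // against the circuit breaker and the "not reset to 0" check trips.
                page.releaseBlocks();
            }
        }
    }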
20 changes: 10 additions & 10 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
@@ -3,7 +3,7 @@
//

simpleFork
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
;

forkWithWhereSortAndLimit
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
;

fiveFork
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
;

forkWithWhereSortDescAndLimit
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
;

forkWithCommonPrefilter
required_capability: fork
required_capability: fork_v3

FROM employees
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
;

forkWithSemanticSearchAndScore
required_capability: fork
required_capability: fork_v3
required_capability: semantic_text_field_caps
required_capability: metadata_score

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
;

forkWithEvals
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
;

forkWithStats
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
;

forkWithDissect
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -76,6 +76,7 @@ FROM books METADATA _id, _index, _score
( WHERE author:"Ursula K. Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3)
| RRF
| STATS count_fork=COUNT(*) BY _fork
| SORT _fork
;

count_fork:long | _fork:keyword
@@ -120,6 +121,7 @@ FROM semantic_text METADATA _id, _score, _index
( WHERE semantic_text_field:"something else" | SORT _score DESC | LIMIT 2)
| RRF
| EVAL _score = round(_score, 4)
| EVAL _fork = mv_sort(_fork)
| KEEP _fork, _score, _id, semantic_text_field
;

@@ -72,6 +72,29 @@ private void testSimpleImpl(String query) {
}
}

public void testRow() {
var query = """
ROW a = [1, 2, 3, 4], b = 100
| MV_EXPAND a
| FORK (WHERE a % 2 == 1)
(WHERE a % 2 == 0)
| SORT _fork, a
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
assertColumnTypes(resp.columns(), List.of("integer", "integer", "keyword"));

Iterable<Iterable<Object>> expectedValues = List.of(
List.of(1, 100, "fork1"),
List.of(3, 100, "fork1"),
List.of(2, 100, "fork2"),
List.of(4, 100, "fork2")
);
assertValues(resp.values(), expectedValues);
}
}

public void testSortAndLimitInFirstSubQuery() {
var query = """
FROM test
@@ -216,13 +239,15 @@ public void testWhereSortOnlyInFork() {
( WHERE content:"fox" | SORT id )
( WHERE content:"dog" | SORT id )
| KEEP _fork, id, content
| SORT _fork, id
""";
var queryWithMatchFunction = """
FROM test
| FORK
( WHERE match(content, "fox") | SORT id )
( WHERE match(content, "dog") | SORT id )
| KEEP _fork, id, content
| SORT _fork, id
""";
for (var query : List.of(queryWithMatchOperator, queryWithMatchFunction)) {
try (var resp = run(query)) {
@@ -509,6 +534,7 @@ public void testWithEvalSimple() {
| FORK ( EVAL a = 1 )
( EVAL a = 2 )
| KEEP a, _fork, id, content
| SORT _fork
""";

try (var resp = run(query)) {
@@ -40,6 +40,7 @@ public void testRrf() {
( WHERE content:"fox" | SORT _score, _id DESC )
( WHERE content:"dog" | SORT _score, _id DESC )
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 4)
| KEEP id, content, _score, _fork
""";
@@ -983,9 +983,9 @@ public enum Cap {
MAX_OVER_TIME(Build.current().isSnapshot()),

/**
* Support STATS/EVAL/DISSECT in Fork branches
* Support streaming of sub plan results
*/
FORK_V2(Build.current().isSnapshot()),
FORK_V3(Build.current().isSnapshot()),

Contributor commented:

It seems obvious now, but I like how this capability can be effectively incremented (without affecting previous releases or excessively polluting the capability list). This could be a good pattern to socialise.

Contributor Author replied:

I noticed this approach with lookup join and it made sense to reuse here.


/**
* Support for the {@code leading_zeros} named parameter.
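A short sketch of the versioned-capability pattern discussed in the thread above. The enum shape follows the diff, but the class name and the capabilityName() helper are assumptions added for illustration; the idea is that each behavior change gets a fresh constant, and spec tests opt in with a matching "required_capability: fork_v3" line, so clusters without the new behavior skip those tests instead of failing.

    import java.util.Locale;

    import org.elasticsearch.Build;

    // Sketch of a versioned capability; FORK_V3 effectively replaces FORK_V2.
    public enum CapSketch {
        // Bumped from FORK_V2 when FORK started streaming sub plan results.
        FORK_V3(Build.current().isSnapshot());

        private final boolean enabled;

        CapSketch(boolean enabled) {
            this.enabled = enabled;
        }

        public boolean isEnabled() {
            return enabled;
        }

        // Hypothetical helper: the lowercase constant name is what spec files reference.
        public String capabilityName() {
            return name().toLowerCase(Locale.ROOT);
        }
    }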
@@ -19,6 +19,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.Rule;

@@ -45,6 +46,10 @@ public PhysicalPlan apply(PhysicalPlan plan) {

// This will require updating should we choose to have non-unary execution plans in the future.
return plan.transformDown(currentPlanNode -> {
if (currentPlanNode instanceof MergeExec) {
keepTraversing.set(FALSE);
}

if (keepTraversing.get() == false) {
return currentPlanNode;
}
@@ -31,7 +31,6 @@
import org.elasticsearch.compute.operator.LimitOperator;
import org.elasticsearch.compute.operator.LocalSourceOperator;
import org.elasticsearch.compute.operator.LocalSourceOperator.LocalSourceFactory;
import org.elasticsearch.compute.operator.MergeOperator;
import org.elasticsearch.compute.operator.MvExpandOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
@@ -103,7 +102,6 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -281,8 +279,6 @@ else if (node instanceof OutputExec outputExec) {
return planOutput(outputExec, context);
} else if (node instanceof ExchangeSinkExec exchangeSink) {
return planExchangeSink(exchangeSink, context);
} else if (node instanceof MergeExec mergeExec) {
return planMerge(mergeExec, context);
} else if (node instanceof RrfScoreEvalExec rrf) {
return planRrfScoreEvalExec(rrf, context);
}
@@ -804,13 +800,6 @@ private PhysicalOperation planChangePoint(ChangePointExec changePoint, LocalExec
);
}

private PhysicalOperation planMerge(MergeExec mergeExec, LocalExecutionPlannerContext context) {
Layout.Builder layout = new Layout.Builder();
layout.append(mergeExec.output());
MergeOperator.BlockSuppliers suppliers = () -> mergeExec.suppliers().stream().map(s -> s.get()).toList();
return PhysicalOperation.fromSource(new MergeOperator.MergeOperatorFactory(suppliers), layout.build());
}

/**
* Immutable physical operation.
*/
@@ -36,13 +36,15 @@
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -67,6 +69,34 @@

public class PlannerUtils {

/**
* When the plan contains nodes like {@code MergeExec}, resulting from the planning of commands such as FORK,
* we need to break the plan into sub plans and a main coordinator plan.
* The result pages from each sub plan are funneled to the main coordinator plan.
* To achieve this, we wrap each sub plan in an {@code ExchangeSinkExec} and add an {@code ExchangeSourceExec}
* to the main coordinator plan.
* Each sub plan is additionally split into a data node plan and a coordinator plan.
* That split is not done here, but in {@code PlannerUtils#breakPlanBetweenCoordinatorAndDataNode}.
*/
public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {

Contributor commented:

Can you please add a short javadoc description.

var subplans = new Holder<List<PhysicalPlan>>();
PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
subplans.set(me.children().stream().map(child -> {
// TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
child = child.transformUp(FragmentExec.class, f -> {
var logicalFragment = f.fragment();
logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
return new FragmentExec(logicalFragment);
});

return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
}).toList());
return new ExchangeSourceExec(me.source(), me.output(), false);
});

return new Tuple<>(subplans.get(), mainPlan);
}

public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode(PhysicalPlan plan, Configuration config) {
var dataNodePlan = new Holder<PhysicalPlan>();

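A hypothetical caller sketch showing how the two splits might compose for a FORK query: first peel the sub plans off the main coordinator plan, then apply the usual coordinator/data-node split to each sub plan. Only the two PlannerUtils methods and the Tuple/List shapes come from the diff; the class, method, and import paths below are best-effort assumptions, not the actual ComputeService wiring.

    import java.util.List;

    import org.elasticsearch.core.Tuple;
    import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
    import org.elasticsearch.xpack.esql.planner.PlannerUtils;
    import org.elasticsearch.xpack.esql.session.Configuration;

    final class ForkPlanSplitSketch {

        static void splitForkPlan(PhysicalPlan physicalPlan, Configuration configuration) {
            // Peel the FORK sub plans off; the remaining main plan reads their
            // result pages through the ExchangeSourceExec inserted by the helper.
            Tuple<List<PhysicalPlan>, PhysicalPlan> split = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
            PhysicalPlan mainPlan = split.v2();
            List<PhysicalPlan> subPlans = split.v1(); // stays null when the plan has no MergeExec

            if (subPlans != null) {
                for (PhysicalPlan subPlan : subPlans) {
                    // Each sub plan ends in an ExchangeSinkExec and is still subject
                    // to the usual coordinator/data-node split.
                    Tuple<PhysicalPlan, PhysicalPlan> perSubPlan =
                        PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(subPlan, configuration);
                    // perSubPlan carries the coordinator-side and data-node-side halves.
                }
            }
        }
    }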