
Stream result pages from sub plans for FORK #126705


Merged: 10 commits merged into elastic:main on Apr 16, 2025

Conversation

@ioanatia ioanatia (Contributor) commented Apr 11, 2025

tracked in #126389

We move away from the INLINESTATS execution model for FORK: instead, when FORK is present, we break the physical plan into sub plans and a main coordinator plan at the ComputeService level.
The sub plans are further broken down into an (optional) data node plan and coordinator plan.

To funnel pages between the sub plans and the main coordinator plan we use ExchangeSink/ExchangeSource.
Take as an example the following query:

FROM test
| FORK
    ( WHERE content:"fox" ) // sub plan 1
    ( WHERE content:"dog" ) // sub plan 2
| SORT _fork
| KEEP _fork, id, content

The execution will be split into the following plans:

[Screenshot 2025-04-07 at 11:26: diagram of the resulting plan split]
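
Roughly, the split looks like this (a sketch based on the description above, not the exact planner output; node names are descriptive rather than the actual physical plan classes):

main coordinator plan:
    SORT _fork | KEEP _fork, id, content
    reads its input pages from an exchange source fed by the sub plans

sub plan 1 (data node plan + coordinator plan):
    FROM test | WHERE content:"fox"   -> writes its result pages to an exchange sink

sub plan 2 (data node plan + coordinator plan):
    FROM test | WHERE content:"dog"   -> writes its result pages to an exchange sink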

I also removed the MergeOperator since it was no longer needed.
Previously the Fork logical plan would be translated to MergeExec which would be planned using MergeOperator.
MergeOperator would simply funnel the pages from the sub plans.

@ioanatia ioanatia added the Team:Search Relevance, :Search Relevance/Search and v9.1.0 labels on Apr 11, 2025
@ChrisHegarty ChrisHegarty (Contributor) left a comment

This worked out very clean. If there is any possibility to refactor common code snippets into smaller functions, that would be good to try, but otherwise I think it is in great shape.

*/
FORK_V2(Build.current().isSnapshot()),
FORK_V3(Build.current().isSnapshot()),
Contributor

It seems obvious now, but I like how this capability can be effectively incremented (without affecting previous releases or excessively polluting). This could be a good pattern to socialise.

Contributor Author (@ioanatia)

I noticed this approach with lookup join and it made sense to reuse here.

@@ -67,6 +69,25 @@

public class PlannerUtils {

public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
Contributor
Can you please add a short javadoc description.
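
Something along these lines would do (suggested wording only, based on the PR description, not the final Javadoc):

/**
 * Breaks a physical plan that contains FORK into its sub plans (one per FORK branch)
 * and the remaining main coordinator plan, and returns the sub plans together with
 * the main plan.
 */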

@ioanatia ioanatia requested review from dnhatn and nik9000 April 15, 2025 11:11
@ioanatia ioanatia marked this pull request as ready for review April 15, 2025 11:11
@nik9000 nik9000 (Member) left a comment

Nice. I like it. Probably worth having @costin have a look at too.

try {
    return page.projectBlocks(projections);
} finally {
    page.releaseBlocks();
Member

Related?

@ioanatia ioanatia (Contributor Author) Apr 15, 2025

Yes - when we changed the implementation of FORK, the RRF tests started failing.
In the tests RRF always follows FORK.
Example of a failure:

    java.lang.AssertionError: circuit breakers not reset to 0
    Expected a map containing
    estimated_size_in_bytes: expected <0> but was <8576>
             estimated_size: expected "0b" but was "8.3kb"
                   overhead: <1.0> unexpected but ok
        limit_size_in_bytes: <322122547> unexpected but ok
                 limit_size: "307.1mb" unexpected but ok
                    tripped: <0> unexpected but ok
        at org.elasticsearch.test.MapMatcher.assertMap(MapMatcher.java:85)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.lambda$assertRequestBreakerEmpty$0(EsqlSpecTestCase.java:429)
        at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1519)
        at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1491)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmpty(EsqlSpecTestCase.java:421)
        at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmptyAfterTests(EsqlSpecTestCase.java:417)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)

Previously, the pages processed by the RrfScoreEvalOperator came from the pages stored in LocalSourceExec, which represented the sub plan results we set in EsqlSession.
Now the pages come from an ExchangeSource.
Once I made this change to release the blocks, the tests started passing again.
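
For context, the change is just the shape shown in the hunk above, spelled out here with the reasoning as comments (a sketch; the surrounding operator code is elided):

// Project the blocks we keep into a new page, then release the original page's
// blocks so the request circuit breaker accounting drops back to zero.
try {
    return page.projectBlocks(projections);
} finally {
    page.releaseBlocks();
}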

@dnhatn dnhatn (Member) commented Apr 15, 2025

I wonder if we should break into sub-plans on data nodes instead to avoid these two issues:

  1. Reader contexts: Currently, sub-plans can be executed with different reader contexts, which might lead to inconsistent results between forks.

  2. Overhead: Splitting sub-plans on the coordinator and executing each sub-plan on every cluster separately, including remote clusters, can result in significant overhead.

@ioanatia ioanatia (Contributor Author) commented Apr 15, 2025

@dnhatn

I wonder if we should break into sub-plans on data nodes instead to avoid these two issues:

One reason why it was done this way was because each fork branch would actually need to be split into a data node plan and a coordinator plan.

For example, the following branches are quite different, but both need a data node plan + a coordinator plan:

FROM employees METADATA _score
| WHERE match(first_name, "John")
| FORK
       (SORT _score DESC | LIMIT 10) // keep just the 10 top hits
       (STATS total = COUNT(*)) // return the total

We also add a default LIMIT to each FORK branch, which I guess means that each FORK branch would automatically have a coordinator plan that needs to cap the number of results to the given LIMIT.
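
Concretely, for the two branches above, in the usual two-phase style (a rough sketch, not the actual planner output):

branch 1 (SORT _score DESC | LIMIT 10):
    data node plan:    compute a per-shard top 10 by _score
    coordinator plan:  merge the per-node top 10s and apply the final LIMIT 10

branch 2 (STATS total = COUNT(*)):
    data node plan:    compute partial counts per node
    coordinator plan:  combine the partial counts into the final total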

Reader contexts: Currently, sub-plans can be executed with different reader contexts, which might lead to inconsistent results between forks.

If we create and use a point in time, would this solve the inconsistency issue?
Retrievers in _search are somehow similar, they issue multiple queries to the shards and use PIT.
We have PIT as a requirement for FORK, but not necessarily for tech preview.

@nik9000 nik9000 (Member) commented Apr 15, 2025

I wonder if we should break into sub-plans on data nodes instead to avoid these two issues:

One reason why it was done this way was because each fork branch would actually need to be split into a data node plan and a coordinator plan.

Right. I imagined we were doing it this way because we want to support, eventually, stuff like

FORK
  (FROM a | WHERE foo == 1)
  (FROM b | WHERE bar == 1)
| STATS whatever

The real robot legs of it all.

If we create and use a point in time, would this solve the inconsistency issue?
Retrievers in _search are somehow similar, they issue multiple queries to the shards and use PIT.
We have PIT as a requirement for FORK, but not necessarily for tech preview.

PIT is sort of the public facing version of what _search does. Either way it amounts to using the same IndexSearcher for both operations. _search needs it to be able to fetch at all.

There's a real tension between getting a consistent view and analytic style queries that hit a zillion indices. If you are hitting 09832104983214 indices we just don't have the file handles to keep all the readers open all the time. Right now we batch it to a handful of shard copies per node. Getting a consistent view across both "legs" is going to require.... something.

This probably matters a lot for you too - doing a real fetch phase, like, as a second pass from the coordinator node the way _search does, means you need to leave the IndexSearchers open. You really want to close the ones that don't return any documents. And close the ones that don't have documents in the top n.

Maybe the best thing to do is to break the plans apart like this on the coordinator but send both plans down at once to the data node and start them using the same IndexSearchers. That feels like it wouldn't be that complex.

I'm unsure if now is the right time for it. A plan like the one you've got is the best we're going to get when the indices are separate.

@nik9000 nik9000 (Member) left a comment

I'd love to see a new test in single_node.RestEsqlIT that checks on the output of the profile. It'd be super duper nice if each of the forked drivers could identify themselves in the profile output.

Ideally we'd also have a test for the tasks output of this too. Like in EsqlActionTaskIT - but it's quite a bit more painful because you have to block execution. On the other hand, if we're going to fork stuff it'd be super mega useful to have trustworthy debug information from tasks and profile.

@nik9000 nik9000 (Member) commented Apr 15, 2025

I'd love to see a new test in single_node.RestEsqlIT that checks on the output of the profile. It'd be super duper nice if each of the forked drivers could identify themselves in the profile output.

Ideally we'd also have a test for the tasks output of this too. Like in EsqlActionTaskIT - but it's quite a bit more painful because you have to block execution. On the other hand, if we're going to fork stuff it'd be super mega useful to have trustworthy debug information from tasks and profile.

I'm aware this is several more days of work, but I think it's pretty important.

@dnhatn dnhatn (Member) commented Apr 15, 2025

Maybe the best thing to do is to break the plans apart like this on the coordinator but send both plans down at once to the data node and start them using the same IndexSearchers. That feels like it wouldn't be that complex.

++ I think we can make a fragment contain multiple sub-plans and execute them using the same shard copies.

@dnhatn dnhatn (Member) commented Apr 16, 2025

An alternative is to pass a list of plans to DataNodeRequest and ClusterComputeRequest.

@dnhatn dnhatn (Member) left a comment

@ioanatia Since I am working on something similar for time-series, you can merge this as is, and I will make the changes to dispatch multiple plans in a single data-node and cluster request.

@elasticsearchmachine (Collaborator)

Pinging @elastic/es-search-relevance (Team:Search Relevance)

);

exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
try (var ignored = mainExchangeSource.addEmptySink()) {
@ioanatia ioanatia (Contributor Author) Apr 16, 2025

I had to make one last change to get this working.

We had a few failures in CI that I could not replicate locally, which suggested the exchange source was closing too early / not receiving all pages.

A failure example:

java.lang.AssertionError: Expected more data but no more entries found after [2]
...
Actual:
emp_no:integer | first_name:keyword | _fork:keyword
10009          | Sumant             | fork1
10048          | Florian            | fork1

Expected:
emp_no:integer | first_name:keyword | _fork:keyword
10002          | Bezalel            | fork2
10009          | Sumant             | fork1
10014          | Berni              | fork2
10048          | Florian            | fork1
10058          | Berhard            | fork2
10060          | Breannda           | fork2
10094          | Arumugam           | fork2

This meant the results for one fork branch did not make it into the exchange source.
To fix it, I looked at the existing pattern in ComputeService where we add an empty sink on the exchange source. Luckily the Javadoc explains why we need an empty sink:

/**
* Links this exchange source with an empty/dummy remote sink. The purpose of this is to prevent this exchange source from finishing
* until we have performed other async actions, such as linking actual remote sinks.
*
* @return a Releasable that should be called when the caller no longer needs to prevent the exchange source from completing.
*/
public Releasable addEmptySink() {
    outstandingSinks.trackNewInstance();
    return outstandingSinks::finishInstance;
}

So all I did was wrap everything in a try (var ignored = mainExchangeSource.addEmptySink()) {} and also make sure that we execute the main plan first, followed by the sub plans.
This is the exact pattern we have in ComputeService#executePlan.
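
For reference, the resulting shape is roughly the following (a sketch only; startMainPlan/startSubPlan are hypothetical stand-ins for the actual ComputeService code, while addExchangeSourceHandler and addEmptySink are the calls shown above):

exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
// The empty sink keeps mainExchangeSource from finishing before the sub plan
// sinks have been linked.
try (var ignored = mainExchangeSource.addEmptySink()) {
    startMainPlan(mainSessionId, mainPlan);          // hypothetical helper
    for (PhysicalPlan subPlan : subPlans) {
        // each sub plan links an exchange sink that feeds pages into mainExchangeSource
        startSubPlan(subPlan, mainExchangeSource);   // hypothetical helper
    }
}   // releasing the empty sink lets the source finish once the real sinks complete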

@ioanatia ioanatia (Contributor Author)

I'd love to see a new test in single_node.RestEsqlIT that checks on the output of the profile. It'd be super duper nice if each of the forked drivers could identify themselves in the profile output.

@nik I agree it's super important - we don't have this at the moment - just checked the output and it's not what we want.
This is now added as a follow up in: #121950

Which means we commit to having proper profile information as a requirement for a tech preview release - we don't release without it.

@ChrisHegarty ChrisHegarty (Contributor) left a comment

This still LGTM.

@ioanatia ioanatia merged commit 5a6509a into elastic:main Apr 16, 2025
17 checks passed
@ioanatia ioanatia deleted the fork_streaming branch April 16, 2025 12:57
Labels
>non-issue, :Search Relevance/Search, Team:Search Relevance, v9.1.0