Stream result pages from sub plans for FORK #126705
Conversation
This worked out very clean. If there is any possibility to refactor common code snippets into smaller functions, that would be good to try, but otherwise I think it's in great shape.
*/
FORK_V2(Build.current().isSnapshot()),
FORK_V3(Build.current().isSnapshot()),
It seems obvious now, but I like how this capability can be effectively incremented (without affecting previous releases and without excessive pollution). This could be a good pattern to socialise.
I noticed this approach with lookup join and it made sense to reuse here.
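For readers unfamiliar with the pattern being discussed, here is a minimal sketch of what versioned, snapshot-gated capabilities look like. The enum and field names below are illustrative, not the actual EsqlCapabilities code; only the FORK_V2/FORK_V3 constants and the Build.current().isSnapshot() gate come from the diff above.

import org.elasticsearch.Build;

// Illustrative sketch only - not the real EsqlCapabilities enum.
enum Cap {
    // Each revision of a feature gets its own constant. Released revisions are always on;
    // in-progress revisions only advertise themselves in snapshot builds.
    FORK_V1(true),
    FORK_V2(Build.current().isSnapshot()),
    FORK_V3(Build.current().isSnapshot());

    private final boolean enabled;

    Cap(boolean enabled) {
        this.enabled = enabled;
    }

    boolean isEnabled() {
        return enabled;
    }
}

Tests and clients then require the newest constant, so older releases (which never advertised it) skip the relevant cases instead of failing - which is why bumping FORK_V2 to FORK_V3 doesn't disturb previous releases.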
@@ -67,6 +69,25 @@
public class PlannerUtils {

    public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
Can you please add a short javadoc description.
Nice. I like it. Probably worth having @costin have a look at too.
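To make the requested Javadoc concrete, here is a hedged draft plus how a caller would read the returned Tuple. The wording and the caller-side variable names are assumptions; only the method signature comes from the diff above.

/**
 * Draft Javadoc (an assumption, not the committed text):
 * Splits a physical plan that contains FORK into the per-branch sub plans and the
 * remaining main coordinator plan. The first element of the returned tuple holds one
 * physical plan per FORK branch; the second element is the main plan that consumes
 * the pages those branches produce.
 */
Tuple<List<PhysicalPlan>, PhysicalPlan> split = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(plan);
List<PhysicalPlan> subPlans = split.v1(); // one plan per FORK branch
PhysicalPlan mainPlan = split.v2();       // coordinator plan that reads the branches' results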
try {
    return page.projectBlocks(projections);
} finally {
    page.releaseBlocks();
}
Related?
Yes - when we changed the implementation of FORK, the RRF tests started failing.
In the tests RRF always follows FORK.
Example of a failure:
java.lang.AssertionError: circuit breakers not reset to 0
Expected a map containing
estimated_size_in_bytes: expected <0> but was <8576>
estimated_size: expected "0b" but was "8.3kb"
overhead: <1.0> unexpected but ok
limit_size_in_bytes: <322122547> unexpected but ok
limit_size: "307.1mb" unexpected but ok
tripped: <0> unexpected but ok
at org.elasticsearch.test.MapMatcher.assertMap(MapMatcher.java:85)
at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.lambda$assertRequestBreakerEmpty$0(EsqlSpecTestCase.java:429)
at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1519)
at org.elasticsearch.test.ESTestCase.assertBusy(ESTestCase.java:1491)
at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmpty(EsqlSpecTestCase.java:421)
at org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertRequestBreakerEmptyAfterTests(EsqlSpecTestCase.java:417)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
Previously the pages processed by the RrfScoreEvalOperator came from the pages stored in LocalSourceExec, which represented the sub plan results that we set in EsqlSession. Now the pages come from an ExchangeSource. Once I made this change to release the blocks, the tests started working again.
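For anyone else who hits the same breaker failure, a short sketch restating the fix. The wrapper method and the comments are mine; only projectBlocks/releaseBlocks come from the diff above.

// Pages that arrive from an ExchangeSource carry blocks accounted against the request
// circuit breaker. If we keep only a projection of those blocks and never release the
// originals, the breaker never drops back to 0 - which matches the ~8.3kb leak the RRF
// tests reported. Releasing in a finally block keeps the accounting balanced even if
// the projection throws. (This assumes projectBlocks retains its own references to the
// blocks it keeps, which is what the change above relies on.)
static Page project(Page page, int[] projections) {
    try {
        return page.projectBlocks(projections);
    } finally {
        page.releaseBlocks();
    }
}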
I wonder if we should break into sub-plans on data nodes instead to avoid these two issues:
One reason why it was done this way is that each fork branch would actually need to be split into a data node plan and a coordinator plan. For example, the following branches are quite different but both have data node + coordinator plans:
We also add a default LIMIT to each FORK branch - which I guess means that each FORK branch would automatically have a coordinator plan that will need to cap the number of results to the given LIMIT.
If we create and use a point in time, would this solve the inconsistency issue?
Right. I imagined we were doing it this way because we want to support, eventually, stuff like
The real robot legs of it all.
PIT is sort of the public facing version of what _search does. Either way it amounts to using the same [...]. There's a real tension between getting a consistent view and analytic style queries that hit a zillion indices. If you are hitting 09832104983214 indices we just don't have the file handles to keep all the readers open all the time. Right now we batch it to a handful of shard copies per node. Getting a consistent view across both "legs" is going to require.... something.

This probably matters a lot for you too - to do a real fetch phase, like, as a second pass from the coordinator node like _search does, means you need to leave the [...].

Maybe the best thing to do is to break the plans apart like this on the coordinator but send both plans down at once to the data node and start them using the same [...]. I'm unsure if now is the right time for it. A plan like the one you've got is the best we're going to get when the indices are separate.
I'd love to see a new test in single_node.RestEsqlIT that checks on the output of profile. It'd be super duper nice if each of the forked drivers could identify themselves in the profile output.

Ideally we'd also have a test for the tasks output of this too. Like in EsqlActionTaskIT - but it's quite a bit more painful because you have to block execution. On the other hand, if we're going to fork stuff it'd be super mega useful to have trustworthy debug information from tasks and profile.

I'm aware this is several more days of work, but I think it's pretty important.
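Something along these lines is probably what the profile-checking test would look like. This is a sketch only: the query shape, the "employees" index and the profile JSON field names are assumptions, not verified against RestEsqlIT; only client(), entityAsMap and Request come from the standard REST test base class.

// Hypothetical sketch of a profile assertion in a REST test class extending ESRestTestCase.
public void testForkProfile() throws IOException {
    Request request = new Request("POST", "/_query");
    request.setJsonEntity("""
        {
          "query": "FROM employees | FORK (WHERE emp_no == 10001) (WHERE emp_no == 10002) | KEEP emp_no, _fork",
          "profile": true
        }
        """);
    Map<String, Object> response = entityAsMap(client().performRequest(request));

    // Ideally each forked driver identifies itself here, e.g. via a description field.
    @SuppressWarnings("unchecked")
    Map<String, Object> profile = (Map<String, Object>) response.get("profile");
    assertNotNull(profile);
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> drivers = (List<Map<String, Object>>) profile.get("drivers");
    assertThat(drivers, not(empty()));
}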
++ I think we can make a fragment contain multiple sub-plans and execute them using the same shard copies.
An alternative is to pass a list of plans to DataNodeRequest and ClusterComputeRequest.
@ioanatia Since I am working on something similar for time-series, you can merge this as is, and I will make the changes to dispatch multiple plans in a single data-node and cluster request.
Pinging @elastic/es-search-relevance (Team:Search Relevance)
);

exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
try (var ignored = mainExchangeSource.addEmptySink()) {
I had to make one last change to get this working.
We had a few failures in CI that I could not replicate locally, which suggested the exchange source was closing too early / not receiving all pages.
A failure example:
java.lang.AssertionError: Expected more data but no more entries found after [2]
...
Actual:
emp_no:integer | first_name:keyword | _fork:keyword
10009          | Sumant             | fork1
10048          | Florian            | fork1

Expected:
emp_no:integer | first_name:keyword | _fork:keyword
10002          | Bezalel            | fork2
10009          | Sumant             | fork1
10014          | Berni              | fork2
10048          | Florian            | fork1
10058          | Berhard            | fork2
10060          | Breannda           | fork2
10094          | Arumugam           | fork2
This meant the results for one fork branch did not make it into the exchange source.
To fix this I looked at the existing pattern in ComputeService where we add an empty sink on the exchange source. Luckily the Javadoc explains why we need an empty sink:
Lines 288 to 298 in 8c9a091
/**
 * Links this exchange source with an empty/dummy remote sink. The purpose of this is to prevent this exchange source from finishing
 * until we have performed other async actions, such as linking actual remote sinks.
 *
 * @return a Releasable that should be called when the caller no longer needs to prevent the exchange source from completing.
 */
public Releasable addEmptySink() {
    outstandingSinks.trackNewInstance();
    return outstandingSinks::finishInstance;
}
So all I did was wrap everything in try (var ignored = mainExchangeSource.addEmptySink()) {} and also make sure that we execute the main plan first, followed by the sub plans.
This is the exact pattern we have in ComputeService#executePlan.
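If the guard's behaviour isn't obvious, here is a tiny self-contained illustration of the idea behind addEmptySink: an outstanding-instance counter keeps the source "open" until both the guard and all real sinks have finished. The class and method names are made up for the illustration; only the track/finish semantics mirror the Javadoc quoted above.

import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the "empty sink" guard: the exchange source only completes once the
// outstanding-sink count drops to zero. Holding an empty sink while wiring up the real
// sinks prevents a race where the source finishes before any sink was attached.
class ToyExchangeSource {
    private final AtomicInteger outstandingSinks = new AtomicInteger();

    Runnable addSink() {                        // also used for the empty/dummy sink
        outstandingSinks.incrementAndGet();
        return this::finishSink;
    }

    private void finishSink() {
        if (outstandingSinks.decrementAndGet() == 0) {
            System.out.println("exchange source completed");
        }
    }

    public static void main(String[] args) {
        ToyExchangeSource source = new ToyExchangeSource();
        Runnable emptySink = source.addSink();  // guard: keep the source open
        Runnable fork1 = source.addSink();      // real sink for sub plan 1
        Runnable fork2 = source.addSink();      // real sink for sub plan 2
        emptySink.run();                        // release the guard once wiring is done
        fork1.run();
        fork2.run();                            // only now does the source complete
    }
}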
@nik I agree it's super important - we don't have this at the moment - I just checked the output and it's not what we want. Which means we commit to having proper profile information as a requirement for a tech preview release - we don't release without it.
This still LGTM.
tracked in #126389
We move away from the INLINESTATS execution model for FORK. Instead, when FORK is present, we break the physical plan into sub plans and a main coordinator plan at the ComputeService level. The sub plans are further broken down into an (optional) data node plan and a coordinator plan.

To funnel pages between the sub plans and the main coordinator plan we use ExchangeSink/ExchangeSource. Take as an example the following query:

The execution will be split into the following plans:

I also removed the MergeOperator since it was no longer needed. Previously the Fork logical plan would be translated to MergeExec, which would be planned using MergeOperator. MergeOperator would simply funnel the pages from the sub plans.
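To tie the description together, a rough sketch of the coordinator-side flow as I read it from this PR - not the actual ComputeService code. The runMainPlan/runSubPlan helpers are hypothetical; only breakPlanIntoSubPlansAndMainPlan, addExchangeSourceHandler and addEmptySink appear in the diffs above.

// Hypothetical outline of the FORK execution flow on the coordinator.
Tuple<List<PhysicalPlan>, PhysicalPlan> split = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
List<PhysicalPlan> subPlans = split.v1();
PhysicalPlan mainPlan = split.v2();

// The main plan reads every FORK branch's pages through one exchange source.
exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);

// Hold an empty sink so the source cannot finish before the sub plans' sinks are attached.
try (var ignored = mainExchangeSource.addEmptySink()) {
    runMainPlan(mainSessionId, mainPlan);            // hypothetical helper
    for (PhysicalPlan subPlan : subPlans) {
        // Each sub plan is itself split into an (optional) data node plan and a coordinator
        // plan, and its results are written into an ExchangeSink feeding mainExchangeSource.
        runSubPlan(mainSessionId, subPlan);          // hypothetical helper
    }
}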