[12.x] Fix PendingRequest@pool() && batch() concurrency
#57973
base: 12.x
Conversation
Updated the default value of the concurrency parameter in the pool method to 0 for concurrent execution.
If this is really the case we should just make it
@taylorotwell for the test you provided here #57972 (comment), does changing the concurrency to anything have an effect on the outcome? It doesn't for me... 😕 I'm thinking that the implementation in Pool is either incorrect or not what I expect. The creation of EachPromise just determines how many requests to start simultaneously, but it's not waiting for the promise to resolve before carrying on to the next set. I guess I had it in my head that pool() would only have $concurrent number of batches outstanding at a time.
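To make the distinction above concrete, here is a hypothetical, pure-PHP simulation (no Guzzle; the function name and tick-based timing are invented) of sliding-window concurrency, the behavior Guzzle's `EachPromise` `concurrency` option aims for: at most `$concurrency` tasks are in flight, and a new task starts as soon as any single task settles, not when the whole group does.

```php
<?php

// Hypothetical simulation of sliding-window concurrency (invented names):
// at most $concurrency tasks run at once, and a freed slot is refilled
// immediately, without waiting for the rest of the current group.
function simulateSlidingWindow(array $durations, int $concurrency): array
{
    $queue = array_keys($durations); // tasks not yet started
    $inFlight = [];                  // task id => ticks remaining
    $maxInFlight = 0;
    $finishTick = [];
    $tick = 0;

    while ($queue !== [] || $inFlight !== []) {
        // Fill any free slots in the window.
        while (count($inFlight) < $concurrency && $queue !== []) {
            $id = array_shift($queue);
            $inFlight[$id] = $durations[$id];
        }

        $maxInFlight = max($maxInFlight, count($inFlight));
        $tick++;

        // Advance one tick of simulated time; settle finished tasks.
        foreach ($inFlight as $id => $remaining) {
            if ($remaining - 1 <= 0) {
                $finishTick[$id] = $tick;
                unset($inFlight[$id]);
            } else {
                $inFlight[$id] = $remaining - 1;
            }
        }
    }

    return ['maxInFlight' => $maxInFlight, 'finishTick' => $finishTick];
}

// One slow task plus four fast ones, window of 2: task 2 starts (and even
// finishes) while task 0 is still pending, because task 1 freed a slot.
$result = simulateSlidingWindow([9, 4, 4, 4, 4], 2);
```

With batch-style semantics, task 2 could not start until both task 0 and task 1 had finished; under the sliding window it finishes before task 0 does.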
```diff
  if ($this->async) {
-     return $this->makePromise($method, $url, $options);
+     return $this->promise = new FluentPromise(fn () => $this->makePromise($method, $url, $options));
```
I fought long and hard with Codex about this. I mean, I was trying to get this damned LLM to admit it was wrong, but alas, I had to eat crow.

So, when we call Client@requestAsync(), it tells the underlying curl multi-handler about the request we're going to make. All of the requests then get fired off by the handler as soon as we run the first promise, making the $concurrent parameter all but meaningless inside of PendingRequest@pool(), since the curl multi-handler initiates everything in its queue when we fire the first request.
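A minimal pure-PHP sketch of that deferral idea (the class name and API here are illustrative, not Laravel's actual FluentPromise/LazyPromise): the builder closure, standing in for the Client@requestAsync() call that enqueues work on the curl multi-handler, does not run until the promise is explicitly requested.

```php
<?php

// Illustrative stand-in for a lazy promise wrapper (invented name and API).
final class LazyPromiseSketch
{
    private $builder;
    private $promise;

    public function __construct(callable $builder)
    {
        $this->builder = $builder; // nothing is dispatched yet
    }

    public function buildPromise()
    {
        // Only here does the underlying request get created/enqueued,
        // and only once: repeated calls reuse the memoized result.
        return $this->promise ??= ($this->builder)();
    }
}

$dispatched = 0;

$lazy = new LazyPromiseSketch(function () use (&$dispatched) {
    $dispatched++; // stands in for Client@requestAsync()

    return 'promise';
});

$before = $dispatched;          // still 0: construction dispatched nothing
$value = $lazy->buildPromise(); // the builder runs here
$lazy->buildPromise();          // memoized: the builder does not run again
```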
Changed the title from PendingRequest@pool() to PendingRequest@pool() concurrency
```php
if ($concurrency === null) {
    (new Collection($requests))->each(function ($item) {
        if ($item instanceof static) {
            $item = $item->getPromise();
        }

        if ($item instanceof LazyPromise) {
            $item->buildPromise();
        }
    });

    foreach ($requests as $key => $item) {
        $results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait();
    }

    return $results;
}
```
Added this for backwards compatibility: null executes everything at once as it always has, throwing an exception if the user marked throws() or there's a connection timeout.

I would like to see this entirely removed in Laravel 13. To me, it makes no sense in the context of request pooling, and it changes the underlying behavior: here we throw an exception on a connection timeout, whereas in every other case we return the ConnectionException as part of the array.
```diff
  * @return array<array-key, \Illuminate\Http\Client\Response|\Illuminate\Http\Client\ConnectionException|\Illuminate\Http\Client\RequestException>
  */
- public function pool(callable $callback, ?int $concurrency = null)
+ public function pool(callable $callback, ?int $concurrency = 0)
```
@taylorotwell the change to 0 for the default probably breaks existing code; maybe we should just forgo it until Laravel 13.
```php
$this->guzzlePromise = call_user_func($this->promiseBuilder);

foreach ($this->pending as $pendingCallback) {
    $pendingCallback($this->guzzlePromise);
```
We could possibly merge the behavior of FluentPromise with this by doing something like:

```php
$result = $pendingCallback($this->guzzlePromise);

if ($result instanceof PromiseInterface) {
    $this->guzzlePromise = $result;
}
```
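Here is a runnable pure-PHP sketch of that suggestion, with hypothetical names standing in for Guzzle's PromiseInterface: each pending callback receives the current promise, and if it returns something promise-like (e.g. the result of a ->then() chain), that return value replaces the current promise for subsequent callbacks.

```php
<?php

// Hypothetical stand-ins for a promise type (not Guzzle's actual classes).
interface PromiseLike {}

final class ValuePromise implements PromiseLike
{
    public function __construct(public string $label) {}
}

// Run each pending callback; adopt any promise a callback returns.
function runPending(PromiseLike $promise, array $pendingCallbacks): PromiseLike
{
    foreach ($pendingCallbacks as $pendingCallback) {
        $result = $pendingCallback($promise);

        if ($result instanceof PromiseLike) {
            $promise = $result; // e.g. a chained ->then() promise
        }
    }

    return $promise;
}

$final = runPending(new ValuePromise('initial'), [
    fn (PromiseLike $p) => null, // side-effect-only callback: promise unchanged
    fn (ValuePromise $p) => new ValuePromise($p->label.'+then'), // chains a new one
]);
```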
@WendellAdriel do you mind taking a look at the changes to Batch? 🙇
I read
That's a good point. But I don't think the original implementation works that way either. Not sure how to make this work so the queue only has N grouped requests at a time. 🤔 For a use case I have in mind, I wouldn't want to exhaust API rate limits or get an IP blacklisted by firing off 20 requests at the same time.
Do we know for certain how the current implementation works? 👀 It definitely doesn't seem to be totally serial, but it also doesn't seem to respect the concurrency value. I usually see only 2-3 concurrent requests at once regardless of the concurrency value.
The definition of concurrency is what Taylor said. It's weird, because in some tests I ran during the PR it seemed to be respecting the concurrency. It's using the concurrency option from Guzzle under the hood, so I'd expect this to work.
@WendellAdriel maybe you can dig into the current code and see what you see on your end? Maybe we are missing something?

@taylorotwell sure, let me run some tests on my end and try to figure out what's happening! I'll report back here ASAP
For whatever it's worth, I added a log inside of the CurlMultiHandler@addRequest method. From what I can see, all of the pooled or batched requests get added to the handler at once in the current implementation of these functions. Possibly a good test would be to pool 100 requests but only await the first promise in Pool, and see if 100 requests hit the local endpoint.

@taylorotwell @cosmastech I think I found the issue.
Here is some evidence that this dispatches everything at once: run the node.js script in the original PR description, then apply this diff patch against the

Here's a screen recording as well: multi-failure.mov

Another possibility is that we set a new multi-handler for each group of requests. You'd have to pass the
@taylorotwell are you running against Herd or possibly Valet? It looks like the FPM config confounds the results.
@taylorotwell @cosmastech as I mentioned, I think I found the root cause of the issue and how to fix it, and opened this PR: #57977 so you can take a look at how it can be fixed. I added it there because it would be easier/cleaner to show the fix.
@WendellAdriel I think both of these implementations are functionally the same. I am not positive that using the generator makes a meaningful difference in how this executes. That said, I think I have lost the plot a bit here. What is the desired behavior?

The current implementation dispatches everything regardless of concurrency limit. The cause is that Guzzle's CurlMultiHandler is shared between all requests. As soon as an async request is built (via

The approach in this PR (and Wendell's too, I think) is only adding new requests once the previous ones have been settled, so the multi-handler doesn't dispatch them simultaneously, which is not the desired outcome.

If neither one of these approaches is the desired behavior, then I am not positive what the solution is. The things I haven't tried yet:
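For contrast with the sliding-window behavior discussed elsewhere in the thread, here is a pure-PHP sketch (invented names, no Guzzle) of the strict-chunking strategy this comment describes: builders are only invoked — i.e. requests are only "added to the handler" — one chunk of $concurrency at a time, and a whole chunk is settled before the next chunk starts.

```php
<?php

// Illustrative strict chunking (invented names): defer building each task
// until its chunk is up, then settle the whole chunk before moving on.
function runInChunks(array $taskBuilders, int $concurrency, callable $settle): array
{
    $results = [];

    foreach (array_chunk($taskBuilders, $concurrency, preserve_keys: true) as $chunk) {
        // Only now are this chunk's tasks built ("added to the handler").
        $started = array_map(fn (callable $build) => $build(), $chunk);

        // Settle every task in the chunk before starting the next chunk.
        foreach ($started as $key => $task) {
            $results[$key] = $settle($task);
        }
    }

    return $results;
}

$buildLog = [];

$results = runInChunks(
    [
        'a' => function () use (&$buildLog) { $buildLog[] = 'a'; return 1; },
        'b' => function () use (&$buildLog) { $buildLog[] = 'b'; return 2; },
        'c' => function () use (&$buildLog) { $buildLog[] = 'c'; return 3; },
    ],
    2,
    fn (int $task) => $task * 10, // stands in for ->wait() on a response
);
```

Under this strategy a slow request in one chunk delays every later chunk, which is exactly the trade-off being weighed against the sliding-window behavior.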
That's why in my PR I create a proxy object, the
No, in my implementation I pass all requests at once to

The root issue was that the
Ahhh, so I modified my test to put in variable sleep times, and this is working as you had described, Taylor:

Node Server

```js
import http from 'http'

const PORT = process.env.PORT || 4000;

const server = http.createServer(async (req, res) => {
  const url = new URL(req.url, `http://${req.headers.host}`);

  if (url.pathname === '/test') {
    const run = url.searchParams.get('run') ?? 'unknown';
    const sleepTime = parseInt(url.searchParams.get('sleep') ?? 4);
    const start = new Date().toISOString();
    console.log(`[${start}] received run=${run} sleeping for: ${sleepTime}`);

    // Simulate work
    await new Promise(resolve => setTimeout(resolve, sleepTime * 1000));

    const done = new Date().toISOString();
    console.log(`[${done}] finished run=${run}`);

    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end(String(Math.floor(Math.random() * 900) + 100));
    return;
  }

  res.writeHead(404, { 'Content-Type': 'text/plain' });
  res.end('Not Found');
});

server.listen(PORT, () => {
  console.log(`Server listening on http://localhost:${PORT}`);
});
```

And the additional test case:

```php
public function test_concurrency()
{
    $r = Benchmark::measure(function () {
        Http::pool(function (Pool $pool) {
            $pool->as('first')
                ->connectTimeout(99999999)
                ->get('http://localhost:4000/test?run=1&sleep=9');
            $pool->as('second')
                ->connectTimeout(99999999)
                ->get('http://localhost:4000/test?run=2');
            $pool->as('third')
                ->connectTimeout(99999999)
                ->get('http://localhost:4000/test?run=3');
            $pool->as('fourth')
                ->connectTimeout(99999999)
                ->get('http://localhost:4000/test?run=4');
            $pool->as('fifth')
                ->connectTimeout(99999999)
                ->get('http://localhost:4000/test?run=5');
        }, 2);
    });

    dd($r);
}
```

Here's the output:

```
Server listening on http://localhost:4000
[2025-12-01T19:30:35.650Z] received run=1 sleeping for: 9
[2025-12-01T19:30:35.652Z] received run=2 sleeping for: 4
[2025-12-01T19:30:39.653Z] finished run=2
[2025-12-01T19:30:39.663Z] received run=3 sleeping for: 4
[2025-12-01T19:30:43.665Z] finished run=3
[2025-12-01T19:30:43.668Z] received run=4 sleeping for: 4
[2025-12-01T19:30:44.652Z] finished run=1
[2025-12-01T19:30:44.656Z] received run=5 sleeping for: 4
[2025-12-01T19:30:47.670Z] finished run=4
[2025-12-01T19:30:48.657Z] finished run=5
```

As you can see, another request dispatches as soon as a prior request completes. (Run 1 is still pending, but because Run 2 has finished, Run 3 is started.)
@WendellAdriel I think my test was just faulty. 😅 I wasn't introducing differing sleep timeouts, so it made it seem like it was waiting for the batch to complete, when they all just were finalizing at the same time. I think both of our approaches are in alignment. I switched to use a generator here as you did in yours 🙇

For both of them, the closure that actually builds the request doesn't run until we're inside the generator. Mine is wrapped in a Promise while yours is built on demand via the DeferredRequest object, but they're effectively the same.

I would mention that switching from returning a PendingRequest to a DeferredRequest might be considered a breaking change for some implementations. For instance, I think this would break:

```php
function addDefaults(PendingRequest $pendingRequest): PendingRequest
{
    $pendingRequest->withUserAgent('laravel/forever')->connectTimeout(2);

    return $pendingRequest;
}

Http::pool(function (Pool $pool) {
    for ($i = 1; $i < 10; $i++) {
        addDefaults($pool->as('request_' . $i))->get('https://fake.dev/api/users/' . $i);
    }
});
```

It also might not play as nice with IDE auto-completion, but that could probably be solved with a
@cosmastech nice! I've never seen the pool and batch used as you showed in the example TBH, but I do see that in that case this could cause a breaking change (adding the DeferredRequest). Let's wait on Taylor for it!
Changed the title from PendingRequest@pool() concurrency to PendingRequest@pool() && batch() concurrency
Fixes

We introduce a `LazyPromise` in order to not add all the requests to the curl multi-handler simultaneously. As it was written originally, a `PendingRequest` set to async that calls `send()` adds a request to the CurlMultiHandler.

Now, once we have the requests for the pool, we batch them by `$concurrency` and only at that time do we add them to the CurlMultiHandler queue. This aligns with my expectation and the Laravel docs that `Http::pool()` will only have `$concurrency` number of simultaneous requests in flight.

Testing

I added a simple node JS server, since using Herd was confounding the results:

node-server.js

I then added a test locally like so:

Additional Test

By changing the concurrency parameter passed to `Http::pool()`, I was able to see the times move as I would expect across the tested values: null, 0, 1, 2, 5, 10, 25.