Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions src/Illuminate/Http/Client/Batch.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public function __construct(?Factory $factory = null)
* Add a request to the batch with a key.
*
* @param string $key
* @return \Illuminate\Http\Client\PendingRequest
* @return \Illuminate\Http\Client\DeferredRequest
*
* @throws \Illuminate\Http\Client\BatchInProgressException
*/
Expand All @@ -147,7 +147,7 @@ public function as(string $key)

$this->incrementPendingRequests();

return $this->requests[$key] = $this->asyncRequest();
return new DeferredRequest($this->requests, $key, $this->factory, $this->handler);
}

/**
Expand Down Expand Up @@ -252,18 +252,19 @@ public function send(): array
}

$results = [];
$promises = [];

foreach ($this->requests as $key => $item) {
$promise = match (true) {
$item instanceof PendingRequest => $item->getPromise(),
default => $item,
if (! empty($this->requests)) {
// Create a generator that yields promises on-demand for proper concurrency control
$promiseGenerator = function () {
foreach ($this->requests as $key => $item) {
yield $key => match (true) {
$item instanceof Closure => $item(),
$item instanceof PendingRequest => $item->getPromise(),
default => $item,
};
}
};

$promises[$key] = $promise;
}

if (! empty($promises)) {
$eachPromiseOptions = [
'fulfilled' => function ($result, $key) use (&$results) {
$results[$key] = $result;
Expand Down Expand Up @@ -311,7 +312,7 @@ public function send(): array
$eachPromiseOptions['concurrency'] = $this->concurrencyLimit;
}

(new EachPromise($promises, $eachPromiseOptions))->promise()->wait();
(new EachPromise($promiseGenerator(), $eachPromiseOptions))->promise()->wait();
}

// Before returning the results, we must ensure that the results are sorted
Expand Down Expand Up @@ -422,7 +423,7 @@ public function getRequests(): array
*
* @param string $method
* @param array $parameters
* @return \Illuminate\Http\Client\PendingRequest|\GuzzleHttp\Promise\Promise
* @return \Illuminate\Http\Client\DeferredRequest
*/
public function __call(string $method, array $parameters)
{
Expand All @@ -432,6 +433,12 @@ public function __call(string $method, array $parameters)

$this->incrementPendingRequests();

return $this->requests[] = $this->asyncRequest()->$method(...$parameters);
// Get the next numeric index and create a DeferredRequest for method chaining
$key = count($this->requests);

$deferred = new DeferredRequest($this->requests, $key, $this->factory, $this->handler);

// Call the method on the DeferredRequest to start accumulating calls
return $deferred->$method(...$parameters);
}
}
85 changes: 85 additions & 0 deletions src/Illuminate/Http/Client/DeferredRequest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace Illuminate\Http\Client;

class DeferredRequest
{
/**
* Reference to the pool/batch requests array.
*
* @var array
*/
protected $requests;

/**
* The key for this request in the pool/batch.
*
* @var string|int
*/
protected $key;

/**
* The factory instance.
*
* @var \Illuminate\Http\Client\Factory
*/
protected $factory;

/**
* The Guzzle handler.
*
* @var callable
*/
protected $handler;

/**
* The accumulated method calls to apply.
*
* @var array<array{method: string, parameters: array}>
*/
protected $methodCalls = [];

/**
* @param array &$requests Reference to the pool/batch requests array
* @param string|int $key The key for this request
* @param \Illuminate\Http\Client\Factory $factory
* @param callable $handler
*/
public function __construct(array &$requests, $key, Factory $factory, callable $handler)
{
$this->requests = &$requests;
$this->key = $key;
$this->factory = $factory;
$this->handler = $handler;
}

/**
* Intercept method calls and store them for deferred execution.
*
* @param string $method
* @param array $parameters
* @return $this
*/
public function __call($method, $parameters)
{
// Accumulate method calls to apply in order
$this->methodCalls[] = ['method' => $method, 'parameters' => $parameters];

// Store a closure that will create and execute the request on-demand with all accumulated calls
$methodCalls = $this->methodCalls;
$factory = $this->factory;
$handler = $this->handler;

$this->requests[$this->key] = function () use ($factory, $handler, $methodCalls) {
$request = $factory->setHandler($handler)->async();

foreach ($methodCalls as $call) {
$request = $request->{$call['method']}(...$call['parameters']);
}

return $request;
};

return $this;
}
}
72 changes: 42 additions & 30 deletions src/Illuminate/Http/Client/PendingRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -896,20 +896,34 @@ public function pool(callable $callback, ?int $concurrency = null)
$requests = tap(new Pool($this->factory), $callback)->getRequests();

if ($concurrency === null) {
$promises = [];
foreach ($requests as $key => $item) {
$results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait();
$promises[$key] = match (true) {
$item instanceof Closure => $item(),
$item instanceof static => $item->getPromise(),
default => $item,
};
}

foreach ($promises as $key => $promise) {
$results[$key] = $promise->wait();
}

return $results;
}

$promises = [];

foreach ($requests as $key => $item) {
$promises[$key] = $item instanceof static ? $item->getPromise() : $item;
}
// Use a generator to create promises on-demand for proper concurrency control
$promiseGenerator = function () use ($requests) {
foreach ($requests as $key => $item) {
yield $key => match (true) {
$item instanceof Closure => $item(),
$item instanceof static => $item->getPromise(),
default => $item,
};
}
};

(new EachPromise($promises, [
(new EachPromise($promiseGenerator(), [
'fulfilled' => function ($result, $key) use (&$results) {
$results[$key] = $result;
},
Expand Down Expand Up @@ -969,34 +983,32 @@ public function send(string $method, string $url, array $options = [])

$this->dispatchResponseReceivedEvent($response);

if ($response->successful()) {
return;
}

try {
$shouldRetry = $this->retryWhenCallback ? call_user_func($this->retryWhenCallback, $response->toException(), $this, $this->request->toPsrRequest()->getMethod()) : true;
} catch (Exception $exception) {
$shouldRetry = false;
if (! $response->successful()) {
try {
$shouldRetry = $this->retryWhenCallback ? call_user_func($this->retryWhenCallback, $response->toException(), $this, $this->request->toPsrRequest()->getMethod()) : true;
} catch (Exception $exception) {
$shouldRetry = false;

throw $exception;
}
throw $exception;
}

if ($this->throwCallback &&
($this->throwIfCallback === null ||
call_user_func($this->throwIfCallback, $response))) {
$response->throw($this->throwCallback);
}
if ($this->throwCallback &&
($this->throwIfCallback === null ||
call_user_func($this->throwIfCallback, $response))) {
$response->throw($this->throwCallback);
}

$potentialTries = is_array($this->tries)
? count($this->tries) + 1
: $this->tries;
$potentialTries = is_array($this->tries)
? count($this->tries) + 1
: $this->tries;

if ($attempt < $potentialTries && $shouldRetry) {
$response->throw();
}
if ($attempt < $potentialTries && $shouldRetry) {
$response->throw();
}

if ($potentialTries > 1 && $this->retryThrow) {
$response->throw();
if ($potentialTries > 1 && $this->retryThrow) {
$response->throw();
}
}
});
} catch (TransferException $e) {
Expand Down
14 changes: 10 additions & 4 deletions src/Illuminate/Http/Client/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public function __construct(?Factory $factory = null)
* Add a request to the pool with a key.
*
* @param string $key
* @return \Illuminate\Http\Client\PendingRequest
* @return \Illuminate\Http\Client\DeferredRequest
*/
public function as(string $key)
{
return $this->pool[$key] = $this->asyncRequest();
return new DeferredRequest($this->pool, $key, $this->factory, $this->handler);
}

/**
Expand Down Expand Up @@ -77,10 +77,16 @@ public function getRequests()
*
* @param string $method
* @param array $parameters
* @return \Illuminate\Http\Client\PendingRequest|\GuzzleHttp\Promise\Promise
* @return \Illuminate\Http\Client\DeferredRequest
*/
public function __call($method, $parameters)
{
return $this->pool[] = $this->asyncRequest()->$method(...$parameters);
// Get the next numeric index and create a DeferredRequest for method chaining
$key = count($this->pool);

$deferred = new DeferredRequest($this->pool, $key, $this->factory, $this->handler);

// Call the method on the DeferredRequest to start accumulating calls
return $deferred->$method(...$parameters);
}
}
27 changes: 27 additions & 0 deletions tests/Http/HttpClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,33 @@ public function testMiddlewareRunsInPool()
$this->assertSame(['hyped-for' => 'laravel-movie'], json_decode(tap($history[0]['request']->getBody())->rewind()->getContents(), true));
}

public function testMiddlewareRunsInBatch()
{
$this->factory->fake(function (Request $request) {
return $this->factory->response('Fake');
});

$history = [];

$middleware = Middleware::history($history);

$batch = $this->factory->batch(fn (Batch $batch) => [
$batch->withMiddleware($middleware)->post('https://example.com', ['hyped-for' => 'laravel-movie']),
]);

$responses = $batch->send();

$response = $responses[0];

$this->assertSame('Fake', $response->body());

$this->assertCount(1, $history);

$this->assertSame('Fake', tap($history[0]['response']->getBody())->rewind()->getContents());

$this->assertSame(['hyped-for' => 'laravel-movie'], json_decode(tap($history[0]['request']->getBody())->rewind()->getContents(), true));
}

public function testPoolConcurrency()
{
$this->factory->fake([
Expand Down