Skip to content

Commit 81c9bd9

Browse files
committed
metadata propagation issue
when message flow goes outside of aggregate which was called as first, its identifier is always propagated
1 parent 43b0ae2 commit 81c9bd9

File tree

11 files changed

+202
-2
lines changed

11 files changed

+202
-2
lines changed

packages/Ecotone/src/Modelling/AggregateFlow/LoadAggregate/LoadAggregateMessageProcessor.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public function process(Message $message): ?Message
3838
$messageType = TypeDescriptor::createFromVariable($message->getPayload());
3939

4040
if (! $message->getHeaders()->containsKey(AggregateMessage::AGGREGATE_ID)) {
41-
throw AggregateNotFoundException::create("Can't call Aggregate {$this->aggregateClassName}:{$this->aggregateMethod} as identifier header is missing. Please check your identifier mapping in {$messageType->toString()}. Have you forgot to add #[TargetIdentifier] in your Command or `aggregate.id` in metadata?");
41+
throw AggregateNotFoundException::create(sprintf("Can't call Aggregate {$this->aggregateClassName}:{$this->aggregateMethod} as identifier header is missing. Please check your identifier mapping in {$messageType->toString()}. Have you forgot to add #[TargetIdentifier] in your Command or `%s` in metadata?", AggregateMessage::AGGREGATE_ID));
4242
}
4343

4444
$aggregateIdentifiers = AggregateIdMetadata::createFrom(
@@ -47,7 +47,7 @@ public function process(Message $message): ?Message
4747

4848
foreach ($aggregateIdentifiers as $identifierName => $aggregateIdentifier) {
4949
if (is_null($aggregateIdentifier)) {
50-
throw AggregateNotFoundException::create("Can't call Aggregate {$this->aggregateClassName}:{$this->aggregateMethod} as value for identifier `{$identifierName}` is missing. Please check your identifier mapping in {$messageType->toString()}. Have you forgot to add #[TargetIdentifier] in your Command or `aggregate.id` in metadata?");
50+
throw AggregateNotFoundException::create(sprintf("Can't call Aggregate {$this->aggregateClassName}:{$this->aggregateMethod} as value for identifier `{$identifierName}` is missing. Please check your identifier mapping in {$messageType->toString()}. Have you forgot to add #[TargetIdentifier] in your Command or `%s` in metadata?", AggregateMessage::AGGREGATE_ID));
5151
}
5252
}
5353

packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
88
use Ecotone\Messaging\Message;
99
use Ecotone\Messaging\MessageHeaders;
10+
use Ecotone\Modelling\AggregateMessage;
1011

1112
/**
1213
* licence Apache-2.0
@@ -26,6 +27,12 @@ public function storeHeaders(MethodInvocation $methodInvocation, Message $messag
2627
$userlandHeaders = [];
2728
} else {
2829
$userlandHeaders = MessageHeaders::unsetAllFrameworkHeaders($message->getHeaders()->headers());
30+
unset(
31+
$userlandHeaders[AggregateMessage::AGGREGATE_ID],
32+
$userlandHeaders[AggregateMessage::CALLED_AGGREGATE_CLASS],
33+
$userlandHeaders[AggregateMessage::CALLED_AGGREGATE_INSTANCE],
34+
$userlandHeaders[AggregateMessage::TARGET_VERSION],
35+
);
2936
$userlandHeaders[MessageHeaders::MESSAGE_ID] = $message->getHeaders()->getMessageId();
3037
$userlandHeaders[MessageHeaders::MESSAGE_CORRELATION_ID] = $message->getHeaders()->getCorrelationId();
3138
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class AddItemToBasket
8+
{
9+
public function __construct(public string $basketId, public string $itemId) {}
10+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
use Ecotone\Messaging\Attribute\Asynchronous;
8+
use Ecotone\Modelling\Attribute\CommandHandler;
9+
use Ecotone\Modelling\Attribute\EventHandler;
10+
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
11+
use Ecotone\Modelling\Attribute\EventSourcingHandler;
12+
use Ecotone\Modelling\Attribute\Identifier;
13+
use Ecotone\Modelling\WithAggregateVersioning;
14+
15+
#[EventSourcingAggregate]
16+
final class Basket
17+
{
18+
use WithAggregateVersioning;
19+
20+
#[Identifier]
21+
private string $basketId;
22+
23+
#[CommandHandler]
24+
public function addItem(AddItemToBasket $command): array
25+
{
26+
return [new ItemWasAddedToBasket($this->basketId, $command->itemId)];
27+
}
28+
29+
#[EventHandler(endpointId: 'basket.itemWasAddedToBasket')]
30+
#[Asynchronous(channelName: 'basket')]
31+
public function whenItemWasAddedToBasket(ItemWasAddedToBasket $event): array
32+
{
33+
return [new ItemReservationCreated($event->itemId)];
34+
}
35+
36+
#[EventSourcingHandler]
37+
public function applyBasketCreated(BasketCreated $event): void
38+
{
39+
$this->basketId = $event->basketId;
40+
}
41+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class BasketCreated
8+
{
9+
public function __construct(public string $basketId)
10+
{
11+
}
12+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
use Ecotone\Messaging\Attribute\Asynchronous;
8+
use Ecotone\Modelling\Attribute\EventHandler;
9+
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
10+
use Ecotone\Modelling\Attribute\EventSourcingHandler;
11+
use Ecotone\Modelling\Attribute\Identifier;
12+
use Ecotone\Modelling\WithAggregateVersioning;
13+
14+
#[EventSourcingAggregate]
15+
final class ItemInventory
16+
{
17+
use WithAggregateVersioning;
18+
19+
#[Identifier]
20+
private string $itemId;
21+
22+
#[EventHandler(endpointId: 'item.itemReservationCreated')]
23+
#[Asynchronous(channelName: 'itemInventory')]
24+
public function whenItemReservationCreated(ItemReservationCreated $event): array
25+
{
26+
return [new ItemReserved($event->itemId)];
27+
}
28+
29+
#[EventSourcingHandler]
30+
public function applyItemInventoryCreated(ItemInventoryCreated $event): void
31+
{
32+
$this->itemId = $event->itemId;
33+
}
34+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class ItemInventoryCreated
8+
{
9+
public function __construct(public string $itemId)
10+
{
11+
}
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class ItemReservationCreated
8+
{
9+
public function __construct(public string $itemId)
10+
{
11+
}
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class ItemReserved
8+
{
9+
public function __construct(public string $itemId)
10+
{
11+
}
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\Modelling\Fixture\BasketWithReservations;
6+
7+
final class ItemWasAddedToBasket
8+
{
9+
public function __construct(public string $basketId, public string $itemId)
10+
{
11+
}
12+
}

packages/Ecotone/tests/Modelling/Unit/MetadataPropagatingTest.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@
1212
use Ecotone\Messaging\MessageHeaders;
1313
use Ecotone\Modelling\AggregateMessage;
1414
use PHPUnit\Framework\TestCase;
15+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\AddItemToBasket;
16+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\Basket;
17+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\BasketCreated;
18+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\ItemInventory;
19+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\ItemInventoryCreated;
20+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\ItemReservationCreated;
21+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\ItemReserved;
22+
use Test\Ecotone\Modelling\Fixture\BasketWithReservations\ItemWasAddedToBasket;
1523
use Test\Ecotone\Modelling\Fixture\MetadataPropagatingWithDoubleEventHandlers\OrderService;
1624
use Test\Ecotone\Modelling\Fixture\Order\PlaceOrder;
1725
use Test\Ecotone\Modelling\Fixture\OrderAggregate\Order;
@@ -160,4 +168,44 @@ classesToResolve: [OrderService::class],
160168
$ecotoneTestSupport->getMessageChannel('orders')->receive()->getHeaders()->headers()
161169
);
162170
}
171+
172+
public function test_propagating_headers_to_all_published_asynchronous_event_handlers_extended(): void
173+
{
174+
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
175+
classesToResolve: [Basket::class, ItemInventory::class],
176+
configuration: ServiceConfiguration::createWithDefaults(),
177+
enableAsynchronousProcessing: [
178+
SimpleMessageChannelBuilder::createQueueChannel('basket'),
179+
SimpleMessageChannelBuilder::createQueueChannel('itemInventory'),
180+
]
181+
);
182+
183+
$ecotoneTestSupport->withEventsFor(
184+
'basket-123',
185+
Basket::class,
186+
[
187+
new BasketCreated('basket-123'),
188+
]
189+
);
190+
191+
$ecotoneTestSupport->withEventsFor(
192+
'item-123',
193+
ItemInventory::class,
194+
[
195+
new ItemInventoryCreated('item-123'),
196+
]
197+
);
198+
199+
$ecotoneTestSupport->sendCommand(new AddItemToBasket('basket-123', 'item-123'));
200+
201+
self::assertEquals([new ItemWasAddedToBasket('basket-123', 'item-123')], $ecotoneTestSupport->getRecordedEvents());
202+
203+
$ecotoneTestSupport->run('basket', ExecutionPollingMetadata::createWithTestingSetup());
204+
205+
self::assertEquals([new ItemReservationCreated('item-123')], $ecotoneTestSupport->getRecordedEvents());
206+
207+
$ecotoneTestSupport->run('itemInventory', ExecutionPollingMetadata::createWithTestingSetup());
208+
209+
self::assertEquals([new ItemReserved('item-123')], $ecotoneTestSupport->getRecordedEvents());
210+
}
163211
}

0 commit comments

Comments
 (0)