Skip to content

Commit 5184864

Browse files
committed
pass gap detection with ProjectionRunningConfiguration instead of ProjectionSetupConfiguration
1 parent 71b01d5 commit 5184864

14 files changed

+649
-130
lines changed

packages/Dbal/tests/Integration/ORMTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ private function bootstrapEcotone(array $namespaces = ['Test\Ecotone\Dbal\Fixtur
351351
protected function connectionForTenantB(): EcotoneManagerRegistryConnectionFactory
352352
{
353353
$connectionTenantB = ManagerRegistryEmulator::fromDsnAndConfig(
354-
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@localhost:3306/ecotone',
354+
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@127.0.0.1:3306/ecotone',
355355
[__DIR__ . '/../Fixture/ORM/Person']
356356
);
357357
return $connectionTenantB;
@@ -360,7 +360,7 @@ protected function connectionForTenantB(): EcotoneManagerRegistryConnectionFacto
360360
protected function connectionForTenantA(): EcotoneManagerRegistryConnectionFactory
361361
{
362362
$connectionTenantA = ManagerRegistryEmulator::fromDsnAndConfig(
363-
getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@localhost:5432/ecotone',
363+
getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@127.0.0.1:5432/ecotone',
364364
[__DIR__ . '/../Fixture/ORM/Person']
365365
);
366366
return $connectionTenantA;

packages/PdoEventSourcing/src/ProjectionRunningConfiguration.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
namespace Ecotone\EventSourcing;
44

55
use Assert\Assertion;
6+
use Ecotone\EventSourcing\Prooph\GapDetection;
7+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
68
use Ecotone\Messaging\Config\Container\DefinedObject;
79
use Ecotone\Messaging\Config\Container\Definition;
810
use InvalidArgumentException;
9-
use Prooph\EventStore\Metadata\MetadataMatcher;
11+
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
1012

1113
/**
1214
* licence Apache-2.0
@@ -43,14 +45,17 @@ class ProjectionRunningConfiguration implements DefinedObject
4345
public const OPTION_IS_TESTING_SETUP = 'isTestingSetup';
4446
public const DEFAULT_IS_TESTING_SETUP = false;
4547

48+
public const OPTION_GAP_DETECTION = PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION;
49+
4650
private array $options;
4751
private bool $isTestingSetup = false;
4852

4953
public function __construct(
5054
private string $projectionName,
5155
private string $runningType,
56+
array $options = [],
5257
) {
53-
$this->options = [
58+
$this->options = $options !== [] ? $options : [
5459
self::OPTION_INITIALIZE_ON_STARTUP => self::DEFAULT_INITIALIZE_ON_STARTUP,
5560
self::OPTION_AMOUNT_OF_CACHED_STREAM_NAMES => self::DEFAULT_AMOUNT_OF_CACHED_STREAM_NAMES,
5661
self::OPTION_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND => self::DEFAULT_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND,
@@ -60,16 +65,26 @@ public function __construct(
6065
self::OPTION_IS_TESTING_SETUP => self::DEFAULT_IS_TESTING_SETUP,
6166
self::OPTION_LOAD_COUNT => self::DEFAULT_LOAD_COUNT,
6267
self::OPTION_METADATA_MATCHER => self::DEFAULT_METADATA_MATCHER,
68+
self::OPTION_GAP_DETECTION => new GapDetection(retryConfig: [0, 5, 50, 500, 800]),
6369
];
6470
}
6571

6672
public function getDefinition(): Definition
6773
{
74+
$options = $this->options;
75+
foreach ($this->options as $key => $value) {
76+
$options[$key] = match ($key) {
77+
self::OPTION_METADATA_MATCHER, self::OPTION_GAP_DETECTION => $value === null ? $value : $value->getDefinition(),
78+
default => $value,
79+
};
80+
}
81+
6882
return new Definition(
6983
self::class,
7084
[
7185
$this->projectionName,
7286
$this->runningType,
87+
$options,
7388
]
7489
);
7590
}
@@ -110,6 +125,10 @@ public function withOption(string $key, mixed $value): static
110125
Assertion::isInstanceOf($value, MetadataMatcher::class);
111126
}
112127

128+
if ($key === self::OPTION_GAP_DETECTION && $value !== null) {
129+
Assertion::isInstanceOf($value, GapDetection::class);
130+
}
131+
113132
$self = clone $this;
114133
$self->options[$key] = $value;
115134

packages/PdoEventSourcing/src/ProjectionSetupConfiguration.php

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
namespace Ecotone\EventSourcing;
44

5-
use Assert\Assertion;
65
use Ecotone\Messaging\Config\Container\DefinedObject;
76
use Ecotone\Messaging\Config\Container\Definition;
87
use Ecotone\Messaging\NullableMessageChannel;
98
use Ecotone\Messaging\Support\Assert;
10-
use Prooph\EventStore\Pdo\Projection\GapDetection;
11-
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
129

1310
/**
1411
* licence Apache-2.0
@@ -34,22 +31,11 @@ public function __construct(
3431

3532
public static function create(string $projectionName, ProjectionLifeCycleConfiguration $projectionLifeCycleConfiguration, string $eventStoreReferenceName, ProjectionStreamSource $projectionStreamSource, ?string $asynchronousChannelName): static
3633
{
37-
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName, projectionOptions: [PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION => new GapDetection(
38-
retryConfig: [0, 5, 50, 500, 800],
39-
detectionWindow: null
40-
)]);
34+
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName);
4135
}
4236

4337
public function getDefinition(): Definition
4438
{
45-
$projectionOptions = [];
46-
foreach ($this->projectionOptions as $key => $value) {
47-
if ($value instanceof GapDetection) {
48-
$projectionOptions[$key] = new Definition(GapDetection::class);
49-
} else {
50-
$projectionOptions[$key] = $value;
51-
}
52-
}
5339
return new Definition(
5440
ProjectionSetupConfiguration::class,
5541
[
@@ -61,7 +47,7 @@ public function getDefinition(): Definition
6147
$this->isPolling,
6248
$this->projectionEventHandlerConfigurations,
6349
$this->keepStateBetweenEvents,
64-
$projectionOptions,
50+
$this->projectionOptions,
6551
]
6652
);
6753
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph;
6+
7+
use DateInterval;
8+
use Ecotone\Messaging\Config\Container\DefinedObject;
9+
use Ecotone\Messaging\Config\Container\Definition;
10+
use Prooph\EventStore\Pdo\Projection\GapDetection as ProophGapDetection;
11+
12+
/**
13+
* licence Apache-2.0
14+
*/
15+
final class GapDetection implements DefinedObject
16+
{
17+
public function __construct(private ?array $retryConfig = null, private ?DateInterval $detectionWindow = null)
18+
{
19+
}
20+
21+
public function getDefinition(): Definition
22+
{
23+
return new Definition(self::class, [$this->retryConfig, $this->detectionWindow]);
24+
}
25+
26+
public function toProoph(): ProophGapDetection
27+
{
28+
return new ProophGapDetection($this->retryConfig, $this->detectionWindow);
29+
}
30+
}

packages/PdoEventSourcing/src/Prooph/LazyProophProjectionManager.php

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
88
use Ecotone\EventSourcing\ProjectionSetupConfiguration;
99
use Ecotone\EventSourcing\ProjectionStreamSource;
10+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
1011
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
1112
use Ecotone\Messaging\Handler\ReferenceSearchService;
1213
use Ecotone\Modelling\Event;
1314
use Prooph\Common\Messaging\Message;
1415
use Prooph\EventStore\Exception\ProjectionNotFound;
1516
use Prooph\EventStore\Exception\RuntimeException;
16-
use Prooph\EventStore\Metadata\MetadataMatcher;
1717
use Prooph\EventStore\Pdo\Projection\MariaDbProjectionManager;
1818
use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
1919
use Prooph\EventStore\Pdo\Projection\PostgresProjectionManager;
@@ -78,25 +78,29 @@ public function createQuery(): Query
7878

7979
public function createProjection(string $name, array $options = []): Projector
8080
{
81+
$options = $this->resolveGapDetection($options);
82+
8183
$projection = $this->getProjectionManager()->createProjection($name, $options);
8284

8385
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
8486

8587
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareProjector) {
86-
$projection = $projection->withMetadataMatcher($metadataMatcher);
88+
$projection = $projection->withMetadataMatcher($metadataMatcher->toProoph());
8789
}
8890

8991
return $projection;
9092
}
9193

9294
public function createReadModelProjection(string $name, ReadModel $readModel, array $options = []): ReadModelProjector
9395
{
96+
$options = $this->resolveGapDetection($options);
97+
9498
$projection = $this->getProjectionManager()->createReadModelProjection($name, $readModel, $options);
9599

96100
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
97101

98102
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareReadModelProjector) {
99-
$projection = $projection->withMetadataMatcher($metadataMatcher);
103+
$projection = $projection->withMetadataMatcher($metadataMatcher->toProoph());
100104
}
101105

102106
return $projection;
@@ -249,4 +253,14 @@ public static function getProjectionStreamName(string $name): string
249253
{
250254
return 'projection_' . $name;
251255
}
256+
257+
private function resolveGapDetection(array $options): array
258+
{
259+
$gapDetection = $options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] ?? null;
260+
if ($gapDetection instanceof GapDetection) {
261+
$options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] = $gapDetection->toProoph();
262+
}
263+
264+
return $options;
265+
}
252266
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph\Metadata;
6+
7+
use Ecotone\Messaging\Config\Container\DefinedObject;
8+
use Ecotone\Messaging\Config\Container\Definition;
9+
use function is_array;
10+
use function is_scalar;
11+
use function is_string;
12+
13+
use Prooph\EventStore\Exception\InvalidArgumentException;
14+
use Prooph\EventStore\Metadata\FieldType;
15+
use Prooph\EventStore\Metadata\MetadataMatcher as ProophMetadataMatcher;
16+
use Prooph\EventStore\Metadata\Operator;
17+
18+
use function sprintf;
19+
20+
/**
21+
* licence Apache-2.0
22+
*/
23+
final class MetadataMatcher implements DefinedObject
24+
{
25+
private array $data = [];
26+
27+
public function getDefinition(): Definition
28+
{
29+
return new Definition(self::class, [$this->data], 'create');
30+
}
31+
32+
public static function create(array $data): self
33+
{
34+
$mather = new self();
35+
$mather->data = $data;
36+
37+
return $mather;
38+
}
39+
40+
public function data(): array
41+
{
42+
return $this->data;
43+
}
44+
45+
public function withMetadataMatch(
46+
string $field,
47+
Operator $operator,
48+
$value,
49+
?FieldType $fieldType = null
50+
): self {
51+
$this->validateValue($operator, $value);
52+
53+
if (null === $fieldType) {
54+
$fieldType = FieldType::METADATA();
55+
}
56+
57+
$self = clone $this;
58+
$self->data[] = ['field' => $field, 'operator' => $operator, 'value' => $value, 'fieldType' => $fieldType];
59+
60+
return $self;
61+
}
62+
63+
/**
64+
* @param Operator $operator
65+
* @param mixed $value
66+
* @throws InvalidArgumentException
67+
*/
68+
private function validateValue(Operator $operator, $value): void
69+
{
70+
if ($operator->is(Operator::IN()) || $operator->is(Operator::NOT_IN())
71+
) {
72+
if (is_array($value)) {
73+
return;
74+
}
75+
76+
throw new InvalidArgumentException(sprintf(
77+
'Value must be an array for the operator %s.',
78+
$operator->getName()
79+
));
80+
}
81+
82+
if ($operator->is(Operator::REGEX()) && ! is_string($value)) {
83+
throw new InvalidArgumentException('Value must be a string for the regex operator.');
84+
}
85+
86+
if (! is_scalar($value)) {
87+
throw new InvalidArgumentException(sprintf(
88+
'Value must have a scalar type for the operator %s.',
89+
$operator->getName()
90+
));
91+
}
92+
}
93+
94+
public function toProoph(): ProophMetadataMatcher
95+
{
96+
$metadataMatcher = new ProophMetadataMatcher();
97+
98+
foreach ($this->data as $data) {
99+
$metadataMatcher = $metadataMatcher->withMetadataMatch(
100+
field: $data['field'],
101+
operator: $data['operator'],
102+
value: $data['value'],
103+
fieldType: $data['fieldType']
104+
);
105+
}
106+
107+
return $metadataMatcher;
108+
}
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\EventSourcing\Fixture\ProjectionWithMetadataMatcher;
6+
7+
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
8+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
9+
use Ecotone\Messaging\Attribute\ServiceContext;
10+
use Prooph\EventStore\Metadata\FieldType;
11+
use Prooph\EventStore\Metadata\Operator;
12+
use Test\Ecotone\EventSourcing\Fixture\TicketWithSynchronousEventDrivenProjection\InProgressTicketList;
13+
14+
final class EventDrivenProjectionWithMetadataMatcherConfig
15+
{
16+
#[ServiceContext]
17+
public function enableProjection(): ProjectionRunningConfiguration
18+
{
19+
$metadataMatcher = (new MetadataMatcher())
20+
->withMetadataMatch('test', Operator::EQUALS(), 'false', FieldType::METADATA())
21+
;
22+
23+
return ProjectionRunningConfiguration::createEventDriven(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)
24+
->withTestingSetup()
25+
->withOption(ProjectionRunningConfiguration::OPTION_GAP_DETECTION, null)
26+
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher)
27+
;
28+
}
29+
}
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@
55
namespace Test\Ecotone\EventSourcing\Fixture\ProjectionWithMetadataMatcher;
66

77
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
8+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
89
use Ecotone\Messaging\Attribute\ServiceContext;
9-
use Ecotone\Messaging\Endpoint\PollingMetadata;
1010
use Prooph\EventStore\Metadata\FieldType;
11-
use Prooph\EventStore\Metadata\MetadataMatcher;
1211
use Prooph\EventStore\Metadata\Operator;
13-
use Prooph\EventStore\Pdo\Projection\GapDetection;
14-
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
1512
use Test\Ecotone\EventSourcing\Fixture\TicketWithPollingProjection\InProgressTicketList;
1613

17-
final class ProjectionWithMetadataMatcherConfig
14+
final class PollingProjectionWithMetadataMatcherConfig
1815
{
1916
#[ServiceContext]
2017
public function enablePollingProjection(): ProjectionRunningConfiguration
@@ -25,7 +22,8 @@ public function enablePollingProjection(): ProjectionRunningConfiguration
2522

2623
return ProjectionRunningConfiguration::createPolling(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)
2724
->withTestingSetup()
28-
->withOption(PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION, null)
29-
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher);
25+
->withOption(ProjectionRunningConfiguration::OPTION_GAP_DETECTION, null)
26+
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher)
27+
;
3028
}
3129
}

0 commit comments

Comments
 (0)