Skip to content

Commit b428c74

Browse files
committed
pass gap detection with ProjectionRunningConfiguration instead of ProjectionSetupConfiguration
1 parent 71b01d5 commit b428c74

12 files changed

+641
-123
lines changed

packages/PdoEventSourcing/src/ProjectionRunningConfiguration.php

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
namespace Ecotone\EventSourcing;
44

55
use Assert\Assertion;
6+
use Ecotone\EventSourcing\Prooph\GapDetection;
7+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
68
use Ecotone\Messaging\Config\Container\DefinedObject;
79
use Ecotone\Messaging\Config\Container\Definition;
810
use InvalidArgumentException;
9-
use Prooph\EventStore\Metadata\MetadataMatcher;
11+
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
1012

1113
/**
1214
* licence Apache-2.0
@@ -43,14 +45,17 @@ class ProjectionRunningConfiguration implements DefinedObject
4345
public const OPTION_IS_TESTING_SETUP = 'isTestingSetup';
4446
public const DEFAULT_IS_TESTING_SETUP = false;
4547

48+
public const OPTION_GAP_DETECTION = PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION;
49+
4650
private array $options;
4751
private bool $isTestingSetup = false;
4852

4953
public function __construct(
5054
private string $projectionName,
5155
private string $runningType,
56+
array $options = [],
5257
) {
53-
$this->options = [
58+
$this->options = $options !== [] ? $options : [
5459
self::OPTION_INITIALIZE_ON_STARTUP => self::DEFAULT_INITIALIZE_ON_STARTUP,
5560
self::OPTION_AMOUNT_OF_CACHED_STREAM_NAMES => self::DEFAULT_AMOUNT_OF_CACHED_STREAM_NAMES,
5661
self::OPTION_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND => self::DEFAULT_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND,
@@ -60,16 +65,27 @@ public function __construct(
6065
self::OPTION_IS_TESTING_SETUP => self::DEFAULT_IS_TESTING_SETUP,
6166
self::OPTION_LOAD_COUNT => self::DEFAULT_LOAD_COUNT,
6267
self::OPTION_METADATA_MATCHER => self::DEFAULT_METADATA_MATCHER,
68+
self::OPTION_GAP_DETECTION => new GapDetection(retryConfig: [0, 5, 50, 500, 800]),
6369
];
6470
}
6571

6672
public function getDefinition(): Definition
6773
{
74+
$options = $this->options;
75+
foreach ($this->options as $key => $value) {
76+
$options[$key] = match ($key) {
77+
self::OPTION_METADATA_MATCHER => $value === null ? $value : $value->getDefinition(),
78+
self::OPTION_GAP_DETECTION => $value === null ? $value : $value->getDefinition(),
79+
default => $value,
80+
};
81+
}
82+
6883
return new Definition(
6984
self::class,
7085
[
7186
$this->projectionName,
7287
$this->runningType,
88+
$options,
7389
]
7490
);
7591
}
@@ -110,6 +126,10 @@ public function withOption(string $key, mixed $value): static
110126
Assertion::isInstanceOf($value, MetadataMatcher::class);
111127
}
112128

129+
if ($key === self::OPTION_GAP_DETECTION && $value !== null) {
130+
Assertion::isInstanceOf($value, GapDetection::class);
131+
}
132+
113133
$self = clone $this;
114134
$self->options[$key] = $value;
115135

packages/PdoEventSourcing/src/ProjectionSetupConfiguration.php

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
namespace Ecotone\EventSourcing;
44

5-
use Assert\Assertion;
65
use Ecotone\Messaging\Config\Container\DefinedObject;
76
use Ecotone\Messaging\Config\Container\Definition;
87
use Ecotone\Messaging\NullableMessageChannel;
98
use Ecotone\Messaging\Support\Assert;
10-
use Prooph\EventStore\Pdo\Projection\GapDetection;
11-
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
129

1310
/**
1411
* licence Apache-2.0
@@ -34,22 +31,11 @@ public function __construct(
3431

3532
public static function create(string $projectionName, ProjectionLifeCycleConfiguration $projectionLifeCycleConfiguration, string $eventStoreReferenceName, ProjectionStreamSource $projectionStreamSource, ?string $asynchronousChannelName): static
3633
{
37-
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName, projectionOptions: [PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION => new GapDetection(
38-
retryConfig: [0, 5, 50, 500, 800],
39-
detectionWindow: null
40-
)]);
34+
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName);
4135
}
4236

4337
public function getDefinition(): Definition
4438
{
45-
$projectionOptions = [];
46-
foreach ($this->projectionOptions as $key => $value) {
47-
if ($value instanceof GapDetection) {
48-
$projectionOptions[$key] = new Definition(GapDetection::class);
49-
} else {
50-
$projectionOptions[$key] = $value;
51-
}
52-
}
5339
return new Definition(
5440
ProjectionSetupConfiguration::class,
5541
[
@@ -61,7 +47,7 @@ public function getDefinition(): Definition
6147
$this->isPolling,
6248
$this->projectionEventHandlerConfigurations,
6349
$this->keepStateBetweenEvents,
64-
$projectionOptions,
50+
$this->projectionOptions,
6551
]
6652
);
6753
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph;
6+
7+
use DateInterval;
8+
use Ecotone\Messaging\Config\Container\DefinedObject;
9+
use Ecotone\Messaging\Config\Container\Definition;
10+
use Prooph\EventStore\Pdo\Projection\GapDetection as ProophGapDetection;
11+
12+
final class GapDetection implements DefinedObject
13+
{
14+
public function __construct(private ?array $retryConfig = null, private ?DateInterval $detectionWindow = null)
15+
{
16+
}
17+
18+
public function getDefinition(): Definition
19+
{
20+
return new Definition(self::class, [$this->retryConfig, $this->detectionWindow]);
21+
}
22+
23+
public function toProoph(): ProophGapDetection
24+
{
25+
return new ProophGapDetection($this->retryConfig, $this->detectionWindow);
26+
}
27+
}

packages/PdoEventSourcing/src/Prooph/LazyProophProjectionManager.php

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
88
use Ecotone\EventSourcing\ProjectionSetupConfiguration;
99
use Ecotone\EventSourcing\ProjectionStreamSource;
10+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
1011
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
1112
use Ecotone\Messaging\Handler\ReferenceSearchService;
1213
use Ecotone\Modelling\Event;
1314
use Prooph\Common\Messaging\Message;
1415
use Prooph\EventStore\Exception\ProjectionNotFound;
1516
use Prooph\EventStore\Exception\RuntimeException;
16-
use Prooph\EventStore\Metadata\MetadataMatcher;
1717
use Prooph\EventStore\Pdo\Projection\MariaDbProjectionManager;
1818
use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
1919
use Prooph\EventStore\Pdo\Projection\PostgresProjectionManager;
@@ -78,25 +78,29 @@ public function createQuery(): Query
7878

7979
public function createProjection(string $name, array $options = []): Projector
8080
{
81+
$options = $this->resolveGapDetection($options);
82+
8183
$projection = $this->getProjectionManager()->createProjection($name, $options);
8284

8385
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
8486

8587
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareProjector) {
86-
$projection = $projection->withMetadataMatcher($metadataMatcher);
88+
$projection = $projection->withMetadataMatcher($metadataMatcher->toProoph());
8789
}
8890

8991
return $projection;
9092
}
9193

9294
public function createReadModelProjection(string $name, ReadModel $readModel, array $options = []): ReadModelProjector
9395
{
96+
$options = $this->resolveGapDetection($options);
97+
9498
$projection = $this->getProjectionManager()->createReadModelProjection($name, $readModel, $options);
9599

96100
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
97101

98102
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareReadModelProjector) {
99-
$projection = $projection->withMetadataMatcher($metadataMatcher);
103+
$projection = $projection->withMetadataMatcher($metadataMatcher->toProoph());
100104
}
101105

102106
return $projection;
@@ -249,4 +253,14 @@ public static function getProjectionStreamName(string $name): string
249253
{
250254
return 'projection_' . $name;
251255
}
256+
257+
private function resolveGapDetection(array $options): array
258+
{
259+
$gapDetection = $options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] ?? null;
260+
if ($gapDetection instanceof GapDetection) {
261+
$options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] = $gapDetection->toProoph();
262+
}
263+
264+
return $options;
265+
}
252266
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph\Metadata;
6+
7+
use Ecotone\Messaging\Config\Container\DefinedObject;
8+
use Ecotone\Messaging\Config\Container\Definition;
9+
use function is_array;
10+
use function is_scalar;
11+
use function is_string;
12+
13+
use Prooph\EventStore\Exception\InvalidArgumentException;
14+
use Prooph\EventStore\Metadata\FieldType;
15+
use Prooph\EventStore\Metadata\MetadataMatcher as ProophMetadataMatcher;
16+
use Prooph\EventStore\Metadata\Operator;
17+
18+
use function sprintf;
19+
20+
final class MetadataMatcher implements DefinedObject
21+
{
22+
private array $data = [];
23+
24+
public function getDefinition(): Definition
25+
{
26+
return new Definition(self::class, [$this->data], 'create');
27+
}
28+
29+
public static function create(array $data): self
30+
{
31+
$mather = new self();
32+
$mather->data = $data;
33+
34+
return $mather;
35+
}
36+
37+
public function data(): array
38+
{
39+
return $this->data;
40+
}
41+
42+
public function withMetadataMatch(
43+
string $field,
44+
Operator $operator,
45+
$value,
46+
?FieldType $fieldType = null
47+
): self {
48+
$this->validateValue($operator, $value);
49+
50+
if (null === $fieldType) {
51+
$fieldType = FieldType::METADATA();
52+
}
53+
54+
$self = clone $this;
55+
$self->data[] = ['field' => $field, 'operator' => $operator, 'value' => $value, 'fieldType' => $fieldType];
56+
57+
return $self;
58+
}
59+
60+
/**
61+
* @param Operator $operator
62+
* @param mixed $value
63+
* @throws InvalidArgumentException
64+
*/
65+
private function validateValue(Operator $operator, $value): void
66+
{
67+
if ($operator->is(Operator::IN()) || $operator->is(Operator::NOT_IN())
68+
) {
69+
if (is_array($value)) {
70+
return;
71+
}
72+
73+
throw new InvalidArgumentException(sprintf(
74+
'Value must be an array for the operator %s.',
75+
$operator->getName()
76+
));
77+
}
78+
79+
if ($operator->is(Operator::REGEX()) && ! is_string($value)) {
80+
throw new InvalidArgumentException('Value must be a string for the regex operator.');
81+
}
82+
83+
if (! is_scalar($value)) {
84+
throw new InvalidArgumentException(sprintf(
85+
'Value must have a scalar type for the operator %s.',
86+
$operator->getName()
87+
));
88+
}
89+
}
90+
91+
public function toProoph(): ProophMetadataMatcher
92+
{
93+
$metadataMatcher = new ProophMetadataMatcher();
94+
95+
foreach ($this->data as $data) {
96+
$metadataMatcher = $metadataMatcher->withMetadataMatch(
97+
field: $data['field'],
98+
operator: $data['operator'],
99+
value: $data['value'],
100+
fieldType: $data['fieldType']
101+
);
102+
}
103+
104+
return $metadataMatcher;
105+
}
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Test\Ecotone\EventSourcing\Fixture\ProjectionWithMetadataMatcher;
6+
7+
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
8+
use Ecotone\Messaging\Attribute\ServiceContext;
9+
use Prooph\EventStore\Metadata\FieldType;
10+
use Prooph\EventStore\Metadata\MetadataMatcher;
11+
use Prooph\EventStore\Metadata\Operator;
12+
use Prooph\EventStore\Pdo\Projection\GapDetection;
13+
use Test\Ecotone\EventSourcing\Fixture\TicketWithSynchronousEventDrivenProjection\InProgressTicketList;
14+
15+
final class EventDrivenProjectionWithMetadataMatcherConfig
16+
{
17+
#[ServiceContext]
18+
public function enableProjection(): ProjectionRunningConfiguration
19+
{
20+
$metadataMatcher = (new MetadataMatcher())
21+
->withMetadataMatch('test', Operator::EQUALS(), 'false', FieldType::METADATA())
22+
;
23+
24+
return ProjectionRunningConfiguration::createEventDriven(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)
25+
->withTestingSetup()
26+
->withOption(ProjectionRunningConfiguration::OPTION_GAP_DETECTION, null)
27+
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher)
28+
;
29+
}
30+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
1515
use Test\Ecotone\EventSourcing\Fixture\TicketWithPollingProjection\InProgressTicketList;
1616

17-
final class ProjectionWithMetadataMatcherConfig
17+
final class PollingProjectionWithMetadataMatcherConfig
1818
{
1919
#[ServiceContext]
2020
public function enablePollingProjection(): ProjectionRunningConfiguration
@@ -25,7 +25,8 @@ public function enablePollingProjection(): ProjectionRunningConfiguration
2525

2626
return ProjectionRunningConfiguration::createPolling(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)
2727
->withTestingSetup()
28-
->withOption(PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION, null)
29-
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher);
28+
->withOption(ProjectionRunningConfiguration::OPTION_GAP_DETECTION, null)
29+
->withOption(ProjectionRunningConfiguration::OPTION_METADATA_MATCHER, $metadataMatcher)
30+
;
3031
}
3132
}

packages/PdoEventSourcing/tests/Fixture/TicketWithPollingProjection/ProjectionConfiguration.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
66
use Ecotone\Messaging\Attribute\ServiceContext;
77
use Ecotone\Messaging\Endpoint\PollingMetadata;
8+
use Prooph\EventStore\Pdo\Projection\GapDetection;
89

910
/**
1011
* licence Apache-2.0
@@ -22,7 +23,6 @@ public function setMaximumLimitedTimeForProjections()
2223
#[ServiceContext]
2324
public function configureProjection()
2425
{
25-
return ProjectionRunningConfiguration::createPolling(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)
26-
->withTestingSetup();
26+
return ProjectionRunningConfiguration::createPolling(InProgressTicketList::IN_PROGRESS_TICKET_PROJECTION)->withTestingSetup();
2727
}
2828
}

0 commit comments

Comments
 (0)