Skip to content

Commit 5f7b672

Browse files
committed
pass gap detection with ProjectionRunningConfiguration instead of ProjectionSetupConfiguration
1 parent 5c5b7e0 commit 5f7b672

20 files changed

+777
-140
lines changed

.github/workflows/split-testing.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ jobs:
5151
--env KAFKA_ENABLE_KRAFT=yes
5252
--env KAFKA_CFG_NODE_ID=0
5353
--env KAFKA_CFG_PROCESS_ROLES=controller,broker
54-
--env KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
54+
--env KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
5555
--env KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093
5656
--env KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
5757
--env KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
58-
--env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
58+
--env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
5959
--health-cmd="kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list || exit 1"
6060
--health-interval=10s
6161
--health-timeout=5s
@@ -112,7 +112,7 @@ jobs:
112112
RABBIT_HOST: amqp://127.0.0.1:5672
113113
SQS_DSN: sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://127.0.0.1:4566&version=latest
114114
REDIS_DSN: redis://127.0.0.1:6379
115-
KAFKA_DSN: localhost:9092
115+
KAFKA_DSN: 127.0.0.1:9092
116116
steps:
117117
- uses: actions/checkout@v2
118118

.github/workflows/test-monorepo.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ jobs:
2626
--env KAFKA_ENABLE_KRAFT=yes
2727
--env KAFKA_CFG_NODE_ID=0
2828
--env KAFKA_CFG_PROCESS_ROLES=controller,broker
29-
--env KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
29+
--env KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
3030
--env KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093
3131
--env KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
3232
--env KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
33-
--env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
33+
--env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
3434
--health-cmd="kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list || exit 1"
3535
--health-interval=10s
3636
--health-timeout=5s
@@ -86,7 +86,7 @@ jobs:
8686
RABBIT_HOST: amqp://127.0.0.1:5672
8787
SQS_DSN: sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://127.0.0.1:4566&version=latest
8888
REDIS_DSN: redis://127.0.0.1:6379
89-
KAFKA_DSN: localhost:9092
89+
KAFKA_DSN: 127.0.0.1:9092
9090
steps:
9191
- name: PHP ${{ matrix.php-versions }} - ${{ matrix.stability }}
9292
uses: shivammathur/setup-php@v2

packages/Dbal/tests/Integration/ORMTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ private function bootstrapEcotone(array $namespaces = ['Test\Ecotone\Dbal\Fixtur
351351
protected function connectionForTenantB(): EcotoneManagerRegistryConnectionFactory
352352
{
353353
$connectionTenantB = ManagerRegistryEmulator::fromDsnAndConfig(
354-
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@localhost:3306/ecotone',
354+
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@127.0.0.1:3306/ecotone',
355355
[__DIR__ . '/../Fixture/ORM/Person']
356356
);
357357
return $connectionTenantB;
@@ -360,7 +360,7 @@ protected function connectionForTenantB(): EcotoneManagerRegistryConnectionFacto
360360
protected function connectionForTenantA(): EcotoneManagerRegistryConnectionFactory
361361
{
362362
$connectionTenantA = ManagerRegistryEmulator::fromDsnAndConfig(
363-
getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@localhost:5432/ecotone',
363+
getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@127.0.0.1:5432/ecotone',
364364
[__DIR__ . '/../Fixture/ORM/Person']
365365
);
366366
return $connectionTenantA;

packages/PdoEventSourcing/src/ProjectionRunningConfiguration.php

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
namespace Ecotone\EventSourcing;
44

55
use Assert\Assertion;
6+
use Ecotone\EventSourcing\Prooph\GapDetection;
7+
use Ecotone\EventSourcing\Prooph\Metadata\FieldType;
8+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
69
use Ecotone\Messaging\Config\Container\DefinedObject;
710
use Ecotone\Messaging\Config\Container\Definition;
811
use InvalidArgumentException;
9-
use Prooph\EventStore\Metadata\MetadataMatcher;
12+
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
1013

1114
/**
1215
* licence Apache-2.0
@@ -43,14 +46,17 @@ class ProjectionRunningConfiguration implements DefinedObject
4346
public const OPTION_IS_TESTING_SETUP = 'isTestingSetup';
4447
public const DEFAULT_IS_TESTING_SETUP = false;
4548

49+
public const OPTION_GAP_DETECTION = PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION;
50+
4651
private array $options;
4752
private bool $isTestingSetup = false;
4853

4954
public function __construct(
5055
private string $projectionName,
5156
private string $runningType,
57+
array $options = [],
5258
) {
53-
$this->options = [
59+
$this->options = ($options !== []) ? $options : [
5460
self::OPTION_INITIALIZE_ON_STARTUP => self::DEFAULT_INITIALIZE_ON_STARTUP,
5561
self::OPTION_AMOUNT_OF_CACHED_STREAM_NAMES => self::DEFAULT_AMOUNT_OF_CACHED_STREAM_NAMES,
5662
self::OPTION_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND => self::DEFAULT_WAIT_BEFORE_CALLING_ES_WHEN_NO_EVENTS_FOUND,
@@ -60,16 +66,26 @@ public function __construct(
6066
self::OPTION_IS_TESTING_SETUP => self::DEFAULT_IS_TESTING_SETUP,
6167
self::OPTION_LOAD_COUNT => self::DEFAULT_LOAD_COUNT,
6268
self::OPTION_METADATA_MATCHER => self::DEFAULT_METADATA_MATCHER,
69+
self::OPTION_GAP_DETECTION => new GapDetection(retryConfig: [0, 5, 50, 500, 800], detectionWindow: new GapDetection\DateInterval('PT10S')),
6370
];
6471
}
6572

6673
public function getDefinition(): Definition
6774
{
75+
$options = $this->options;
76+
foreach ($this->options as $key => $value) {
77+
$options[$key] = match ($key) {
78+
self::OPTION_METADATA_MATCHER, self::OPTION_GAP_DETECTION => $value?->getDefinition(),
79+
default => $value,
80+
};
81+
}
82+
6883
return new Definition(
6984
self::class,
7085
[
7186
$this->projectionName,
7287
$this->runningType,
88+
$options,
7389
]
7490
);
7591
}
@@ -110,6 +126,10 @@ public function withOption(string $key, mixed $value): static
110126
Assertion::isInstanceOf($value, MetadataMatcher::class);
111127
}
112128

129+
if ($key === self::OPTION_GAP_DETECTION && $value !== null) {
130+
Assertion::isInstanceOf($value, GapDetection::class);
131+
}
132+
113133
$self = clone $this;
114134
$self->options[$key] = $value;
115135

packages/PdoEventSourcing/src/ProjectionSetupConfiguration.php

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
namespace Ecotone\EventSourcing;
44

5-
use Assert\Assertion;
65
use Ecotone\Messaging\Config\Container\DefinedObject;
76
use Ecotone\Messaging\Config\Container\Definition;
87
use Ecotone\Messaging\NullableMessageChannel;
98
use Ecotone\Messaging\Support\Assert;
10-
use Prooph\EventStore\Pdo\Projection\GapDetection;
11-
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
129

1310
/**
1411
* licence Apache-2.0
@@ -34,22 +31,11 @@ public function __construct(
3431

3532
public static function create(string $projectionName, ProjectionLifeCycleConfiguration $projectionLifeCycleConfiguration, string $eventStoreReferenceName, ProjectionStreamSource $projectionStreamSource, ?string $asynchronousChannelName): static
3633
{
37-
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName, projectionOptions: [PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION => new GapDetection(
38-
retryConfig: [0, 5, 50, 500, 800],
39-
detectionWindow: null
40-
)]);
34+
return new self($projectionName, $projectionLifeCycleConfiguration, $eventStoreReferenceName, $projectionStreamSource, $asynchronousChannelName);
4135
}
4236

4337
public function getDefinition(): Definition
4438
{
45-
$projectionOptions = [];
46-
foreach ($this->projectionOptions as $key => $value) {
47-
if ($value instanceof GapDetection) {
48-
$projectionOptions[$key] = new Definition(GapDetection::class);
49-
} else {
50-
$projectionOptions[$key] = $value;
51-
}
52-
}
5339
return new Definition(
5440
ProjectionSetupConfiguration::class,
5541
[
@@ -61,7 +47,7 @@ public function getDefinition(): Definition
6147
$this->isPolling,
6248
$this->projectionEventHandlerConfigurations,
6349
$this->keepStateBetweenEvents,
64-
$projectionOptions,
50+
$this->projectionOptions,
6551
]
6652
);
6753
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph;
6+
7+
use Ecotone\EventSourcing\Prooph\GapDetection\DateInterval;
8+
use Ecotone\Messaging\Config\Container\DefinedObject;
9+
use Ecotone\Messaging\Config\Container\Definition;
10+
use Prooph\EventStore\Pdo\Projection\GapDetection as ProophGapDetection;
11+
12+
/**
13+
* licence Apache-2.0
14+
*/
15+
final class GapDetection implements DefinedObject
16+
{
17+
public function __construct(private ?array $retryConfig = null, private ?DateInterval $detectionWindow = null)
18+
{
19+
}
20+
21+
public function getDefinition(): Definition
22+
{
23+
return new Definition(self::class, [$this->retryConfig, $this->detectionWindow?->getDefinition() ]);
24+
}
25+
26+
public function build(): ProophGapDetection
27+
{
28+
return new ProophGapDetection($this->retryConfig, $this->detectionWindow?->build());
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph\GapDetection;
6+
7+
use Ecotone\Messaging\Config\Container\DefinedObject;
8+
use Ecotone\Messaging\Config\Container\Definition;
9+
10+
/**
11+
* licence Apache-2.0
12+
*/
13+
final class DateInterval implements DefinedObject
14+
{
15+
public function __construct(private string $duration)
16+
{
17+
}
18+
19+
public function getDefinition(): Definition
20+
{
21+
return new Definition(self::class, [$this->duration]);
22+
}
23+
24+
public function build(): \DateInterval
25+
{
26+
return new \DateInterval($this->duration);
27+
}
28+
}

packages/PdoEventSourcing/src/Prooph/LazyProophProjectionManager.php

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
use Ecotone\EventSourcing\ProjectionRunningConfiguration;
88
use Ecotone\EventSourcing\ProjectionSetupConfiguration;
99
use Ecotone\EventSourcing\ProjectionStreamSource;
10+
use Ecotone\EventSourcing\Prooph\Metadata\MetadataMatcher;
1011
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
1112
use Ecotone\Messaging\Handler\ReferenceSearchService;
1213
use Ecotone\Modelling\Event;
1314
use Prooph\Common\Messaging\Message;
1415
use Prooph\EventStore\Exception\ProjectionNotFound;
1516
use Prooph\EventStore\Exception\RuntimeException;
16-
use Prooph\EventStore\Metadata\MetadataMatcher;
1717
use Prooph\EventStore\Pdo\Projection\MariaDbProjectionManager;
1818
use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
1919
use Prooph\EventStore\Pdo\Projection\PostgresProjectionManager;
@@ -78,25 +78,29 @@ public function createQuery(): Query
7878

7979
public function createProjection(string $name, array $options = []): Projector
8080
{
81+
$options = $this->resolveGapDetection($options);
82+
8183
$projection = $this->getProjectionManager()->createProjection($name, $options);
8284

8385
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
8486

8587
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareProjector) {
86-
$projection = $projection->withMetadataMatcher($metadataMatcher);
88+
$projection = $projection->withMetadataMatcher($metadataMatcher->build());
8789
}
8890

8991
return $projection;
9092
}
9193

9294
public function createReadModelProjection(string $name, ReadModel $readModel, array $options = []): ReadModelProjector
9395
{
96+
$options = $this->resolveGapDetection($options);
97+
9498
$projection = $this->getProjectionManager()->createReadModelProjection($name, $readModel, $options);
9599

96100
$metadataMatcher = $options[ProjectionRunningConfiguration::OPTION_METADATA_MATCHER] ?? null;
97101

98102
if ($metadataMatcher instanceof MetadataMatcher && $projection instanceof MetadataAwareReadModelProjector) {
99-
$projection = $projection->withMetadataMatcher($metadataMatcher);
103+
$projection = $projection->withMetadataMatcher($metadataMatcher->build());
100104
}
101105

102106
return $projection;
@@ -249,4 +253,14 @@ public static function getProjectionStreamName(string $name): string
249253
{
250254
return 'projection_' . $name;
251255
}
256+
257+
private function resolveGapDetection(array $options): array
258+
{
259+
$gapDetection = $options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] ?? null;
260+
if ($gapDetection instanceof GapDetection) {
261+
$options[ProjectionRunningConfiguration::OPTION_GAP_DETECTION] = $gapDetection->build();
262+
}
263+
264+
return $options;
265+
}
252266
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Prooph\Metadata;
6+
7+
/**
8+
* licence Apache-2.0
9+
*/
10+
enum FieldType: int
11+
{
12+
case METADATA = 0;
13+
14+
case MESSAGE_PROPERTY = 1;
15+
}

0 commit comments

Comments
 (0)