From 26b92cb6474086733a863224be1badcd8a9ff4c9 Mon Sep 17 00:00:00 2001 From: Stefan Veljancic Date: Fri, 12 Dec 2025 12:30:27 +0100 Subject: [PATCH] Implemented compress join operator --- .../Dataset/src/Operator/CompressJoin.php | 100 ++++++++++++++++++ .../Component/Dataset/src/Stream.php | 21 +++- .../Component/Dataset/src/functions.php | 25 ++++- .../tests/Operator/CompressJoinTest.php | 69 ++++++++++++ .../Component/Dataset/tests/StreamTest.php | 31 +++++- 5 files changed, 243 insertions(+), 3 deletions(-) create mode 100644 src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php create mode 100644 src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php diff --git a/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php b/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php new file mode 100644 index 0000000..02ceef2 --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php @@ -0,0 +1,100 @@ + [1, 2], 2 => [1, 3], 3 => [1, 4], 4 => [2, 1], 5 => [2, 2]]), + * predicate: static fn(array $values): bool => $values[0][0] === $values[1][0], + * join: static fn(array $buffer): iterable => [ + * $buffer[0][1][0] => array_map(static fn(array $record): int => $record[1][1], $buffer) + * ], + * ); + * // $compressJoin will yield [1 => [2, 3, 4], 2 => [1, 2]] + * ``` + * + * @template TKey + * @template TValue + * @template TModifiedKey + * @template TModifiedValue + * + * @phpstan-type PredicateValues = array{TValue, TValue} + * @phpstan-type PredicateKeys = array{TKey, TKey} + * @phpstan-type Record = array{TKey, TValue} + * @phpstan-type Buffer = list + * @phpstan-type PredicateCallable = callable(PredicateValues, PredicateKeys=, Buffer=): bool + * @phpstan-type JoinCallable = callable(Buffer): iterable + * + * @extends AbstractStream + * @implements OperatorInterface + */ +final class CompressJoin extends AbstractStream implements OperatorInterface +{ + private readonly \Closure $predicate; + + private readonly \Closure $join; + + /** + * @param iterable $collection Collection to iterate over. + * @param PredicateCallable $predicate Callable predicate function to evaluate. + * @param JoinCallable $join Callable join function to produce joined records. + */ + public function __construct( + private readonly iterable $collection, + callable $predicate, + callable $join, + ) { + parent::__construct($collection); + $this->predicate = $predicate(...); + $this->join = $join(...); + } + + /** + * {@inheritdoc} + */ + protected function iterate(): \Traversable + { + /** @var Buffer $buffer */ + $buffer = []; + /** @var Record|null $previous */ + $previous = null; + + foreach ($this->collection as $key => $value) { + if (0 === \count($buffer)) { + $previous = [$key, $value]; + $buffer[] = $previous; + continue; + } + + \assert(null !== $previous); + + if (($this->predicate)([$previous[1], $value], [$previous[0], $key], $buffer)) { + $previous = [$key, $value]; + $buffer[] = $previous; + continue; + } + + yield from ($this->join)($buffer); + $previous = [$key, $value]; + $buffer = [$previous]; + } + + if (0 !== \count($buffer)) { + yield from ($this->join)($buffer); + } + } +} diff --git a/src/RunOpenCode/Component/Dataset/src/Stream.php b/src/RunOpenCode/Component/Dataset/src/Stream.php index 60f8347..aa726a2 100644 --- a/src/RunOpenCode/Component/Dataset/src/Stream.php +++ b/src/RunOpenCode/Component/Dataset/src/Stream.php @@ -10,6 +10,7 @@ use function RunOpenCode\Component\Dataset\aggregate as dataset_aggregate; use function RunOpenCode\Component\Dataset\batch as dataset_batch; use function RunOpenCode\Component\Dataset\collect as dataset_collect; +use function RunOpenCode\Component\Dataset\compress_join as dataset_compress_join; use function RunOpenCode\Component\Dataset\distinct as dataset_distinct; use function RunOpenCode\Component\Dataset\filter as dataset_filter; use function RunOpenCode\Component\Dataset\flatten as dataset_flatten; @@ -80,6 +81,24 @@ public function batch(callable $onBatch, int $size = 1000): self return dataset_batch($this, $onBatch, $size); } + /** + * Applies compress join operator on current stream. + * + * @template TModifiedKey + * @template TModifiedValue + * + * @param callable(array{TValue, TValue}, array{TKey, TKey}=, list=): bool $predicate Callable predicate function to evaluate. + * @param callable(list): iterable $join Callable join function to produce joined records. + * + * @return self + * + * @see Operator\CompressJoin + */ + public function compressJoin(callable $predicate, callable $join): self + { + return dataset_compress_join($this, $predicate, $join); + } + /** * Applies distinct operator on current stream. * @@ -346,4 +365,4 @@ protected function iterate(): \Traversable { yield from $this->collection; } -} +} \ No newline at end of file diff --git a/src/RunOpenCode/Component/Dataset/src/functions.php b/src/RunOpenCode/Component/Dataset/src/functions.php index 803b77f..8a94b2f 100644 --- a/src/RunOpenCode/Component/Dataset/src/functions.php +++ b/src/RunOpenCode/Component/Dataset/src/functions.php @@ -82,6 +82,29 @@ function batch(iterable $collection, callable $onBatch, int $size = 1000): Strea ); } +/** + * Create compress join operator. + * + * @template TKey + * @template TValue + * @template TModifiedKey + * @template TModifiedValue + * + * @param iterable $collection Collection to iterate over. + * @param callable(array{TValue, TValue}, array{TKey, TKey}=, list=): bool $predicate Callable predicate function to evaluate. + * @param callable(list): iterable $join Callable join function to produce joined records. + * + * @return Stream + * + * @see Operator\CompressJoin + */ +function compress_join(iterable $collection, callable $predicate, callable $join): Stream +{ + return new Stream( + new Operator\CompressJoin($collection, $predicate, $join) + ); +} + /** * Create distinct operator. * @@ -453,4 +476,4 @@ function reduce(iterable $collection, callable|string $reducer, mixed ...$args): } return $reducer->value; -} +} \ No newline at end of file diff --git a/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php b/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php new file mode 100644 index 0000000..3f013fd --- /dev/null +++ b/src/RunOpenCode/Component/Dataset/tests/Operator/CompressJoinTest.php @@ -0,0 +1,69 @@ + [10, 2], + 2 => [10, 3], + 3 => [10, 4], + 4 => [20, 1], + 5 => [20, 2], + 6 => [30, 5], + ], + static fn(array $values): bool => $values[0][0] === $values[1][0], + static fn(array $buffer): iterable => [ + $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), + ], + ); + + $this->assertSame([ + 10 => [2, 3, 4], + 20 => [1, 2], + 30 => [5], + ], \iterator_to_array($operator)); + } + + #[Test] + public function compress_join_with_single_element(): void + { + $operator = new CompressJoin( + [ + 1 => [10, 2], + ], + static fn(array $values): bool => $values[0][0] === $values[1][0], // @phpstan-ignore-line + static fn(array $buffer): iterable => [ + $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), + ], + ); + + $this->assertSame([ + 10 => [2], + ], \iterator_to_array($operator)); + } + + #[Test] + public function compress_join_with_empty(): void + { + $operator = new CompressJoin( + [], + static fn(array $values): bool => $values[0][0] === $values[1][0], + static fn(array $buffer): iterable => [ + $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), //@phpstan-ignore-line + ], + ); + + $this->assertSame([], \iterator_to_array($operator)); + } +} \ No newline at end of file diff --git a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php index 3190af4..8c9afb6 100644 --- a/src/RunOpenCode/Component/Dataset/tests/StreamTest.php +++ b/src/RunOpenCode/Component/Dataset/tests/StreamTest.php @@ -53,6 +53,35 @@ public function batch(): void ], $data); } + #[Test] + public function compressJoin(): void + { + $dataset = [ + 1 => [10, 2], + 2 => [10, 3], + 3 => [10, 4], + 4 => [20, 1], + 5 => [20, 2], + 6 => [30, 5], + ]; + + $data = new Stream($dataset) + ->compressJoin( + static fn(array $values): bool => $values[0][0] === $values[1][0], + static fn(array $buffer): iterable => [ + $buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), + ], + ) + ->collect(ArrayCollector::class) + ->value; + + $this->assertSame([ + 10 => [2, 3, 4], + 20 => [1, 2], + 30 => [5], + ], $data); + } + #[Test] public function distinct(): void { @@ -371,4 +400,4 @@ public function throws_exception_when_iterating_closed_stream(): void iterable_to_array($stream); \iterator_to_array($stream); } -} +} \ No newline at end of file