Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/RunOpenCode/Component/Dataset/src/Operator/CompressJoin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Operator;

use RunOpenCode\Component\Dataset\AbstractStream;
use RunOpenCode\Component\Dataset\Contract\OperatorInterface;

/**
* Compress join operator.
*
* Compress join operator iterates over given collection and compresses items based on the predicate and join functions.
*
* Example usage:
*
* ```php
* use RunOpenCode\Component\Dataset\Operator\CompressJoin;
*
* $compressJoin = new CompressJoin(
* collection: new Dataset([1 => [1, 2], 2 => [1, 3], 3 => [1, 4], 4 => [2, 1], 5 => [2, 2]]),
* predicate: static fn(array $values): bool => $values[0][0] === $values[1][0],
* join: static fn(array $buffer): iterable => [
* $buffer[0][1][0] => array_map(static fn(array $record): int => $record[1][1], $buffer)
* ],
* );
* // $compressJoin will yield [1 => [2, 3, 4], 2 => [1, 2]]
* ```
*
* @template TKey
* @template TValue
* @template TModifiedKey
* @template TModifiedValue
*
* @phpstan-type PredicateValues = array{TValue, TValue}
* @phpstan-type PredicateKeys = array{TKey, TKey}
* @phpstan-type Record = array{TKey, TValue}
* @phpstan-type Buffer = list<Record>
* @phpstan-type PredicateCallable = callable(PredicateValues, PredicateKeys=, Buffer=): bool
* @phpstan-type JoinCallable = callable(Buffer): iterable<TModifiedKey, TModifiedValue>
*
* @extends AbstractStream<TModifiedKey, TModifiedValue>
* @implements OperatorInterface<TModifiedKey, TModifiedValue>
*/
final class CompressJoin extends AbstractStream implements OperatorInterface
{
private readonly \Closure $predicate;

private readonly \Closure $join;

/**
* @param iterable<TKey, TValue> $collection Collection to iterate over.
* @param PredicateCallable $predicate Callable predicate function to evaluate.
* @param JoinCallable $join Callable join function to produce joined records.
*/
public function __construct(
private readonly iterable $collection,
callable $predicate,
callable $join,
) {
parent::__construct($collection);
$this->predicate = $predicate(...);
$this->join = $join(...);
}

/**
* {@inheritdoc}
*/
protected function iterate(): \Traversable
{
/** @var Buffer $buffer */
$buffer = [];
/** @var Record|null $previous */
$previous = null;

foreach ($this->collection as $key => $value) {
if (0 === \count($buffer)) {
$previous = [$key, $value];
$buffer[] = $previous;
continue;
}

\assert(null !== $previous);

if (($this->predicate)([$previous[1], $value], [$previous[0], $key], $buffer)) {
$previous = [$key, $value];
$buffer[] = $previous;
continue;
}

yield from ($this->join)($buffer);
$previous = [$key, $value];
$buffer = [$previous];
}

if (0 !== \count($buffer)) {
yield from ($this->join)($buffer);
}
}
}
21 changes: 20 additions & 1 deletion src/RunOpenCode/Component/Dataset/src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use function RunOpenCode\Component\Dataset\aggregate as dataset_aggregate;
use function RunOpenCode\Component\Dataset\batch as dataset_batch;
use function RunOpenCode\Component\Dataset\collect as dataset_collect;
use function RunOpenCode\Component\Dataset\compress_join as dataset_compress_join;
use function RunOpenCode\Component\Dataset\distinct as dataset_distinct;
use function RunOpenCode\Component\Dataset\filter as dataset_filter;
use function RunOpenCode\Component\Dataset\flatten as dataset_flatten;
Expand Down Expand Up @@ -80,6 +81,24 @@ public function batch(callable $onBatch, int $size = 1000): self
return dataset_batch($this, $onBatch, $size);
}

/**
* Applies compress join operator on current stream.
*
* @template TModifiedKey
* @template TModifiedValue
*
* @param callable(array{TValue, TValue}, array{TKey, TKey}=, list<array{TKey, TValue}>=): bool $predicate Callable predicate function to evaluate.
* @param callable(list<array{TKey, TValue}>): iterable<TModifiedKey, TModifiedValue> $join Callable join function to produce joined records.
*
* @return self<TModifiedKey, TModifiedValue>
*
* @see Operator\CompressJoin
*/
public function compressJoin(callable $predicate, callable $join): self
{
return dataset_compress_join($this, $predicate, $join);
}

/**
* Applies distinct operator on current stream.
*
Expand Down Expand Up @@ -346,4 +365,4 @@ protected function iterate(): \Traversable
{
yield from $this->collection;
}
}
}
25 changes: 24 additions & 1 deletion src/RunOpenCode/Component/Dataset/src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ function batch(iterable $collection, callable $onBatch, int $size = 1000): Strea
);
}

/**
* Create compress join operator.
*
* @template TKey
* @template TValue
* @template TModifiedKey
* @template TModifiedValue
*
* @param iterable<TKey, TValue> $collection Collection to iterate over.
* @param callable(array{TValue, TValue}, array{TKey, TKey}=, list<array{TKey, TValue}>=): bool $predicate Callable predicate function to evaluate.
* @param callable(list<array{TKey, TValue}>): iterable<TModifiedKey, TModifiedValue> $join Callable join function to produce joined records.
*
* @return Stream<TModifiedKey, TModifiedValue>
*
* @see Operator\CompressJoin
*/
function compress_join(iterable $collection, callable $predicate, callable $join): Stream
{
return new Stream(
new Operator\CompressJoin($collection, $predicate, $join)
);
}

/**
* Create distinct operator.
*
Expand Down Expand Up @@ -453,4 +476,4 @@ function reduce(iterable $collection, callable|string $reducer, mixed ...$args):
}

return $reducer->value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

declare(strict_types=1);

namespace RunOpenCode\Component\Dataset\Tests\Operator;

use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\TestCase;
use RunOpenCode\Component\Dataset\Operator\CompressJoin;

final class CompressJoinTest extends TestCase
{
#[Test]
public function compress_join(): void
{
$operator = new CompressJoin(
[
1 => [10, 2],
2 => [10, 3],
3 => [10, 4],
4 => [20, 1],
5 => [20, 2],
6 => [30, 5],
],
static fn(array $values): bool => $values[0][0] === $values[1][0],
static fn(array $buffer): iterable => [
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
],
);

$this->assertSame([
10 => [2, 3, 4],
20 => [1, 2],
30 => [5],
], \iterator_to_array($operator));
}

#[Test]
public function compress_join_with_single_element(): void
{
$operator = new CompressJoin(
[
1 => [10, 2],
],
static fn(array $values): bool => $values[0][0] === $values[1][0], // @phpstan-ignore-line
static fn(array $buffer): iterable => [
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
],
);

$this->assertSame([
10 => [2],
], \iterator_to_array($operator));
}

#[Test]
public function compress_join_with_empty(): void
{
$operator = new CompressJoin(
[],
static fn(array $values): bool => $values[0][0] === $values[1][0],
static fn(array $buffer): iterable => [
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), //@phpstan-ignore-line
],
);

$this->assertSame([], \iterator_to_array($operator));
}
}
31 changes: 30 additions & 1 deletion src/RunOpenCode/Component/Dataset/tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,35 @@ public function batch(): void
], $data);
}

#[Test]
public function compressJoin(): void
{
$dataset = [
1 => [10, 2],
2 => [10, 3],
3 => [10, 4],
4 => [20, 1],
5 => [20, 2],
6 => [30, 5],
];

$data = new Stream($dataset)
->compressJoin(
static fn(array $values): bool => $values[0][0] === $values[1][0],
static fn(array $buffer): iterable => [
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
],
)
->collect(ArrayCollector::class)
->value;

$this->assertSame([
10 => [2, 3, 4],
20 => [1, 2],
30 => [5],
], $data);
}

#[Test]
public function distinct(): void
{
Expand Down Expand Up @@ -371,4 +400,4 @@ public function throws_exception_when_iterating_closed_stream(): void
iterable_to_array($stream);
\iterator_to_array($stream);
}
}
}