Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Enable ReactProcessor to process regular iterables #52

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Table of Contents
- [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders)
- [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout)
- [Instantiators](doc/advanced_usage.md#instantiators)
- [Using React Streams (ReactPHP support)](doc/advanced_usage.md#using-react-streams-experimental)
- [Using ReactPHP](doc/advanced_usage.md#using-reactphp-experimental)
- [Recipes](doc/recipes.md)
- [Contributing](#contribute)
- [License](#license)
Expand Down
15 changes: 13 additions & 2 deletions doc/advanced_usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,21 @@ $report = withRecipe(new LoggerRecipe($logger))
->process(['foo', 'bar']);
```

Using React streams (experimental)
Using ReactPHP (experimental)
----------------------------------

You can plug your ETL dataflows to any [React Stream](https://github.com/reactphp/stream).
By using the `ReactStreamProcessor` recipe, you can use ReactPHP as the processor of your data.

> [!IMPORTANT]
> `react/stream` and `react/event-loop` are required for this to work.

With this processor, you can extract data from an `iterable` or a [React Stream](https://github.com/reactphp/stream):
each item will be iterated within a [Loop tick](https://github.com/reactphp/event-loop#futuretick) instead of a blocking `while` loop.

This allows you, for example, to:
- [Periodically](https://github.com/reactphp/event-loop#addperiodictimer) perform some stuff (with `Loop::addPeriodicTimer()`)
- Handle [POSIX signals](https://github.com/reactphp/event-loop#addsignal) (with `Loop::addSignal()`)
- Use [React streams](https://github.com/reactphp/stream), like a TCP / HTTP server, a Redis / MySQL connection, or a file stream, for an event-oriented approach.

Example with a TCP server:

Expand Down
16 changes: 14 additions & 2 deletions src/Extractor/ReactStreamExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,29 @@
namespace BenTools\ETL\Extractor;

use BenTools\ETL\EtlState;
use BenTools\ETL\Iterator\IteratorStream;
use React\Stream\ReadableStreamInterface;

final readonly class ReactStreamExtractor implements ExtractorInterface
{
/**
* @param iterable<mixed>|ReadableStreamInterface|null $stream
*/
public function __construct(
public ?ReadableStreamInterface $stream = null,
public ReadableStreamInterface|iterable|null $stream = null,
) {
}

public function extract(EtlState $state): ReadableStreamInterface
{
return $state->source ?? $this->stream;
return $this->ensureStream($state->source ?? $this->stream);
}

/**
* @param iterable<mixed>|ReadableStreamInterface $items
*/
private function ensureStream(iterable|ReadableStreamInterface $items): ReadableStreamInterface
{
return $items instanceof ReadableStreamInterface ? $items : new IteratorStream($items);
}
}
56 changes: 56 additions & 0 deletions src/Iterator/ConsumableIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Iterator;

use Iterator;
use OutOfRangeException;

use function BenTools\ETL\iterable_to_iterator;

/**
* @internal
*
* @template T
*/
final class ConsumableIterator
{
private readonly Iterator $iterator;
private bool $started = false;
private bool $ended = false;

/**
* @param iterable<T> $items
*/
public function __construct(iterable $items)
{
$this->iterator = iterable_to_iterator($items);
}

public function consume(): mixed
{
if ($this->ended) {
throw new OutOfRangeException('This iterator has no more items.'); // @codeCoverageIgnore
}

if (!$this->started) {
$this->iterator->rewind();
$this->started = true;
}

$value = $this->iterator->current();
$this->iterator->next();

if (!$this->iterator->valid()) {
$this->ended = true;
}

return $value;
}

public function isComplete(): bool
{
return $this->ended;
}
}
82 changes: 82 additions & 0 deletions src/Iterator/IteratorStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Iterator;

use Evenement\EventEmitterTrait;
use React\EventLoop\Loop;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* @internal
*
* @template T
*/
final class IteratorStream implements ReadableStreamInterface
{
use EventEmitterTrait;

/**
* @var ConsumableIterator<T>
*/
public readonly ConsumableIterator $iterator;
public bool $paused = false;

/**
* @param iterable<T> $items
*/
public function __construct(iterable $items)
{
$this->iterator = new ConsumableIterator($items);
$this->resume();
}

public function isReadable(): bool
{
return !$this->iterator->isComplete();
}

public function pause(): void
{
$this->paused = true;
}

public function resume(): void
{
$this->paused = false;
$this->process();
}

private function process(): void
{
if (!$this->iterator->isComplete()) {
Loop::futureTick(function () {
if (!$this->paused) {
$this->emit('data', [$this->iterator->consume()]);
}
$this->process();
});
} else {
$this->emit('end');
$this->close();
}
}

/**
* @param array<string, mixed> $options
*/
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
Util::pipe($this, $dest, $options);

return $dest;
}

public function close(): void
{
$this->emit('close');
}
}
15 changes: 15 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
use Iterator;

use function array_fill_keys;
use function array_intersect_key;
Expand All @@ -40,6 +41,20 @@ function array_fill_from(array $keys, array $values, array ...$extraValues): arr
return array_intersect_key($values, $defaults);
}

/**
* @internal
*
* @template T
*
* @param iterable<T> $items
*
* @return Iterator<T>
*/
function iterable_to_iterator(iterable $items): Iterator
{
return $items instanceof Iterator ? $items : (fn () => yield from $items)();
}

function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|callable ...$extractors): EtlExecutor
{
return (new EtlExecutor())->extractFrom(...func_get_args());
Expand Down
19 changes: 19 additions & 0 deletions tests/Behavior/ReactStreamProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@
expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']);
});

it('allows iterables, which will be converted to readable streams', function () {
$fruits = ['banana', 'apple', 'strawberry', 'raspberry', 'peach'];
$executor = useReact()
->onExtract(function (ExtractEvent $event) {
match ($event->item) {
'apple' => $event->state->skip(),
'peach' => $event->state->stop(),
default => null,
};
})
;

// When
$state = $executor->process($fruits);

// Then
expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']);
});

it('throws ExtractExceptions', function () {
// Given
$stream = new ReadableResourceStream(fopen('php://temp', 'rb'));
Expand Down
38 changes: 38 additions & 0 deletions tests/Stubs/WritableStreamStub.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Stubs;

use Evenement\EventEmitterTrait;
use React\Stream\WritableStreamInterface;

final class WritableStreamStub implements WritableStreamInterface
{
use EventEmitterTrait;

/**
* @var list<mixed>
*/
public array $data = [];

public function isWritable(): bool
{
return true;
}

public function write($data): bool
{
$this->data[] = $data;

return true;
}

public function end($data = null): void
{
}

public function close(): void
{
}
}
58 changes: 58 additions & 0 deletions tests/Unit/Iterator/IteratorStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Unit\Iterator;

use BenTools\ETL\Iterator\IteratorStream;
use BenTools\ETL\Tests\Stubs\WritableStreamStub;
use React\EventLoop\Factory;
use React\EventLoop\Loop;

use function beforeEach;
use function expect;

beforeEach(fn () => Loop::set(Factory::create()));

it('is readable during iteration', function () {
$items = ['foo', 'bar'];
$stream = new IteratorStream($items);

for ($i = 0; $i < 2; ++$i) {
expect($stream->isReadable())->toBeTrue();
$stream->iterator->consume();
}

expect($stream->isReadable())->toBeFalse();
Loop::stop();
});

it('can be paused and resumed', function () {
$stream = new IteratorStream([]);
expect($stream->paused)->toBeFalse();

// When
$stream->pause();

// Then
expect($stream->paused)->toBeTrue();

// When
$stream->resume();

// Then
expect($stream->paused)->toBeFalse();
});

it('can pipe data', function () {
$items = ['foo', 'bar', 'baz'];
$stream = new IteratorStream($items);
$dest = new WritableStreamStub();
$stream->pipe($dest);

// When
Loop::run();

// Then
expect($dest->data)->toBe($items);
});
Loading