Skip to content

Commit

Permalink
Merge branch 'main' into task/concurrency-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Jan 15, 2024
2 parents 1911e42 + 46d74af commit 8345fe4
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 370 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
Database Adapter implementation for the [neos/eventstore](https://github.com/neos/eventstore) package.

> **Note**
> Currently this package supports MySQL (including MariaDB) and PostgreSQL
> The `DoctrineEventStore` can be used with SQLite, too. But that platform is not yet supported for the `DoctrineCheckpointStorage`
> Currently this package supports MySQL (including MariaDB), PostgreSQL and SQLite.
## Usage

Expand Down
10 changes: 4 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
],
"require": {
"php": "^8.1",
"neos/eventstore": "1.0.0-beta1",
"neos/eventstore": "@dev",
"doctrine/dbal": "^2",
"webmozart/assert": "^1.10",
"psr/clock": "^1"
Expand All @@ -38,17 +38,15 @@
"test:phpstan": "phpstan",
"test:cs": "phpcs --colors src",
"test:cs:fix": "phpcbf --colors src",
"test:unit": "phpunit tests/Unit",
"test:integration": "phpunit tests/Integration --exclude-group=parallel",
"test:consistency": [
"paratest tests/Integration --group=parallel --functional --processes 20",
"Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\ConcurrencyTest::validateEvents",
"Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\ConcurrencyTest::cleanup"
"Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\DoctrineEventStoreTest::prepare",
"paratest tests/Integration --group=parallel --functional --processes 10",
"Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\DoctrineEventStoreTest::consistency_validateEvents"
],
"test": [
"@test:phpstan",
"@test:cs",
"@test:unit",
"@test:integration",
"@test:consistency"
]
Expand Down
121 changes: 0 additions & 121 deletions src/DoctrineCheckpointStorage.php

This file was deleted.

53 changes: 35 additions & 18 deletions src/DoctrineEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver\Exception as DriverException;
use Doctrine\DBAL\Exception as DbalException;
use Doctrine\DBAL\Exception\DeadlockException;
use Doctrine\DBAL\Exception\LockWaitTimeoutException;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\Comparator;
Expand All @@ -21,19 +23,16 @@
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStore\CommitResult;
use Neos\EventStore\Model\EventStore\SetupResult;
use Neos\EventStore\Model\EventStore\Status;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
use Neos\EventStore\Model\EventStream\ExpectedVersion;
use Neos\EventStore\Model\EventStream\MaybeVersion;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Neos\EventStore\Model\EventStream\VirtualStreamType;
use Neos\EventStore\ProvidesSetupInterface;
use Neos\EventStore\ProvidesStatusInterface;
use Psr\Clock\ClockInterface;

final class DoctrineEventStore implements EventStoreInterface, ProvidesSetupInterface, ProvidesStatusInterface
final class DoctrineEventStore implements EventStoreInterface
{
private readonly ClockInterface $clock;

Expand Down Expand Up @@ -111,6 +110,9 @@ public function commit(StreamName $streamName, Events $events, ExpectedVersion $
$retryWaitInterval *= 2;
$this->connection->rollBack();
continue;
} catch (DeadlockException | LockWaitTimeoutException $exception) {
$this->connection->rollBack();
throw new ConcurrencyException($exception->getMessage(), 1705330559, $exception);
} catch (DbalException | ConcurrencyException | \JsonException $exception) {
$this->connection->rollBack();
throw $exception;
Expand All @@ -127,27 +129,42 @@ public function deleteStream(StreamName $streamName): void

public function status(): Status
{
return Status::error('not implemented');
try {
$this->connection->connect();
} catch (DbalException $e) {
return Status::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
$requiredSqlStatements = $this->determineRequiredSqlStatements();
if ($requiredSqlStatements !== []) {
return Status::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return Status::ok();
}

public function setup(): SetupResult
public function setup(): void
{
$schemaManager = $this->connection->getSchemaManager();
assert($schemaManager !== null);
$fromSchema = $schemaManager->createSchema();
$schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema());

$statements = $schemaDiff->toSaveSql($this->connection->getDatabasePlatform());
if ($statements === []) {
return SetupResult::success('Table schema is up to date, no migration required');
}

foreach ($statements as $statement) {
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->connection->executeStatement($statement);
}
return SetupResult::success('Event store table created/updated successfully');
}

/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$schemaManager = $this->connection->getSchemaManager();
assert($schemaManager !== null);
$platform = $this->connection->getDatabasePlatform();
assert($platform !== null);
if (!$schemaManager->tablesExist($this->eventTableName)) {
return $platform->getCreateTableSQL($this->createEventStoreSchema()->getTable($this->eventTableName));
}
$tableSchema = $schemaManager->listTableDetails($this->eventTableName);
$fromSchema = new Schema([$tableSchema], [], $schemaManager->createSchemaConfig());
$schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema());
return $schemaDiff->toSaveSql($platform);
}

// ----------------------------------

Expand Down
67 changes: 0 additions & 67 deletions tests/Integration/ConcurrencyTest.php

This file was deleted.

53 changes: 0 additions & 53 deletions tests/Integration/DoctrineCheckpointStorageTest.php

This file was deleted.

Loading

0 comments on commit 8345fe4

Please sign in to comment.