diff --git a/README.md b/README.md index e4cbf48..dfc05f2 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,7 @@ Database Adapter implementation for the [neos/eventstore](https://github.com/neos/eventstore) package. > **Note** -> Currently this package supports MySQL (including MariaDB) and PostgreSQL -> The `DoctrineEventStore` can be used with SQLite, too. But that platform is not yet supported for the `DoctrineCheckpointStorage` +> Currently this package supports MySQL (including MariaDB), PostgreSQL and SQLite. ## Usage diff --git a/composer.json b/composer.json index c1ced46..23b563e 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,7 @@ ], "require": { "php": "^8.1", - "neos/eventstore": "1.0.0-beta1", + "neos/eventstore": "@dev", "doctrine/dbal": "^2", "webmozart/assert": "^1.10", "psr/clock": "^1" @@ -38,17 +38,15 @@ "test:phpstan": "phpstan", "test:cs": "phpcs --colors src", "test:cs:fix": "phpcbf --colors src", - "test:unit": "phpunit tests/Unit", "test:integration": "phpunit tests/Integration --exclude-group=parallel", "test:consistency": [ - "paratest tests/Integration --group=parallel --functional --processes 20", - "Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\ConcurrencyTest::validateEvents", - "Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\ConcurrencyTest::cleanup" + "Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\DoctrineEventStoreTest::prepare", + "paratest tests/Integration --group=parallel --functional --processes 10", + "Neos\\EventStore\\DoctrineAdapter\\Tests\\Integration\\DoctrineEventStoreTest::consistency_validateEvents" ], "test": [ "@test:phpstan", "@test:cs", - "@test:unit", "@test:integration", "@test:consistency" ] diff --git a/src/DoctrineCheckpointStorage.php b/src/DoctrineCheckpointStorage.php deleted file mode 100644 index 673b81e..0000000 --- a/src/DoctrineCheckpointStorage.php +++ /dev/null @@ -1,121 +0,0 @@ -connection->getDatabasePlatform(); - if (!($platform instanceof MySqlPlatform || $platform instanceof PostgreSqlPlatform)) { - throw new \InvalidArgumentException(sprintf('The %s only supports the platforms %s and %s currently. Given: %s', $this::class, MySqlPlatform::class, PostgreSqlPlatform::class, get_debug_type($platform)), 1660556004); - } - $this->platform = $platform; - } - - public function acquireLock(): SequenceNumber - { - if ($this->connection->isTransactionActive()) { - throw new CheckpointException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because a transaction is active already', $this->subscriberId), 1652268416); - } - $this->connection->beginTransaction(); - try { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ' . $this->platform->getForUpdateSQL() . ' NOWAIT', [ - 'subscriberId' => $this->subscriberId - ]); - } catch (DBALException $exception) { - $this->connection->rollBack(); - if ($exception instanceof LockWaitTimeoutException || ($exception instanceof DBALDriverException && ($exception->getErrorCode() === 3572 || $exception->getErrorCode() === 7))) { - throw new CheckpointException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because it is acquired already', $this->subscriberId), 1652279016); - } - throw new \RuntimeException($exception->getMessage(), 1544207778, $exception); - } - if (!is_numeric($highestAppliedSequenceNumber)) { - $this->connection->rollBack(); - throw new CheckpointException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279139); - } - $this->lockedSequenceNumber = SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - return $this->lockedSequenceNumber; - } - - public function updateAndReleaseLock(SequenceNumber $sequenceNumber): void - { - if ($this->lockedSequenceNumber === null) { - throw new CheckpointException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because the lock has not been acquired successfully before', $this->subscriberId), 1660556344); - } - if (!$this->connection->isTransactionActive()) { - throw new CheckpointException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because no transaction is active', $this->subscriberId), 1652279314); - } - try { - if (!$this->lockedSequenceNumber->equals($sequenceNumber)) { - $this->connection->update($this->tableName, ['appliedsequencenumber' => $sequenceNumber->value], ['subscriberid' => $this->subscriberId]); - } - $this->connection->commit(); - } catch (DBALException $exception) { - $this->connection->rollBack(); - throw new CheckpointException(sprintf('Failed to update and commit highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279375, $exception); - } finally { - $this->lockedSequenceNumber = null; - } - } - - public function getHighestAppliedSequenceNumber(): SequenceNumber - { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ', [ - 'subscriberId' => $this->subscriberId - ]); - if (!is_numeric($highestAppliedSequenceNumber)) { - throw new CheckpointException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279427); - } - return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - } - - public function setup(): SetupResult - { - $schemaManager = $this->connection->getSchemaManager(); - if (!$schemaManager instanceof AbstractSchemaManager) { - throw new \RuntimeException('Failed to retrieve Schema Manager', 1652269057); - } - $schema = new Schema(); - $table = $schema->createTable($this->tableName); - $table->addColumn('subscriberid', Types::STRING, ['length' => 255]); - $table->addColumn('appliedsequencenumber', Types::INTEGER); - $table->setPrimaryKey(['subscriberid']); - - $schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema); - foreach ($schemaDiff->toSaveSql($this->platform) as $statement) { - $this->connection->executeStatement($statement); - } - try { - $this->connection->insert($this->tableName, ['subscriberid' => $this->subscriberId, 'appliedsequencenumber' => 0]); - } catch (UniqueConstraintViolationException $e) { - // table and row already exists, ignore - } - - return SetupResult::success(''); - } -} diff --git a/src/DoctrineEventStore.php b/src/DoctrineEventStore.php index 949ad91..76450e5 100644 --- a/src/DoctrineEventStore.php +++ b/src/DoctrineEventStore.php @@ -6,6 +6,8 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Driver\Exception as DriverException; use Doctrine\DBAL\Exception as DbalException; +use Doctrine\DBAL\Exception\DeadlockException; +use Doctrine\DBAL\Exception\LockWaitTimeoutException; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Comparator; @@ -21,7 +23,6 @@ use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\Events; use Neos\EventStore\Model\EventStore\CommitResult; -use Neos\EventStore\Model\EventStore\SetupResult; use Neos\EventStore\Model\EventStore\Status; use Neos\EventStore\Model\EventStream\EventStreamFilter; use Neos\EventStore\Model\EventStream\EventStreamInterface; @@ -29,11 +30,9 @@ use Neos\EventStore\Model\EventStream\MaybeVersion; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Neos\EventStore\Model\EventStream\VirtualStreamType; -use Neos\EventStore\ProvidesSetupInterface; -use Neos\EventStore\ProvidesStatusInterface; use Psr\Clock\ClockInterface; -final class DoctrineEventStore implements EventStoreInterface, ProvidesSetupInterface, ProvidesStatusInterface +final class DoctrineEventStore implements EventStoreInterface { private readonly ClockInterface $clock; @@ -111,6 +110,9 @@ public function commit(StreamName $streamName, Events $events, ExpectedVersion $ $retryWaitInterval *= 2; $this->connection->rollBack(); continue; + } catch (DeadlockException | LockWaitTimeoutException $exception) { + $this->connection->rollBack(); + throw new ConcurrencyException($exception->getMessage(), 1705330559, $exception); } catch (DbalException | ConcurrencyException | \JsonException $exception) { $this->connection->rollBack(); throw $exception; @@ -127,27 +129,42 @@ public function deleteStream(StreamName $streamName): void public function status(): Status { - return Status::error('not implemented'); + try { + $this->connection->connect(); + } catch (DbalException $e) { + return Status::error(sprintf('Failed to connect to database: %s', $e->getMessage())); + } + $requiredSqlStatements = $this->determineRequiredSqlStatements(); + if ($requiredSqlStatements !== []) { + return Status::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); + } + return Status::ok(); } - public function setup(): SetupResult + public function setup(): void { - $schemaManager = $this->connection->getSchemaManager(); - assert($schemaManager !== null); - $fromSchema = $schemaManager->createSchema(); - $schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema()); - - $statements = $schemaDiff->toSaveSql($this->connection->getDatabasePlatform()); - if ($statements === []) { - return SetupResult::success('Table schema is up to date, no migration required'); - } - - foreach ($statements as $statement) { + foreach ($this->determineRequiredSqlStatements() as $statement) { $this->connection->executeStatement($statement); } - return SetupResult::success('Event store table created/updated successfully'); } + /** + * @return array + */ + private function determineRequiredSqlStatements(): array + { + $schemaManager = $this->connection->getSchemaManager(); + assert($schemaManager !== null); + $platform = $this->connection->getDatabasePlatform(); + assert($platform !== null); + if (!$schemaManager->tablesExist($this->eventTableName)) { + return $platform->getCreateTableSQL($this->createEventStoreSchema()->getTable($this->eventTableName)); + } + $tableSchema = $schemaManager->listTableDetails($this->eventTableName); + $fromSchema = new Schema([$tableSchema], [], $schemaManager->createSchemaConfig()); + $schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema()); + return $schemaDiff->toSaveSql($platform); + } // ---------------------------------- diff --git a/tests/Integration/ConcurrencyTest.php b/tests/Integration/ConcurrencyTest.php deleted file mode 100644 index 1785eaf..0000000 --- a/tests/Integration/ConcurrencyTest.php +++ /dev/null @@ -1,67 +0,0 @@ - - */ - private static array $eventStoresById = []; - private static ?Connection $connection = null; - - public static function cleanup(): void - { - foreach (array_keys(self::$eventStoresById) as $eventStoreId) { - self::connection()->executeStatement('DROP TABLE ' . self::eventTableName($eventStoreId)); - } - } - - protected static function createEventStore(string $id): EventStoreInterface - { - if (!isset(self::$eventStoresById[$id])) { - $connection = self::connection(); - $eventTableName = self::eventTableName($id); - if ($connection->getDatabasePlatform() instanceof PostgreSQLPlatform) { - $connection->executeStatement('TRUNCATE TABLE ' . $eventTableName . ' RESTART IDENTITY'); - } elseif ($connection->getDatabasePlatform() instanceof SqlitePlatform) { - /** @noinspection SqlWithoutWhere */ - $connection->executeStatement('DELETE FROM ' . $eventTableName); - $connection->executeStatement('DELETE FROM sqlite_sequence WHERE name =\'' . $eventTableName . '\''); - } else { - $connection->executeStatement('TRUNCATE TABLE ' . $eventTableName); - } - echo PHP_EOL . 'Prepared tables for ' . $connection->getDatabasePlatform()::class . PHP_EOL; - self::$eventStoresById[$id] = new DoctrineEventStore($connection, $eventTableName); - } - return self::$eventStoresById[$id]; - } - - private static function connection(): Connection - { - if (self::$connection === null) { - $dsn = getenv('DB_DSN'); - if (!is_string($dsn)) { - $dsn = 'sqlite:///events_test.sqlite'; - } - self::$connection = DriverManager::getConnection(['url' => $dsn]); - } - return self::$connection; - } - - private static function eventTableName(string $eventStoreId): string - { - return 'events_test_' . $eventStoreId; - } -} diff --git a/tests/Integration/DoctrineCheckpointStorageTest.php b/tests/Integration/DoctrineCheckpointStorageTest.php deleted file mode 100644 index 65cd687..0000000 --- a/tests/Integration/DoctrineCheckpointStorageTest.php +++ /dev/null @@ -1,53 +0,0 @@ - */ - private array $storages = []; - - public static function setUpBeforeClass(): void - { - $dbDns = getenv('DB_DSN'); - if ($dbDns === false) { - self::markTestSkipped('Missing DB_DSN environment variable, see https://phpunit.readthedocs.io/en/9.5/configuration.html#the-env-element'); - } - self::$dbDns = $dbDns; - } - - public static function tearDownAfterClass(): void - { - DriverManager::getConnection(['url' => self::$dbDns])->executeStatement('DROP TABLE IF EXISTS neos_eventstore_doctrineadapter_doctrinecheckpointstoragetest'); - } - - public function tearDown(): void - { - /** @var array{storage: DoctrineCheckpointStorage, connection: Connection} $storage */ - foreach ($this->storages as $storage) { - if (!$storage['connection']->isTransactionActive()) { - $storage['storage']->acquireLock(); - } - $storage['storage']->updateAndReleaseLock(SequenceNumber::none()); - } - } - - protected function createCheckpointStorage(string $subscriptionId): DoctrineCheckpointStorage - { - $connection = DriverManager::getConnection(['url' => self::$dbDns]); - $checkpointStorage = new DoctrineCheckpointStorage($connection, 'neos_eventstore_doctrineadapter_doctrinecheckpointstoragetest', $subscriptionId); - $checkpointStorage->setup(); - $this->storages[] = ['connection' => $connection, 'storage' => $checkpointStorage]; - return $checkpointStorage; - } -} diff --git a/tests/Integration/DoctrineEventStoreTest.php b/tests/Integration/DoctrineEventStoreTest.php index 54c8575..7e5b102 100644 --- a/tests/Integration/DoctrineEventStoreTest.php +++ b/tests/Integration/DoctrineEventStoreTest.php @@ -4,10 +4,12 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Exception as DbalException; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Platforms\SqlitePlatform; use Neos\EventStore\DoctrineAdapter\DoctrineEventStore; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Model\EventStore\StatusType; use Neos\EventStore\Tests\Integration\AbstractEventStoreTestBase; use PHPUnit\Framework\Attributes\CoversClass; @@ -16,12 +18,14 @@ final class DoctrineEventStoreTest extends AbstractEventStoreTestBase { private static ?Connection $connection = null; - protected function createEventStore(): EventStoreInterface + protected static function createEventStore(): EventStoreInterface { - $connection = self::connection(); - $eventStore = new DoctrineEventStore($connection, self::eventTableName()); - $eventStore->setup(); + return new DoctrineEventStore(self::connection(), self::eventTableName()); + } + protected static function resetEventStore(): void + { + $connection = self::connection(); if ($connection->getDatabasePlatform() instanceof SqlitePlatform) { $connection->executeStatement('DELETE FROM ' . self::eventTableName()); $connection->executeStatement('UPDATE SQLITE_SEQUENCE SET SEQ=0 WHERE NAME="' . self::eventTableName() . '"'); @@ -30,7 +34,6 @@ protected function createEventStore(): EventStoreInterface } else { $connection->executeStatement('TRUNCATE TABLE ' . self::eventTableName()); } - return $eventStore; } public static function connection(): Connection @@ -50,4 +53,43 @@ public static function eventTableName(): string return 'events_test'; } + public function test_setup_throws_exception_if_database_connection_fails(): void + { + $connection = DriverManager::getConnection(['url' => 'mysql://invalid-connection']); + $eventStore = new DoctrineEventStore($connection, self::eventTableName()); + + $this->expectException(DbalException::class); + $eventStore->setup(); + } + + public function test_status_returns_error_status_if_database_connection_fails(): void + { + $connection = DriverManager::getConnection(['url' => 'mysql://invalid-connection']); + $eventStore = new DoctrineEventStore($connection, self::eventTableName()); + self::assertSame($eventStore->status()->type, StatusType::ERROR); + } + + public function test_status_returns_setup_required_status_if_event_table_is_missing(): void + { + $connection = DriverManager::getConnection(['url' => 'sqlite:///:memory:']); + $eventStore = new DoctrineEventStore($connection, self::eventTableName()); + self::assertSame($eventStore->status()->type, StatusType::SETUP_REQUIRED); + } + + public function test_status_returns_setup_required_status_if_event_table_requires_update(): void + { + $connection = self::connection(); + $eventStore = new DoctrineEventStore($connection, self::eventTableName()); + $eventStore->setup(); + $connection->executeStatement('ALTER TABLE ' . self::eventTableName() . ' RENAME COLUMN metadata TO metadata_renamed'); + self::assertSame($eventStore->status()->type, StatusType::SETUP_REQUIRED); + } + + public function test_status_returns_ok_status_if_event_table_is_up_to_date(): void + { + $connection = self::connection(); + $eventStore = new DoctrineEventStore($connection, self::eventTableName()); + $eventStore->setup(); + self::assertSame($eventStore->status()->type, StatusType::OK); + } } diff --git a/tests/Unit/DoctrineCheckpointStorageTest.php b/tests/Unit/DoctrineCheckpointStorageTest.php deleted file mode 100644 index f7abda5..0000000 --- a/tests/Unit/DoctrineCheckpointStorageTest.php +++ /dev/null @@ -1,98 +0,0 @@ -mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockDatabasePlatform = $this->getMockBuilder(MySqlPlatform::class)->disableOriginalConstructor()->getMock(); - $this->mockConnection->method('getDatabasePlatform')->willReturn($mockDatabasePlatform); - - $this->checkpointStorage = new DoctrineCheckpointStorage($this->mockConnection, 'some_table', 'some_subscriber'); - } - - private function simulateLockAcquired(): void - { - $bound = \Closure::bind(static fn &(DoctrineCheckpointStorage $checkpointStorage) => $checkpointStorage->lockedSequenceNumber, null, $this->checkpointStorage); - /** @noinspection PhpPassByRefInspection */ - $ref = &$bound($this->checkpointStorage); - $ref = SequenceNumber::none(); - $this->mockConnection->method('isTransactionActive')->willReturn(true); - } - - public function test_constructor_fails_if_database_platform_cannot_be_determined(): void - { - $mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockConnection->expects(self::atLeastOnce())->method('getDatabasePlatform')->willReturn(null); - - $this->expectException(\InvalidArgumentException::class); - new DoctrineCheckpointStorage($mockConnection, 'some_table', 'some_subscriber'); - } - - public function test_constructor_fails_if_database_platform_is_not_supported(): void - { - $mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockDatabasePlatform = $this->getMockBuilder(SqlitePlatform::class)->disableOriginalConstructor()->getMock(); - $mockConnection->expects(self::atLeastOnce())->method('getDatabasePlatform')->willReturn($mockDatabasePlatform); - - $this->expectException(\InvalidArgumentException::class); - new DoctrineCheckpointStorage($mockConnection, 'some_table', 'some_subscriber'); - } - - public function test_acquireLock_startsTransaction(): void - { - $this->mockConnection->expects(self::once())->method('beginTransaction'); - $this->mockConnection->expects(self::once())->method('fetchOne')->willReturn('22'); - $this->checkpointStorage->acquireLock(); - } - - public function test_updateAndReleaseLock_updates_appliedsequencenumber_and_commits_transaction(): void - { - $this->simulateLockAcquired(); - - $this->mockConnection->expects(self::once())->method('update')->with('some_table', ['appliedsequencenumber' => 123], ['subscriberid' => 'some_subscriber']); - $this->mockConnection->expects(self::once())->method('commit'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(123)); - } - - public function test_updateAndReleaseLock_rolls_back_transaction_on_exception(): void - { - $this->simulateLockAcquired(); - - $mockException = $this->getMockBuilder(DBALException::class)->disableOriginalConstructor()->getMock(); - $this->mockConnection->expects(self::once())->method('update')->willThrowException($mockException); - - $this->expectException(CheckpointException::class); - $this->mockConnection->expects(self::once())->method('rollBack'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(123)); - } - - public function test_updateAndReleaseLock_does_not_update_sequenceNumber_if_it_has_not_been_changed(): void - { - $this->mockConnection->expects(self::once())->method('fetchOne')->willReturn('22'); - $this->checkpointStorage->acquireLock(); - - $this->mockConnection->expects(self::once())->method('isTransactionActive')->willReturn(true); - $this->mockConnection->expects(self::never())->method('update'); - $this->mockConnection->expects(self::once())->method('commit'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(22)); - } -}