diff --git a/README.md b/README.md index e4cbf48..dfc05f2 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,7 @@ Database Adapter implementation for the [neos/eventstore](https://github.com/neos/eventstore) package. > **Note** -> Currently this package supports MySQL (including MariaDB) and PostgreSQL -> The `DoctrineEventStore` can be used with SQLite, too. But that platform is not yet supported for the `DoctrineCheckpointStorage` +> Currently this package supports MySQL (including MariaDB), PostgreSQL and SQLite. ## Usage diff --git a/composer.json b/composer.json index 5ffba3e..57fe4ad 100644 --- a/composer.json +++ b/composer.json @@ -38,12 +38,10 @@ "test:phpstan": "phpstan", "test:cs": "phpcs --colors src", "test:cs:fix": "phpcbf --colors src", - "test:unit": "phpunit tests/Unit", "test:integration": "phpunit tests/Integration", "test": [ "@test:phpstan", "@test:cs", - "@test:unit", "@test:integration" ] } diff --git a/src/DoctrineCheckpointStorage.php b/src/DoctrineCheckpointStorage.php deleted file mode 100644 index 6efccd1..0000000 --- a/src/DoctrineCheckpointStorage.php +++ /dev/null @@ -1,117 +0,0 @@ -connection->getDatabasePlatform(); - if (!($platform instanceof MySqlPlatform || $platform instanceof PostgreSqlPlatform)) { - throw new \InvalidArgumentException(sprintf('The %s only supports the platforms %s and %s currently. Given: %s', $this::class, MySqlPlatform::class, PostgreSqlPlatform::class, get_debug_type($platform)), 1660556004); - } - $this->platform = $platform; - } - - public function acquireLock(): SequenceNumber - { - if ($this->connection->isTransactionActive()) { - throw new CheckpointException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because a transaction is active already', $this->subscriberId), 1652268416); - } - $this->connection->beginTransaction(); - try { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ' . $this->platform->getForUpdateSQL() . ' NOWAIT', [ - 'subscriberId' => $this->subscriberId - ]); - } catch (DBALException $exception) { - $this->connection->rollBack(); - if ($exception instanceof LockWaitTimeoutException || ($exception instanceof DBALDriverException && ($exception->getErrorCode() === 3572 || $exception->getErrorCode() === 7))) { - throw new CheckpointException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because it is acquired already', $this->subscriberId), 1652279016); - } - throw new \RuntimeException($exception->getMessage(), 1544207778, $exception); - } - if (!is_numeric($highestAppliedSequenceNumber)) { - $this->connection->rollBack(); - throw new CheckpointException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279139); - } - $this->lockedSequenceNumber = SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - return $this->lockedSequenceNumber; - } - - public function updateAndReleaseLock(SequenceNumber $sequenceNumber): void - { - if ($this->lockedSequenceNumber === null) { - throw new CheckpointException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because the lock has not been acquired successfully before', $this->subscriberId), 1660556344); - } - if (!$this->connection->isTransactionActive()) { - throw new CheckpointException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because no transaction is active', $this->subscriberId), 1652279314); - } - try { - if (!$this->lockedSequenceNumber->equals($sequenceNumber)) { - $this->connection->update($this->tableName, ['appliedsequencenumber' => $sequenceNumber->value], ['subscriberid' => $this->subscriberId]); - } - $this->connection->commit(); - } catch (DBALException $exception) { - $this->connection->rollBack(); - throw new CheckpointException(sprintf('Failed to update and commit highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279375, $exception); - } finally { - $this->lockedSequenceNumber = null; - } - } - - public function getHighestAppliedSequenceNumber(): SequenceNumber - { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ', [ - 'subscriberId' => $this->subscriberId - ]); - if (!is_numeric($highestAppliedSequenceNumber)) { - throw new CheckpointException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setup()', $this->subscriberId, $this::class), 1652279427); - } - return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - } - - public function setup(): void - { - $schemaManager = $this->connection->getSchemaManager(); - if (!$schemaManager instanceof AbstractSchemaManager) { - throw new \RuntimeException('Failed to retrieve Schema Manager', 1652269057); - } - $schema = new Schema(); - $table = $schema->createTable($this->tableName); - $table->addColumn('subscriberid', Types::STRING, ['length' => 255]); - $table->addColumn('appliedsequencenumber', Types::INTEGER); - $table->setPrimaryKey(['subscriberid']); - - $schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema); - foreach ($schemaDiff->toSaveSql($this->platform) as $statement) { - $this->connection->executeStatement($statement); - } - try { - $this->connection->insert($this->tableName, ['subscriberid' => $this->subscriberId, 'appliedsequencenumber' => 0]); - } catch (UniqueConstraintViolationException $e) { - // table and row already exists, ignore - } - } -} diff --git a/tests/Integration/DoctrineCheckpointStorageTest.php b/tests/Integration/DoctrineCheckpointStorageTest.php deleted file mode 100644 index 65cd687..0000000 --- a/tests/Integration/DoctrineCheckpointStorageTest.php +++ /dev/null @@ -1,53 +0,0 @@ - */ - private array $storages = []; - - public static function setUpBeforeClass(): void - { - $dbDns = getenv('DB_DSN'); - if ($dbDns === false) { - self::markTestSkipped('Missing DB_DSN environment variable, see https://phpunit.readthedocs.io/en/9.5/configuration.html#the-env-element'); - } - self::$dbDns = $dbDns; - } - - public static function tearDownAfterClass(): void - { - DriverManager::getConnection(['url' => self::$dbDns])->executeStatement('DROP TABLE IF EXISTS neos_eventstore_doctrineadapter_doctrinecheckpointstoragetest'); - } - - public function tearDown(): void - { - /** @var array{storage: DoctrineCheckpointStorage, connection: Connection} $storage */ - foreach ($this->storages as $storage) { - if (!$storage['connection']->isTransactionActive()) { - $storage['storage']->acquireLock(); - } - $storage['storage']->updateAndReleaseLock(SequenceNumber::none()); - } - } - - protected function createCheckpointStorage(string $subscriptionId): DoctrineCheckpointStorage - { - $connection = DriverManager::getConnection(['url' => self::$dbDns]); - $checkpointStorage = new DoctrineCheckpointStorage($connection, 'neos_eventstore_doctrineadapter_doctrinecheckpointstoragetest', $subscriptionId); - $checkpointStorage->setup(); - $this->storages[] = ['connection' => $connection, 'storage' => $checkpointStorage]; - return $checkpointStorage; - } -} diff --git a/tests/Integration/DoctrineEventStoreTest.php b/tests/Integration/DoctrineEventStoreTest.php index f057aa0..5e90bb8 100644 --- a/tests/Integration/DoctrineEventStoreTest.php +++ b/tests/Integration/DoctrineEventStoreTest.php @@ -80,7 +80,7 @@ public function test_status_returns_setup_required_status_if_event_table_require $connection = self::connection(); $eventStore = new DoctrineEventStore($connection, self::eventTableName()); $eventStore->setup(); - $connection->executeStatement('ALTER TABLE ' . $connection->quote(self::eventTableName()) . ' RENAME COLUMN metadata TO metadata_renamed'); + $connection->executeStatement('ALTER TABLE ' . self::eventTableName() . ' RENAME COLUMN metadata TO metadata_renamed'); self::assertSame($eventStore->status()->type, StatusType::SETUP_REQUIRED); } diff --git a/tests/Unit/DoctrineCheckpointStorageTest.php b/tests/Unit/DoctrineCheckpointStorageTest.php deleted file mode 100644 index f7abda5..0000000 --- a/tests/Unit/DoctrineCheckpointStorageTest.php +++ /dev/null @@ -1,98 +0,0 @@ -mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockDatabasePlatform = $this->getMockBuilder(MySqlPlatform::class)->disableOriginalConstructor()->getMock(); - $this->mockConnection->method('getDatabasePlatform')->willReturn($mockDatabasePlatform); - - $this->checkpointStorage = new DoctrineCheckpointStorage($this->mockConnection, 'some_table', 'some_subscriber'); - } - - private function simulateLockAcquired(): void - { - $bound = \Closure::bind(static fn &(DoctrineCheckpointStorage $checkpointStorage) => $checkpointStorage->lockedSequenceNumber, null, $this->checkpointStorage); - /** @noinspection PhpPassByRefInspection */ - $ref = &$bound($this->checkpointStorage); - $ref = SequenceNumber::none(); - $this->mockConnection->method('isTransactionActive')->willReturn(true); - } - - public function test_constructor_fails_if_database_platform_cannot_be_determined(): void - { - $mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockConnection->expects(self::atLeastOnce())->method('getDatabasePlatform')->willReturn(null); - - $this->expectException(\InvalidArgumentException::class); - new DoctrineCheckpointStorage($mockConnection, 'some_table', 'some_subscriber'); - } - - public function test_constructor_fails_if_database_platform_is_not_supported(): void - { - $mockConnection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $mockDatabasePlatform = $this->getMockBuilder(SqlitePlatform::class)->disableOriginalConstructor()->getMock(); - $mockConnection->expects(self::atLeastOnce())->method('getDatabasePlatform')->willReturn($mockDatabasePlatform); - - $this->expectException(\InvalidArgumentException::class); - new DoctrineCheckpointStorage($mockConnection, 'some_table', 'some_subscriber'); - } - - public function test_acquireLock_startsTransaction(): void - { - $this->mockConnection->expects(self::once())->method('beginTransaction'); - $this->mockConnection->expects(self::once())->method('fetchOne')->willReturn('22'); - $this->checkpointStorage->acquireLock(); - } - - public function test_updateAndReleaseLock_updates_appliedsequencenumber_and_commits_transaction(): void - { - $this->simulateLockAcquired(); - - $this->mockConnection->expects(self::once())->method('update')->with('some_table', ['appliedsequencenumber' => 123], ['subscriberid' => 'some_subscriber']); - $this->mockConnection->expects(self::once())->method('commit'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(123)); - } - - public function test_updateAndReleaseLock_rolls_back_transaction_on_exception(): void - { - $this->simulateLockAcquired(); - - $mockException = $this->getMockBuilder(DBALException::class)->disableOriginalConstructor()->getMock(); - $this->mockConnection->expects(self::once())->method('update')->willThrowException($mockException); - - $this->expectException(CheckpointException::class); - $this->mockConnection->expects(self::once())->method('rollBack'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(123)); - } - - public function test_updateAndReleaseLock_does_not_update_sequenceNumber_if_it_has_not_been_changed(): void - { - $this->mockConnection->expects(self::once())->method('fetchOne')->willReturn('22'); - $this->checkpointStorage->acquireLock(); - - $this->mockConnection->expects(self::once())->method('isTransactionActive')->willReturn(true); - $this->mockConnection->expects(self::never())->method('update'); - $this->mockConnection->expects(self::once())->method('commit'); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::fromInteger(22)); - } -}