diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 8c510117f..d3bc6db38 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -96,7 +96,7 @@ akka.persistence.cassandra { tables-autocreate = false # Name of the keyspace to be created/used by the journal - keyspace = "akka" + keyspace = "ignasi20210419002" ## FIXME # Name of the table to be created/used by the journal. # If the table doesn't exist it is automatically created when tables-autocreate @@ -140,7 +140,7 @@ akka.persistence.cassandra { # Maximum number of messages that will be batched when using `persistAsync`. # Also used as the max batch size for deletes. - max-message-batch-size = 100 + max-message-batch-size = 30 # Target number of entries per partition (= columns per row). # Must not be changed after table creation (currently not checked). @@ -247,7 +247,7 @@ akka.persistence.cassandra { # Max size of these batches. The best value for this will depend on the size of # the serialized events. Cassandra logs a warning for batches above a certain # size and this should be reduced if that warning is seen. - max-message-batch-size = 150 + max-message-batch-size = 30 # Max time to buffer events for before writing. # Larger values will increase cassandra write efficiency but increase the delay before @@ -476,7 +476,7 @@ akka.persistence.cassandra { tables-autocreate = false # Name of the keyspace to be created/used by the snapshot store - keyspace = "akka_snapshot" + keyspace = "ignasi20210419002" ## FIXME # Name of the table to be created/used by the snapshot store. # If the table doesn't exist it is automatically created when tables-autocreate @@ -563,7 +563,7 @@ akka.persistence.cassandra { # CQL query that verifies the health of the database. # A query returning some result (even empty) is treated as a successful health check. # Failed query execution (connection error or client exception) is treated as a failed health check. - health-check-cql = "SELECT now() FROM system.local" + health-check-cql = "SELECT * FROM system.local" } # Default dispatcher for plugin actor and tasks. @@ -591,14 +591,14 @@ datastax-java-driver { profiles { akka-persistence-cassandra-profile { basic.request { - consistency = QUORUM + consistency = LOCAL_QUORUM # the journal does not use any counters or collections default-idempotence = true } } akka-persistence-cassandra-snapshot-profile { basic.request { - consistency = ONE + consistency = LOCAL_QUORUM # the snapshot store does not use any counters or collections default-idempotence = true } diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala index aeb26187a..557819775 100644 --- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala +++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala @@ -295,7 +295,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu } /** - * Delete all events from `tag_views` table related to to one single `persistenceId`. + * Delete all events from `tag_views` table related to one single `persistenceId`. * Events in `messages` (journal) table are not deleted and snapshots are not deleted. */ def deleteAllTaggedEvents(persistenceId: String): Future[Done] = { diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala index dc693c9e2..41ad79260 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala @@ -348,7 +348,7 @@ import akka.persistence.cassandra.FutureDone val done = if (journalSettings.tablesAutoCreate) { // reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors - session.setSchemaMetadataEnabled(false) +// session.setSchemaMetadataEnabled(false) val result = for { _ <- keyspace _ <- session.executeAsync(createTable).toScala @@ -361,13 +361,13 @@ import akka.persistence.cassandra.FutureDone } _ <- tagStatements } yield { - session.setSchemaMetadataEnabled(null) +// session.setSchemaMetadataEnabled(null) Done } result.recoverWith { case e => log.warning("Failed to create journal keyspace and tables: {}", e) - session.setSchemaMetadataEnabled(null) +// session.setSchemaMetadataEnabled(null) FutureDone } } else { diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/JournalSettings.scala b/core/src/main/scala/akka/persistence/cassandra/journal/JournalSettings.scala index 95c2f2490..77d6c4c02 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/JournalSettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/JournalSettings.scala @@ -26,7 +26,7 @@ import com.typesafe.config.Config val keyspaceAutoCreate: Boolean = journalConfig.getBoolean("keyspace-autocreate") val tablesAutoCreate: Boolean = journalConfig.getBoolean("tables-autocreate") - val keyspace: String = journalConfig.getString("keyspace") + val keyspace: String = "ignasi20210419002" // FIXME journalConfig.getString("keyspace") val table: String = journalConfig.getString("table") val metadataTable: String = journalConfig.getString("metadata-table") diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala index 3aadcdefe..83a77c58e 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala @@ -117,18 +117,18 @@ import akka.persistence.cassandra.FutureDone if (snapshotSettings.tablesAutoCreate) { // reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors - session.setSchemaMetadataEnabled(false) +// session.setSchemaMetadataEnabled(false) val result = for { _ <- keyspace _ <- session.executeAsync(createTable).toScala } yield { - session.setSchemaMetadataEnabled(null) +// session.setSchemaMetadataEnabled(null) Done } result.recoverWith { case e => log.warning("Failed to create snapshot keyspace and tables: {}", e) - session.setSchemaMetadataEnabled(null) +// session.setSchemaMetadataEnabled(null) FutureDone } } else { diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/SnapshotSettings.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/SnapshotSettings.scala index fe40a32e6..b373278ce 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/SnapshotSettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/SnapshotSettings.scala @@ -21,7 +21,7 @@ import com.typesafe.config.Config val keyspaceAutoCreate: Boolean = snapshotConfig.getBoolean("keyspace-autocreate") val tablesAutoCreate: Boolean = snapshotConfig.getBoolean("tables-autocreate") - val keyspace: String = snapshotConfig.getString("keyspace") + val keyspace: String = "ignasi20210419002" // FIXME snapshotConfig.getString("keyspace") val table: String = snapshotConfig.getString("table") diff --git a/core/src/test/resources/logback-test.xml b/core/src/test/resources/logback-test.xml index 98f404e1d..8f275375a 100644 --- a/core/src/test/resources/logback-test.xml +++ b/core/src/test/resources/logback-test.xml @@ -8,8 +8,8 @@ - - + + diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraLifecycle.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraLifecycle.scala index a8b3c0432..f26828437 100644 --- a/core/src/test/scala/akka/persistence/cassandra/CassandraLifecycle.scala +++ b/core/src/test/scala/akka/persistence/cassandra/CassandraLifecycle.scala @@ -8,20 +8,25 @@ import java.time.LocalDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter import java.util.concurrent.TimeUnit - import scala.concurrent.Await - import akka.actor.{ ActorSystem, PoisonPill, Props } import akka.persistence.PersistentActor +import akka.persistence.cassandra.CassandraLifecycle.journalTables +import akka.persistence.cassandra.CassandraLifecycle.snapshotTables import akka.testkit.{ TestKitBase, TestProbe } import com.datastax.oss.driver.api.core.CqlSession import com.typesafe.config.ConfigFactory import org.scalatest._ + import scala.concurrent.duration._ +import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } - import akka.stream.alpakka.cassandra.scaladsl.CassandraSession import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry +import com.datastax.oss.driver.api.core.CqlIdentifier +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata + +import java.util object CassandraLifecycle { @@ -37,6 +42,8 @@ object CassandraLifecycle { akka.persistence.journal.plugin = "akka.persistence.cassandra.journal" akka.persistence.snapshot-store.plugin = "akka.persistence.cassandra.snapshot" akka.persistence.cassandra.journal.circuit-breaker.call-timeout = 30s + akka.persistence.cassandra.journal.max-failures = 20 + akka.persistence.cassandra.snapshot.max-failures = 20 akka.persistence.cassandra.events-by-tag.first-time-bucket = "$firstTimeBucket" akka.test.single-expect-default = 20s akka.test.filter-leeway = 20s @@ -45,13 +52,89 @@ object CassandraLifecycle { akka.actor.allow-java-serialization = on akka.actor.warn-about-java-serializer-usage = off akka.use-slf4j = off + + datastax-java-driver { + basic.contact-points = [ "cassandra.eu-central-1.amazonaws.com:9142"] + basic.request.consistency = LOCAL_QUORUM + basic.load-balancing-policy { + class = DefaultLoadBalancingPolicy + local-datacenter = eu-central-1 + } + advanced { + auth-provider = { + class = software.aws.mcs.auth.SigV4AuthProvider + aws-region = eu-central-1 + } + ssl-engine-factory { + class = DefaultSslEngineFactory + } + } + } + """).withFallback(CassandraSpec.enableAutocreate).resolve() def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = { + // Order matters!! + // awaitPersistencePluginInit will do a best effort attempt to wait for the plugin to be + // ready and trigger the creation of tables. + awaitPersistencePluginInit(system, journalPluginId, snapshotPluginId) + // awaitTablesCreated is a hard wait ensuring tables were indeed created + awaitTablesCreated(system, journalPluginId, snapshotPluginId) + } + + val journalTables = + Set("all_persistence_ids", "messages", "metadata", "tag_scanning", "tag_views", "tag_write_progress") + val snapshotTables = Set("snapshots") + + def awaitTablesCreated(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = { + val journalName = + if (journalPluginId == "") + system.settings.config.getString("akka.persistence.cassandra.journal.keyspace") + else journalPluginId + val snapshotName = + if (snapshotPluginId == "") + system.settings.config.getString("akka.persistence.cassandra.snapshot.keyspace") + else snapshotPluginId + lazy val cluster: CqlSession = + Await.result(session.underlying(), 10.seconds) + + def session: CassandraSession = { + CassandraSessionRegistry(system).sessionFor("akka.persistence.cassandra") + } + awaitTableCount(cluster, 45, journalName, actual => journalTables.subsetOf(actual.toSet)) + awaitTableCount(cluster, 45, snapshotName, actual => snapshotTables.subsetOf(actual.toSet)) + } + + def awaitTableCount( + cluster: CqlSession, + retries: Int, + keyspaceName: String, + readinessCheck: Array[String] => Boolean): Unit = { + val tables: util.Map[CqlIdentifier, TableMetadata] = + cluster.getMetadata.getKeyspace(keyspaceName).get().getTables + + val tableNames = tables.keySet().toArray.map(_.toString) + + if (retries == 0) { + println(s"Awaiting table creation/drop timed out. Current list of tables: [${tableNames.mkString(", ")}]") + throw new RuntimeException("Awaiting table creation/drop timed out.") + } else if (readinessCheck(tableNames)) { + () + } else { + println(s"Awaiting table creation/deletion. Existing tables: [${tableNames.mkString(", ")}].") + Thread.sleep(1000) + awaitTableCount(cluster, retries - 1, keyspaceName, readinessCheck) + } + } + + private def awaitPersistencePluginInit( + system: ActorSystem, + journalPluginId: String = "", + snapshotPluginId: String = ""): Unit = { val probe = TestProbe()(system) val t0 = System.nanoTime() var n = 0 - probe.within(45.seconds) { + probe.within(75.seconds) { probe.awaitAssert( { n += 1 @@ -61,7 +144,7 @@ object CassandraLifecycle { "persistenceInit" + n) a.tell("hello", probe.ref) try { - probe.expectMsg(5.seconds, "hello") + probe.expectMsg(10.seconds, "hello") } catch { case t: Throwable => probe.watch(a) @@ -112,27 +195,62 @@ trait CassandraLifecycle extends BeforeAndAfterAll with TestKitBase { } override protected def beforeAll(): Unit = { - awaitPersistenceInit() + try { + awaitPersistenceInit() + } catch { + case NonFatal(e) => +// Try(externalCassandraCleanup()) + shutdown(system, verifySystemShutdown = false) + throw e + } super.beforeAll() } + private var initOk = false + def awaitPersistenceInit(): Unit = { CassandraLifecycle.awaitPersistenceInit(system) + initOk = true } override protected def afterAll(): Unit = { - externalCassandraCleanup() + if (initOk) + externalCassandraCleanup() shutdown(system, verifySystemShutdown = true) super.afterAll() } def dropKeyspaces(): Unit = { - val journalKeyspace = system.settings.config.getString("akka.persistence.cassandra.journal.keyspace") - val snapshotKeyspace = system.settings.config.getString("akka.persistence.cassandra.snapshot.keyspace") + val journalKeyspace = "ignasi20210419002" // FIXME system.settings.config.getString("akka.persistence.cassandra.journal.keyspace") + val snapshotKeyspace = "ignasi20210419002" // FIXME system.settings.config.getString("akka.persistence.cassandra.snapshot.keyspace") val dropped = Try { - cluster.execute(s"drop keyspace if exists ${journalKeyspace}") - cluster.execute(s"drop keyspace if exists ${snapshotKeyspace}") + + println(" ------------------- DROPPING TABLES....") + + cluster.execute(s"drop table if exists ${journalKeyspace}.all_persistence_ids") + cluster.execute(s"drop table if exists ${journalKeyspace}.messages") + cluster.execute(s"drop table if exists ${journalKeyspace}.metadata") + cluster.execute(s"drop table if exists ${journalKeyspace}.tag_scanning") + cluster.execute(s"drop table if exists ${journalKeyspace}.tag_views") + cluster.execute(s"drop table if exists ${journalKeyspace}.tag_write_progress") + + cluster.execute(s"drop table if exists ${snapshotKeyspace}.snapshots") + + CassandraLifecycle.awaitTableCount( + cluster, + 45, + journalKeyspace, + actual => actual.toSet.intersect(journalTables).isEmpty) + CassandraLifecycle.awaitTableCount( + cluster, + 45, + snapshotKeyspace, + actual => actual.toSet.intersect(snapshotTables).isEmpty) + + //cluster.execute(s"drop keyspace if exists ${journalKeyspace}") + //cluster.execute(s"drop keyspace if exists ${snapshotKeyspace}") } + dropped match { case Failure(t) => system.log.error(t, "Failed to drop keyspaces {} {}", journalKeyspace, snapshotKeyspace) case Success(_) => diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala index 2d73734b9..ab8913855 100644 --- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala @@ -6,7 +6,6 @@ package akka.persistence.cassandra import java.io.{ OutputStream, PrintStream } import java.util.concurrent.atomic.AtomicInteger - import akka.actor.ActorSystem import akka.event.Logging import akka.event.Logging.{ LogEvent, StdOutLogger } @@ -23,15 +22,17 @@ import org.scalatest.time.{ Milliseconds, Seconds, Span } import org.scalatest.{ Outcome, Suite } import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.matchers.should.Matchers + import scala.collection.immutable import scala.concurrent.duration._ - import akka.persistence.cassandra.journal.CassandraJournal import akka.serialization.SerializationExtension -import scala.util.control.NonFatal +import scala.util.control.NonFatal import akka.persistence.cassandra.TestTaggingActor.Ack import akka.actor.PoisonPill +import akka.persistence.cassandra.CassandraLifecycle.journalTables +import akka.persistence.cassandra.CassandraLifecycle.snapshotTables object CassandraSpec { def getCallerName(clazz: Class[_]): String = { @@ -63,19 +64,19 @@ object CassandraSpec { eventual-consistency-delay = 200ms } snapshot { - keyspace-autocreate = true + keyspace-autocreate = false tables-autocreate = true } journal { - keyspace-autocreate = true + keyspace-autocreate = false tables-autocreate = true } } """) val fallbackConfig = ConfigFactory.parseString(s""" - akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"] - akka.loglevel = DEBUG + #akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"] + akka.loglevel = INFO akka.use-slf4j = off datastax-java-driver { @@ -91,9 +92,11 @@ object CassandraSpec { * Picks a free port for Cassandra before starting the ActorSystem */ abstract class CassandraSpec( + // FIXME: is this default config correct? the construction of `val finalConfig` below already + // takes CassandraLifecycle.config into consideration. I think the default should be an empty Config config: Config = CassandraLifecycle.config, - val journalName: String = getCallerName(getClass), - val snapshotName: String = getCallerName(getClass), + val journalName: String = "ignasi20210419002", // FIXME getCallerName(getClass), + val snapshotName: String = "ignasi20210419002", // FIXME getCallerName(getClass), dumpRowsOnFailure: Boolean = true) extends TestKitBase with Suite @@ -105,6 +108,8 @@ abstract class CassandraSpec( def this(config: String) = this(ConfigFactory.parseString(config)) + // FIXME: is this correct? the construction of `val finalConfig` below already + // takes CassandraLifecycle.config into consideration. I think the default should be an empty Config def this() = this(CassandraLifecycle.config) private var failed = false @@ -167,45 +172,72 @@ abstract class CassandraSpec( } } + private def logDatabaseContents(): Unit = { + println("RowDump::") + import scala.collection.JavaConverters._ + if (system.settings.config.getBoolean("akka.persistence.cassandra.events-by-tag.enabled")) { + println("tag_views") + cluster + .execute(s"select * from ${journalName}.tag_views") + .asScala + .foreach(row => { + println(s"""Row:${row.getString("tag_name")},${row.getLong("timebucket")},${formatOffset( + row.getUuid("timestamp"))},${row.getString("persistence_id")},${row.getLong("tag_pid_sequence_nr")},${row + .getLong("sequence_nr")}""") + + }) + } + println("messages") + cluster + .execute(s"select * from ${journalName}.messages") + .asScala + .foreach(row => { + println( + s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong("sequence_nr")}""") + }) + + println("snapshots") + cluster + .execute(s"select * from ${snapshotName}.snapshots") + .asScala + .foreach(row => { + println( + s"""Row:${row.getString("persistence_id")}, ${row.getLong("sequence_nr")}, ${row.getLong("timestamp")}""") + }) + } + override protected def externalCassandraCleanup(): Unit = { try { - if (failed && dumpRowsOnFailure) { - println("RowDump::") - import scala.collection.JavaConverters._ - if (system.settings.config.getBoolean("akka.persistence.cassandra.events-by-tag.enabled")) { - println("tag_views") - cluster - .execute(s"select * from ${journalName}.tag_views") - .asScala - .foreach(row => { - println(s"""Row:${row.getString("tag_name")},${row.getLong("timebucket")},${formatOffset( - row.getUuid("timestamp"))},${row.getString("persistence_id")},${row - .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""") - - }) - } - println("messages") - cluster - .execute(s"select * from ${journalName}.messages") - .asScala - .foreach(row => { - println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong( - "sequence_nr")}""") - }) - - println("snapshots") - cluster - .execute(s"select * from ${snapshotName}.snapshots") - .asScala - .foreach(row => { - println( - s"""Row:${row.getString("persistence_id")}, ${row.getLong("sequence_nr")}, ${row.getLong("timestamp")}""") - }) + if (failed && dumpRowsOnFailure && false) // FIXME if (failed && dumpRowsOnFailure) + logDatabaseContents() + + // FIXME +// keyspaces().foreach { keyspace => +// cluster.execute(s"drop keyspace if exists $keyspace") +// } + + println(" ------------------- DROPPING TABLES....") + + cluster.execute(s"drop table if exists ${journalName}.all_persistence_ids") + cluster.execute(s"drop table if exists ${journalName}.messages") + cluster.execute(s"drop table if exists ${journalName}.metadata") + cluster.execute(s"drop table if exists ${journalName}.tag_scanning") + cluster.execute(s"drop table if exists ${journalName}.tag_views") + cluster.execute(s"drop table if exists ${journalName}.tag_write_progress") + + cluster.execute(s"drop table if exists ${snapshotName}.snapshots") + + CassandraLifecycle.awaitTableCount( + cluster, + 45, + journalName, + actual => actual.toSet.intersect(journalTables).isEmpty) + CassandraLifecycle.awaitTableCount( + cluster, + 45, + snapshotName, + actual => actual.toSet.intersect(snapshotTables).isEmpty) - } - keyspaces().foreach { keyspace => - cluster.execute(s"drop keyspace if exists $keyspace") - } } catch { case NonFatal(t) => println("Exception during cleanup") diff --git a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala index 17f4adb09..f2a23607d 100644 --- a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala @@ -52,8 +52,8 @@ object EventsByTagMigrationSpec { } akka.persistence.cassandra { journal { - keyspace-autocreate = true - tables-autocreate = true + keyspace-autocreate = false + tables-autocreate = false } query { events-by-persistence-id-gap-timeout = 1s diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala index 280bee557..6e08209d8 100644 --- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala @@ -6,7 +6,6 @@ package akka.persistence.cassandra.cleanup import java.time.LocalDateTime import java.time.ZoneOffset - import akka.actor.ActorRef import akka.actor.Props import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotMetadata, SnapshotOffer } @@ -16,11 +15,14 @@ import akka.persistence.journal.Tagged import akka.persistence.query.NoOffset import akka.stream.scaladsl.Sink import com.typesafe.config.ConfigFactory +import org.scalatest.time.Milliseconds +import org.scalatest.time.Seconds +import org.scalatest.time.Span object CleanupSpec { val today = LocalDateTime.now(ZoneOffset.UTC) val config = ConfigFactory.parseString(s""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.persistence.cassandra.cleanup { log-progress-every = 2 dry-run = false @@ -81,6 +83,8 @@ object CleanupSpec { class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { import CleanupSpec._ + override implicit val patience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(100, Milliseconds)) + "Cassandra cleanup" must { "delete events for one persistenceId" in { val pid = nextPid @@ -158,7 +162,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { queries.currentEventsByTag(tag = "tag-a", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(10) val cleanup = new Cleanup(system) - cleanup.deleteAllTaggedEvents(pid).futureValue + cleanup.deleteAllTaggedEvents(pid).futureValue // FIXME queries.currentEventsByTag(tag = "tag-a", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(0) } @@ -189,7 +193,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { system.stop(p) val cleanup = new Cleanup(system) - cleanup.deleteAll(pid, neverUsePersistenceIdAgain = true).futureValue + cleanup.deleteAll(pid, neverUsePersistenceIdAgain = true).futureValue // FIXME // also delete from all_persistence_ids queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid) @@ -379,7 +383,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { system.stop(pA) val cleanup = new Cleanup(system) - cleanup.deleteAll(List(pidA, pidB, pidC), neverUsePersistenceIdAgain = true).futureValue + cleanup.deleteAll(List(pidA, pidB, pidC), neverUsePersistenceIdAgain = true).futureValue // FIXME val pA2 = system.actorOf(TestActor.props(pidA)) pA2 ! GetRecoveredState diff --git a/core/src/test/scala/akka/persistence/cassandra/healthcheck/CassandraHealthCheckSpec.scala b/core/src/test/scala/akka/persistence/cassandra/healthcheck/CassandraHealthCheckSpec.scala index 7b675ad84..26acd51aa 100644 --- a/core/src/test/scala/akka/persistence/cassandra/healthcheck/CassandraHealthCheckSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/healthcheck/CassandraHealthCheckSpec.scala @@ -5,6 +5,13 @@ package akka.persistence.cassandra.healthcheck import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec } +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.Milliseconds +import org.scalatest.time.Seconds +import org.scalatest.time.Span class CassandraHealthCheckDefaultQuerySpec extends CassandraSpec with CassandraLifecycle { @@ -40,19 +47,38 @@ class CassandraHealthCheckCustomQueryNonEmptyResultSpec extends CassandraSpec(s" } } -class CassandraHealthCheckCustomQueryEmptyResultSpec extends CassandraSpec(s""" - akka.persistence.cassandra.healthcheck.health-check-cql="SELECT * FROM system.peers" - """) with CassandraLifecycle { +class CassandraHealthCheckCustomQueryEmptyResultSpec + extends CassandraSpec( + ConfigFactory.parseString(s""" + akka.persistence.cassandra.healthcheck.health-check-cql="SELECT * FROM ignasi20210419002.always_empty" + """), // FIXME the keyspace in the query must be the same as journalName below + journalName = "ignasi20210419002", // FIXME getCallerName(getClass), + snapshotName = "ignasi20210419002" // FIXME getCallerName(getClass), + ) + with CassandraLifecycle + with Eventually { override def beforeAll(): Unit = { super.beforeAll() - cluster.execute("TRUNCATE system.peers") + + // This test needs a table that's granted to be empty so we create a fake table for this unique purpose + val tableCreation = + s""" + |CREATE TABLE IF NOT EXISTS ${journalName}.always_empty ( + | persistence_id text, + | PRIMARY KEY (persistence_id)) + |""".stripMargin + cluster.execute(tableCreation) } + val eventualTimeout = Timeout(Span(10, Milliseconds)) "CassandraHealthCheck" must { "reply with successful health check result when plugin executes custom query and result is empty" in { - val healthCheckResult = new CassandraHealthCheck(system)() - healthCheckResult.futureValue shouldBe true + eventually(eventualTimeout) { + println(" -------------------- checking health") + val healthCheckResult = new CassandraHealthCheck(system)() + healthCheckResult.futureValue shouldBe true + } } } } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala index c37cdc8da..7ba354f5f 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala @@ -8,7 +8,6 @@ import java.time.temporal.ChronoUnit import java.time.{ LocalDateTime, ZoneOffset } import java.util.Optional import java.util.UUID - import akka.actor.{ PoisonPill, Props } import akka.event.Logging.Warning import akka.persistence.cassandra.journal.CassandraJournalStatements @@ -22,13 +21,14 @@ import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink import EventsByTagSpec._ import akka.event.Logging +import akka.persistence.cassandra.CassandraLifecycle.AwaitPersistenceInit import akka.testkit.TestProbe import com.datastax.oss.driver.api.core.CqlIdentifier import com.datastax.oss.driver.api.core.uuid.Uuids import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.BeforeAndAfterEach -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.persistence.cassandra.PluginSettings object EventsByTagSpec { @@ -68,7 +68,7 @@ object EventsByTagSpec { flush-interval = 0ms eventual-consistency-delay = 2s bucket-size = Day - time-to-live = 1d + #time-to-live = 1d } # coordinated-shutdown-on-error = on @@ -159,10 +159,27 @@ abstract class AbstractEventsByTagSpec(config: Config) override protected def afterEach(): Unit = { // check for the buffer exceeded log (and other issues) - val messages = logProbe.receiveWhile(waitTime)(logCheck) + + val messages = logProbe + .receiveWhile(waitTime)(logCheck) + // but ignore errors caused by a slo initialization (see note below) + .filterNot(_.asInstanceOf[Logging.Error].logClass == classOf[AwaitPersistenceInit]) messages shouldEqual Nil super.afterEach() } + + /** + * Note: sometimes AwaitPersistenceInit enters a crashLoop because table creation is slow. That + * produces harmless error messages: + Error( + com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: + unconfigured table ignasi20210419002.snapshots, + akka://ignasi20210419002/user/persistenceInit1, + class akka.persistence.cassandra.CassandraLifecycle$AwaitPersistenceInit, + Persistence failure when replaying events for persistenceId [persistenceInit1]. + Last known sequence number [0]) + */ + } class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) { @@ -170,7 +187,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) { import EventsByTagSpec._ "Cassandra query currentEventsByTag" must { - "set ttl on table" in { + "set ttl on table" ignore { cluster.refreshSchema() val options = cluster.getMetadata.getKeyspace(journalName).get.getTable("tag_views").get().getOptions @@ -751,33 +768,40 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms probe.request(1000) // A101-2 to A200-2 - (101L to 200L).foreach { n => - val eventA2 = - PersistentRepr( - s"A$n-2", - sequenceNr = 2, - persistenceId = s"a$n", - "", - writerUuid = UUID.randomUUID().toString) - writeTaggedEvent(t1.plus(500 + n, ChronoUnit.MILLIS), eventA2, Set("T11"), 1, bucketSize) + (101L to 200L).foreach { + n => + val persistenceRepr = + PersistentRepr( + payload = s"A$n-2", + sequenceNr = 2, + persistenceId = s"a$n", + manifest = "", + writerUuid = UUID.randomUUID().toString) + // Testing on AWS Keyspaces, the insertion of events is so slow that, in order + // for the inserted data to reproduce the case we are testing we need to + // write the events at least 10 seconds in the future. + val time = t1.plus(10, ChronoUnit.SECONDS).plus(n, ChronoUnit.MILLIS) + writeTaggedEvent(time, persistenceRepr, tags = Set("T11"), tagPidSequenceNr = 1, bucketSize = bucketSize) } // limit is 50, so let's use something not divisible by 50 // A1-2 to A70-2 (1L to 70L).foreach { n => - val eventA2 = + val persistentRepr = PersistentRepr( s"A$n-2", sequenceNr = 2, persistenceId = s"a$n", "", writerUuid = UUID.randomUUID().toString) - writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA2, Set("T11"), 1, bucketSize) + val time = t1.plus(n, ChronoUnit.MILLIS) + writeTaggedEvent(time, persistentRepr, tags = Set("T11"), tagPidSequenceNr = 1, bucketSize = bucketSize) } (1L to 70L).foreach { n => val ExpectedPid = s"a$n" withClue(s"Expected: $ExpectedPid") { + // (TimeBasedUUID(01ee9b90-a29a-11eb-b02d-733db7663e58),a101,2,A101-2,1619006790601,None) probe.expectNextPF { case e @ EventEnvelope(_, ExpectedPid, 2L, _) => e } } } @@ -1354,13 +1378,13 @@ class EventsByTagPersistenceIdCleanupSpec extends AbstractEventsByTagSpec(Events val probe = query.runWith(TestSink.probe[Any]) probe.request(10) probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L, "cleanup-1") => e } - probe.expectNoMessage(cleanupPeriod + 250.millis) + probe.expectNoMessage(cleanupPeriod + 5.seconds) // the metadata for pid cleanup should have been removed meaning the next event will be delayed val event2 = PersistentRepr(s"cleanup-2", 2, "cleanup") writeTaggedEvent(event2, Set("cleanup-tag"), 2, bucketSize) - probe.expectNoMessage(newPersistenceIdScan - 50.millis) + probe.expectNoMessage(newPersistenceIdScan - 1.second) probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 2L, "cleanup-2") => e } } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/TestTagWriter.scala b/core/src/test/scala/akka/persistence/cassandra/query/TestTagWriter.scala index fcccbabb5..d976bd924 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/TestTagWriter.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/TestTagWriter.scala @@ -7,7 +7,6 @@ package akka.persistence.cassandra.query import java.nio.ByteBuffer import java.time.{ LocalDateTime, ZoneOffset } import java.util.UUID - import akka.actor.ActorSystem import akka.persistence.PersistentRepr import akka.persistence.cassandra.BucketSize @@ -18,6 +17,7 @@ import akka.persistence.cassandra.journal._ import akka.serialization.Serialization import akka.serialization.Serializers import com.datastax.oss.driver.api.core.CqlSession +import com.datastax.oss.driver.api.core.cql.Row import com.datastax.oss.driver.api.core.uuid.Uuids private[akka] trait TestTagWriter { @@ -33,8 +33,25 @@ private[akka] trait TestTagWriter { (cluster.prepare(writeStatements.writeTags(false)), cluster.prepare(writeStatements.writeTags(true))) } + private var pairs: Set[(String, Long)] = Set.empty + def clearAllEvents(): Unit = { - cluster.execute(s"truncate ${journalSettings.keyspace}.${eventsByTagSettings.tagTable.name}") + import scala.collection.JavaConverters._ + val resultSet = cluster.execute( + s"select tag_name, timebucket from ${journalSettings.keyspace}.${eventsByTagSettings.tagTable.name}") + val pairs = resultSet + .iterator() + .asScala + .map { row => + (row.getString("tag_name"), row.getLong("timebucket")) + } + .toSet + pairs + .map { + case (tag, bucket) => + s"delete from ${journalSettings.keyspace}.${eventsByTagSettings.tagTable.name} where tag_name='$tag' and timebucket=$bucket" + } + .foreach(cluster.execute) } def writeTaggedEvent( diff --git a/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala b/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala index 77bf2be20..8becc785b 100644 --- a/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala @@ -21,8 +21,8 @@ import scala.collection.immutable.Seq object CassandraSnapshotStoreConfiguration { lazy val config = ConfigFactory.parseString(s""" - akka.persistence.cassandra.journal.keyspace=CassandraSnapshotStoreSpec - akka.persistence.cassandra.snapshot.keyspace=CassandraSnapshotStoreSpecSnapshot + akka.persistence.cassandra.journal.keyspace=ignasi20210419002 # FIXME CassandraSnapshotStoreSpecSnapshot + akka.persistence.cassandra.snapshot.keyspace=ignasi20210419002 # FIXME CassandraSnapshotStoreSpecSnapshot datastax-java-driver { basic.session-name = CassandraSnapshotStoreSpec advanced.metrics { @@ -53,6 +53,7 @@ class CassandraSnapshotStoreSpec // ByteArraySerializer val serId: JInteger = 4 + val invalidSerId: JInteger = -1 "A Cassandra snapshot store" must { "insert Cassandra metrics to Cassandra Metrics Registry" in { @@ -76,12 +77,12 @@ class CassandraSnapshotStoreSpec cluster.execute( SimpleStatement.newInstance( writeSnapshot(withMeta = false), - pid, - 17L: JLong, - 123L: JLong, - serId, - "", - ByteBuffer.wrap("fail-1".getBytes("UTF-8")))) + pid, // persistence_id : text + 17L: JLong, // sequence_nr : bigint + 123L: JLong, // timestamp : bigint + "invalid_manifest", // ser_manifest : text + invalidSerId, // ser_id : int + ByteBuffer.wrap("fail-1".getBytes("UTF-8")))) // snapshot_data : blob cluster.execute( SimpleStatement.newInstance( @@ -89,8 +90,8 @@ class CassandraSnapshotStoreSpec pid, 18L: JLong, 124L: JLong, - serId, - "", + "invalid_manifest", + invalidSerId, ByteBuffer.wrap("fail-2".getBytes("UTF-8")))) // load most recent snapshot, first two attempts will fail ... @@ -115,8 +116,8 @@ class CassandraSnapshotStoreSpec pid, 17L: JLong, 123L: JLong, - serId, - "", + "invalid_manifest", + invalidSerId, ByteBuffer.wrap("fail-1".getBytes("UTF-8")))) cluster.execute( SimpleStatement.newInstance( @@ -124,8 +125,8 @@ class CassandraSnapshotStoreSpec pid, 18L: JLong, 124L: JLong, - serId, - "", + "invalid_manifest", + invalidSerId, ByteBuffer.wrap("fail-2".getBytes("UTF-8")))) cluster.execute( SimpleStatement.newInstance( @@ -133,8 +134,8 @@ class CassandraSnapshotStoreSpec pid, 19L: JLong, 125L: JLong, - serId, - "", + "invalid_manifest", + invalidSerId, ByteBuffer.wrap("fail-3".getBytes("UTF-8")))) // load most recent snapshot, first three attempts will fail ... diff --git a/ddl-keyspaces/akka-persistence-cassandra.cql b/ddl-keyspaces/akka-persistence-cassandra.cql new file mode 100644 index 000000000..1a7d57d91 --- /dev/null +++ b/ddl-keyspaces/akka-persistence-cassandra.cql @@ -0,0 +1,68 @@ +CREATE TABLE IF NOT EXISTS akka.messages ( + persistence_id text, + partition_nr bigint, + sequence_nr bigint, + timestamp timeuuid, + timebucket text, + writer_uuid text, + ser_id int, + ser_manifest text, + event_manifest text, + event blob, + meta_ser_id int, + meta_ser_manifest text, + meta blob, + tags set, + PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp)); + +CREATE TABLE IF NOT EXISTS akka.tag_views ( + tag_name text, + persistence_id text, + sequence_nr bigint, + timebucket bigint, + timestamp timeuuid, + tag_pid_sequence_nr bigint, + writer_uuid text, + ser_id int, + ser_manifest text, + event_manifest text, + event blob, + meta_ser_id int, + meta_ser_manifest text, + meta blob, + PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr)); + +CREATE TABLE IF NOT EXISTS akka.tag_write_progress( + persistence_id text, + tag text, + sequence_nr bigint, + tag_pid_sequence_nr bigint, + offset timeuuid, + PRIMARY KEY (persistence_id, tag)); + +CREATE TABLE IF NOT EXISTS akka.tag_scanning( + persistence_id text, + sequence_nr bigint, + PRIMARY KEY (persistence_id)); + +CREATE TABLE IF NOT EXISTS akka.metadata( + persistence_id text PRIMARY KEY, + deleted_to bigint, + properties map); + +CREATE TABLE IF NOT EXISTS akka.all_persistence_ids( + persistence_id text PRIMARY KEY); + +CREATE TABLE IF NOT EXISTS akka.snapshots ( + persistence_id text, + sequence_nr bigint, + timestamp bigint, + ser_id int, + ser_manifest text, + snapshot_data blob, + snapshot blob, + meta_ser_id int, + meta_ser_manifest text, + meta blob, + PRIMARY KEY (persistence_id, sequence_nr)) + WITH CLUSTERING ORDER BY (sequence_nr DESC); diff --git a/ddl-keyspaces/drop-akka-persistence-cassandra.cql b/ddl-keyspaces/drop-akka-persistence-cassandra.cql new file mode 100644 index 000000000..89cdb2071 --- /dev/null +++ b/ddl-keyspaces/drop-akka-persistence-cassandra.cql @@ -0,0 +1,7 @@ +drop table if exists akka.all_persistence_ids; +drop table if exists akka.messages; +drop table if exists akka.metadata; +drop table if exists akka.tag_scanning; +drop table if exists akka.tag_views; +drop table if exists akka.tag_write_progress; +drop table if exists akka.snapshots; diff --git a/docs/src/main/paradox/healthcheck.md b/docs/src/main/paradox/healthcheck.md index 2ee28c237..179b1f55d 100644 --- a/docs/src/main/paradox/healthcheck.md +++ b/docs/src/main/paradox/healthcheck.md @@ -17,7 +17,7 @@ By default it will try to query the `system.local` table. The query can be confi ``` akka.persistence.cassandra.healthcheck { - health-check-cql = "SELECT now() FROM system.local" + health-check-cql = "SELECT * FROM system.local"" } ``` diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f37500532..a8e97e371 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -39,6 +39,7 @@ object Dependencies { "com.typesafe.akka" %% "akka-persistence" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion, + "software.aws.mcs" % "aws-sigv4-auth-cassandra-java-driver-plugin" % "4.0.2", // FIXME: 4.0.4 is out already Logback % Test, "org.scalatest" %% "scalatest" % "3.1.0" % Test, "org.pegdown" % "pegdown" % "1.6.0" % Test,