Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testing with AWS Keyspaces #866

Closed
wants to merge 10 commits into from
8 changes: 4 additions & 4 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ akka.persistence.cassandra {

# Maximum number of messages that will be batched when using `persistAsync`.
# Also used as the max batch size for deletes.
max-message-batch-size = 100
max-message-batch-size = 30
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batches with more than 30 statements are not supported by AWS Keyspaces.


# Target number of entries per partition (= columns per row).
# Must not be changed after table creation (currently not checked).
Expand Down Expand Up @@ -247,7 +247,7 @@ akka.persistence.cassandra {
# Max size of these batches. The best value for this will depend on the size of
# the serialized events. Cassandra logs a warning for batches above a certain
# size and this should be reduced if that warning is seen.
max-message-batch-size = 150
max-message-batch-size = 30

# Max time to buffer events for before writing.
# Larger values will increase cassandra write efficiency but increase the delay before
Expand Down Expand Up @@ -591,14 +591,14 @@ datastax-java-driver {
profiles {
akka-persistence-cassandra-profile {
basic.request {
consistency = QUORUM
consistency = LOCAL_QUORUM
# the journal does not use any counters or collections
default-idempotence = true
}
}
akka-persistence-cassandra-snapshot-profile {
basic.request {
consistency = ONE
consistency = LOCAL_QUORUM
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be possible to use LOCAL_ONE for the reads, but that fails, so LOCAL_QUORUM might be required for the writes.

# the snapshot store does not use any counters or collections
default-idempotence = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ import akka.persistence.cassandra.FutureDone

val done = if (journalSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
// session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
Expand All @@ -361,13 +361,13 @@ import akka.persistence.cassandra.FutureDone
}
_ <- tagStatements
} yield {
session.setSchemaMetadataEnabled(null)
// session.setSchemaMetadataEnabled(null)
Done
}
result.recoverWith {
case e =>
log.warning("Failed to create journal keyspace and tables: {}", e)
session.setSchemaMetadataEnabled(null)
// session.setSchemaMetadataEnabled(null)
FutureDone
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.typesafe.config.Config
val keyspaceAutoCreate: Boolean = journalConfig.getBoolean("keyspace-autocreate")
val tablesAutoCreate: Boolean = journalConfig.getBoolean("tables-autocreate")

val keyspace: String = journalConfig.getString("keyspace")
val keyspace: String = "akka" // FIXME journalConfig.getString("keyspace")

val table: String = journalConfig.getString("table")
val metadataTable: String = journalConfig.getString("metadata-table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,18 @@ import akka.persistence.cassandra.FutureDone

if (snapshotSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
// session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
} yield {
session.setSchemaMetadataEnabled(null)
// session.setSchemaMetadataEnabled(null)
Done
}
result.recoverWith {
case e =>
log.warning("Failed to create snapshot keyspace and tables: {}", e)
session.setSchemaMetadataEnabled(null)
// session.setSchemaMetadataEnabled(null)
FutureDone
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.typesafe.config.Config
val keyspaceAutoCreate: Boolean = snapshotConfig.getBoolean("keyspace-autocreate")
val tablesAutoCreate: Boolean = snapshotConfig.getBoolean("tables-autocreate")

val keyspace: String = snapshotConfig.getString("keyspace")
val keyspace: String = "akka" // FIXME snapshotConfig.getString("keyspace")

val table: String = snapshotConfig.getString("table")

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
</encoder>
</appender>
<logger name="org.apache.cassandra" level="ERROR" />
<logger name="com.datastax" level="INFO" />
<logger name="io.netty" level="ERROR" />
<logger name="com.datastax" level="DEBUG" />
<logger name="io.netty" level="DEBUG" />
<root level="INFO">
<appender-ref ref="CONSOLE" />
</root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.ConfigFactory
import org.scalatest._
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
Expand All @@ -37,6 +38,8 @@ object CassandraLifecycle {
akka.persistence.journal.plugin = "akka.persistence.cassandra.journal"
akka.persistence.snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
akka.persistence.cassandra.journal.circuit-breaker.call-timeout = 30s
akka.persistence.cassandra.journal.max-failures = 20
akka.persistence.cassandra.snapshot.max-failures = 20
akka.persistence.cassandra.events-by-tag.first-time-bucket = "$firstTimeBucket"
akka.test.single-expect-default = 20s
akka.test.filter-leeway = 20s
Expand All @@ -45,6 +48,25 @@ object CassandraLifecycle {
akka.actor.allow-java-serialization = on
akka.actor.warn-about-java-serializer-usage = off
akka.use-slf4j = off

datastax-java-driver {
basic.contact-points = [ "cassandra.eu-central-1.amazonaws.com:9142"]
basic.request.consistency = LOCAL_QUORUM
basic.load-balancing-policy {
class = DefaultLoadBalancingPolicy
local-datacenter = eu-central-1
}
advanced {
auth-provider = {
class = software.aws.mcs.auth.SigV4AuthProvider
aws-region = eu-central-1
}
ssl-engine-factory {
class = DefaultSslEngineFactory
}
}
}

""").withFallback(CassandraSpec.enableAutocreate).resolve()

def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = {
Expand Down Expand Up @@ -112,27 +134,48 @@ trait CassandraLifecycle extends BeforeAndAfterAll with TestKitBase {
}

override protected def beforeAll(): Unit = {
awaitPersistenceInit()
try {
awaitPersistenceInit()
} catch {
case NonFatal(e) =>
// Try(externalCassandraCleanup())
shutdown(system, verifySystemShutdown = false)
throw e
}
super.beforeAll()
}

private var initOk = false

def awaitPersistenceInit(): Unit = {
CassandraLifecycle.awaitPersistenceInit(system)
initOk = true
}

override protected def afterAll(): Unit = {
externalCassandraCleanup()
if (initOk)
externalCassandraCleanup()
shutdown(system, verifySystemShutdown = true)
super.afterAll()
}

def dropKeyspaces(): Unit = {
val journalKeyspace = system.settings.config.getString("akka.persistence.cassandra.journal.keyspace")
val snapshotKeyspace = system.settings.config.getString("akka.persistence.cassandra.snapshot.keyspace")
val journalKeyspace = "akka" // FIXME system.settings.config.getString("akka.persistence.cassandra.journal.keyspace")
val snapshotKeyspace = "akka" // FIXME system.settings.config.getString("akka.persistence.cassandra.snapshot.keyspace")
val dropped = Try {
cluster.execute(s"drop keyspace if exists ${journalKeyspace}")
cluster.execute(s"drop keyspace if exists ${snapshotKeyspace}")
// cluster.execute(s"drop table if exists ${journalKeyspace}.all_persistence_ids")
// cluster.execute(s"drop table if exists ${journalKeyspace}.messages")
// cluster.execute(s"drop table if exists ${journalKeyspace}.metadata")
// cluster.execute(s"drop table if exists ${journalKeyspace}.tag_scanning")
// cluster.execute(s"drop table if exists ${journalKeyspace}.tag_views")
// cluster.execute(s"drop table if exists ${journalKeyspace}.tag_write_progress")
//
// cluster.execute(s"drop table if exists ${snapshotKeyspace}.snapshots")

//cluster.execute(s"drop keyspace if exists ${journalKeyspace}")
//cluster.execute(s"drop keyspace if exists ${snapshotKeyspace}")
}

dropped match {
case Failure(t) => system.log.error(t, "Failed to drop keyspaces {} {}", journalKeyspace, snapshotKeyspace)
case Success(_) =>
Expand Down
21 changes: 11 additions & 10 deletions core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ object CassandraSpec {
eventual-consistency-delay = 200ms
}
snapshot {
keyspace-autocreate = true
tables-autocreate = true
keyspace-autocreate = false
tables-autocreate = false
}
journal {
keyspace-autocreate = true
tables-autocreate = true
keyspace-autocreate = false
tables-autocreate = false
}
}
""")

val fallbackConfig = ConfigFactory.parseString(s"""
akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"]
#akka.loggers = ["akka.persistence.cassandra.SilenceAllTestEventListener"]
akka.loglevel = DEBUG
akka.use-slf4j = off

Expand All @@ -92,8 +92,8 @@ object CassandraSpec {
*/
abstract class CassandraSpec(
config: Config = CassandraLifecycle.config,
val journalName: String = getCallerName(getClass),
val snapshotName: String = getCallerName(getClass),
val journalName: String = "akka", // FIXME getCallerName(getClass),
val snapshotName: String = "akka", // FIXME getCallerName(getClass),
dumpRowsOnFailure: Boolean = true)
extends TestKitBase
with Suite
Expand Down Expand Up @@ -203,9 +203,10 @@ abstract class CassandraSpec(
})

}
keyspaces().foreach { keyspace =>
cluster.execute(s"drop keyspace if exists $keyspace")
}
// FIXME
// keyspaces().foreach { keyspace =>
// cluster.execute(s"drop keyspace if exists $keyspace")
// }
} catch {
case NonFatal(t) =>
println("Exception during cleanup")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ object EventsByTagMigrationSpec {
}
akka.persistence.cassandra {
journal {
keyspace-autocreate = true
tables-autocreate = true
keyspace-autocreate = false
tables-autocreate = false
}
query {
events-by-persistence-id-gap-timeout = 1s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object EventsByTagSpec {
flush-interval = 0ms
eventual-consistency-delay = 2s
bucket-size = Day
time-to-live = 1d
#time-to-live = 1d
}

# coordinated-shutdown-on-error = on
Expand Down Expand Up @@ -170,7 +170,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
import EventsByTagSpec._

"Cassandra query currentEventsByTag" must {
"set ttl on table" in {
"set ttl on table" ignore {
cluster.refreshSchema()
val options = cluster.getMetadata.getKeyspace(journalName).get.getTable("tag_views").get().getOptions

Expand Down
68 changes: 68 additions & 0 deletions ddl-keyspaces/akka-persistence-cassandra.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
CREATE TABLE IF NOT EXISTS akka.messages (
persistence_id text,
partition_nr bigint,
sequence_nr bigint,
timestamp timeuuid,
timebucket text,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
tags set<text>,
PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp));

CREATE TABLE IF NOT EXISTS akka.tag_views (
tag_name text,
persistence_id text,
sequence_nr bigint,
timebucket bigint,
timestamp timeuuid,
tag_pid_sequence_nr bigint,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr));

CREATE TABLE IF NOT EXISTS akka.tag_write_progress(
persistence_id text,
tag text,
sequence_nr bigint,
tag_pid_sequence_nr bigint,
offset timeuuid,
PRIMARY KEY (persistence_id, tag));

CREATE TABLE IF NOT EXISTS akka.tag_scanning(
persistence_id text,
sequence_nr bigint,
PRIMARY KEY (persistence_id));

CREATE TABLE IF NOT EXISTS akka.metadata(
persistence_id text PRIMARY KEY,
deleted_to bigint,
properties map<text,text>);

CREATE TABLE IF NOT EXISTS akka.all_persistence_ids(
persistence_id text PRIMARY KEY);

CREATE TABLE IF NOT EXISTS akka.snapshots (
persistence_id text,
sequence_nr bigint,
timestamp bigint,
ser_id int,
ser_manifest text,
snapshot_data blob,
snapshot blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
PRIMARY KEY (persistence_id, sequence_nr))
WITH CLUSTERING ORDER BY (sequence_nr DESC);
7 changes: 7 additions & 0 deletions ddl-keyspaces/drop-akka-persistence-cassandra.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
drop table if exists akka.all_persistence_ids;
drop table if exists akka.messages;
drop table if exists akka.metadata;
drop table if exists akka.tag_scanning;
drop table if exists akka.tag_views;
drop table if exists akka.tag_write_progress;
drop table if exists akka.snapshots;
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object Dependencies {
"com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
"software.aws.mcs" % "aws-sigv4-auth-cassandra-java-driver-plugin" % "4.0.2",
Logback % Test,
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"org.pegdown" % "pegdown" % "1.6.0" % Test,
Expand Down