Skip to content

Commit

Permalink
Bump skafka and kafka-journal libs, drop deprecates (#639)
Browse files Browse the repository at this point in the history
* Bump skafka and kafka-journal libs, drop deprecates

* drop other deprecated methods

* use skafka's KafkaHealthCheck

---------

Co-authored-by: Nikita Filimonov <[email protected]>
  • Loading branch information
Phill101 and Nikita Filimonov authored Oct 8, 2024
1 parent 9049272 commit b5dafad
Show file tree
Hide file tree
Showing 20 changed files with 61 additions and 473 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@ package com.evolutiongaming.kafka.flow.kafka

import cats.effect.{Async, Clock, Resource}
import cats.syntax.all._
import com.evolutiongaming.catshelper.{FromTry, LogOf, MeasureDuration, ToFuture, ToTry}
import com.evolutiongaming.catshelper._
import com.evolutiongaming.kafka.flow.LogResource
import com.evolutiongaming.kafka.flow.kafka.Codecs._
import com.evolutiongaming.kafka.journal.{
KafkaConfig,
KafkaConsumerOf => JournalConsumerOf,
KafkaHealthCheck,
KafkaProducerOf => JournalProducerOf,
RandomIdOf
}
import com.evolutiongaming.skafka.KafkaHealthCheck
import com.evolutiongaming.skafka.consumer.{
AutoOffsetReset,
ConsumerConfig,
Expand All @@ -28,8 +22,8 @@ trait KafkaModule[F[_]] {

def consumerOf: ConsumerOf[F]
def producerOf: RawProducerOf[F]

}

object KafkaModule {

def of[F[_]: Async: FromTry: ToTry: ToFuture: LogOf](
Expand All @@ -43,17 +37,18 @@ object KafkaModule {
consumerMetrics <- ConsumerMetrics.of(registry)
_producerOf = RawProducerOf.apply1[F](producerMetrics(applicationId).some)
_consumerOf = RawConsumerOf.apply1[F](consumerMetrics(applicationId).some)

_healthCheck <- {
implicit val randomIdOf = RandomIdOf.uuid[F]
implicit val journalProducerOf = JournalProducerOf[F](_producerOf)
implicit val journalConsumerOf = JournalConsumerOf[F](_consumerOf)
implicit val randomIdOf = RandomIdOf.uuid[F]
implicit val consumerOf = _consumerOf
implicit val producerOf = _producerOf

val commonConfig = config.common.copy(clientId = config.common.clientId.map(id => s"$id-HealthCheck"))
val healthCheck = KafkaHealthCheck.of[F](
config = KafkaHealthCheck.Config.default,
kafkaConfig = KafkaConfig(
ProducerConfig(common = commonConfig, saslSupport = config.saslSupport, sslSupport = config.sslSupport),
config.copy(common = commonConfig)
)

val healthCheck = KafkaHealthCheck.of(
KafkaHealthCheck.Config.default,
ConsumerConfig(common = commonConfig),
ProducerConfig(common = commonConfig, saslSupport = config.saslSupport, sslSupport = config.sslSupport)
)
LogResource[F](KafkaModule.getClass, "KafkaHealthCheck") *> healthCheck
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@ import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.evolution.kafka.flow.cassandra.CassandraModule
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow
import com.evolutiongaming.kafka.flow.cassandra.CassandraConfig
import com.evolutiongaming.nel.Nel
import com.evolutiongaming.scassandra
import munit.FunSuite

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.nowarn

abstract class CassandraSpec extends FunSuite {
implicit val ioRuntime: IORuntime = IORuntime.global

override def munitFixtures: Seq[Fixture[_]] = List(cassandra, cassandraJournal)
override def munitFixtures: Seq[Fixture[_]] = List(cassandra)

val cassandra: Fixture[CassandraModule[IO]] = new Fixture[CassandraModule[IO]]("CassandraModule") {
private val moduleRef = new AtomicReference[(CassandraModule[IO], IO[Unit])]()
Expand Down Expand Up @@ -44,35 +42,4 @@ abstract class CassandraSpec extends FunSuite {
Option(moduleRef.get()).foreach { case (_, finalizer) => finalizer.unsafeRunSync() }
}
}

@nowarn("msg=deprecated")
val cassandraJournal: Fixture[flow.cassandra.CassandraModule[IO]] =
new Fixture[flow.cassandra.CassandraModule[IO]]("CassandraModule") {
private val moduleRef = new AtomicReference[(flow.cassandra.CassandraModule[IO], IO[Unit])]()

override def apply(): flow.cassandra.CassandraModule[IO] = moduleRef.get()._1

override def beforeAll(): Unit = {
implicit val logOf = LogOf.slf4j[IO].unsafeRunSync()

val container = CassandraContainerResource.cassandra.cassandraContainer
val result: (flow.cassandra.CassandraModule[IO], IO[Unit]) =
flow
.cassandra
.CassandraModule
.of[IO](
CassandraConfig(client =
scassandra.CassandraConfig(contactPoints = Nel(container.getHost), port = container.getFirstMappedPort)
)
)
.allocated
.unsafeRunSync()

moduleRef.set(result)
}

override def afterAll(): Unit = {
Option(moduleRef.get()).foreach { case (_, finalizer) => finalizer.unsafeRunSync() }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import cats.effect.IO
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.scassandra.CassandraSession

import scala.annotation.nowarn
import scala.concurrent.duration._

class JournalSchemaSpec extends CassandraSpec {
Expand All @@ -23,20 +22,6 @@ class JournalSchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is created using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync
@nowarn("msg=deprecated")
val schema = JournalSchema.apply(session, sync)

val test = for {
_ <- schema.create
_ <- validateTableExists(session.unsafe)
} yield ()

test.unsafeRunSync()
}

test("table is truncated using scassandra session API") {
val session = cassandra().session
val sync = cassandra().sync
Expand All @@ -53,23 +38,6 @@ class JournalSchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is truncated using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync

@nowarn("msg=deprecated")
val schema = JournalSchema.apply(session, sync)

val test = for {
_ <- schema.create
_ <- insertRecord(session.unsafe)
_ <- schema.truncate
_ <- validateTableIsEmpty(session.unsafe)
} yield ()

test.unsafeRunSync()
}

private def insertRecord(session: CassandraSession[IO]): IO[Unit] = {
session
.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import cats.effect.IO
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.scassandra.CassandraSession

import scala.annotation.nowarn
import scala.concurrent.duration._

class KeySchemaSpec extends CassandraSpec {
Expand All @@ -24,21 +23,6 @@ class KeySchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is created using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync

@nowarn("msg=deprecated")
val keySchema = KeySchema.apply(session, sync)

val test = for {
_ <- keySchema.create
_ <- validateTableExists(session.unsafe)
} yield ()

test.unsafeRunSync()
}

test("table is truncated using scassandra session API") {
val session = cassandra().session
val sync = cassandra().sync
Expand All @@ -55,23 +39,6 @@ class KeySchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is truncated using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync

@nowarn("msg=deprecated")
val keySchema = KeySchema.apply(session, sync)

val test = for {
_ <- keySchema.create
_ <- insertKey(session.unsafe)
_ <- keySchema.truncate
_ <- validateTableIsEmpty(session.unsafe)
} yield ()

test.unsafeRunSync()
}

private def insertKey(session: CassandraSession[IO]): IO[Unit] = {
session
.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.scassandra.CassandraSession

import scala.concurrent.duration._
import scala.annotation.nowarn

class SnapshotSchemaSpec extends CassandraSpec {
override def munitTimeout: Duration = 2.minutes
Expand All @@ -23,20 +22,6 @@ class SnapshotSchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is created using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync
@nowarn("msg=deprecated")
val schema = SnapshotSchema.apply(session, sync)

val test = for {
_ <- schema.create
_ <- validateTableExists(session.unsafe)
} yield ()

test.unsafeRunSync()
}

test("table is truncated using scassandra session API") {
val session = cassandra().session
val sync = cassandra().sync
Expand All @@ -53,23 +38,6 @@ class SnapshotSchemaSpec extends CassandraSpec {
test.unsafeRunSync()
}

test("table is truncated using kafka-journal session API") {
val session = cassandraJournal().session
val sync = cassandraJournal().sync

@nowarn("msg=deprecated")
val schema = SnapshotSchema.apply(session, sync)

val test = for {
_ <- schema.create
_ <- insertSnapshot(session.unsafe)
_ <- schema.truncate
_ <- validateTableIsEmpty(session.unsafe)
} yield ()

test.unsafeRunSync()
}

private def insertSnapshot(session: CassandraSession[IO]): IO[Unit] = {
session
.execute(
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit b5dafad

Please sign in to comment.