Skip to content

Commit

Permalink
Update woken-messages to 2.9.11, use DatabaseId and TableId
Browse files Browse the repository at this point in the history
Use DatabaseId and TableId in most places,
add unit tests for DatabaseConfiguration and DatasetsConfiguration
  • Loading branch information
ludovicc committed Feb 13, 2019
1 parent c11723e commit 70240dc
Show file tree
Hide file tree
Showing 57 changed files with 709 additions and 520 deletions.
14 changes: 7 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ lazy val `woken` =
library.kamonSigar,
library.akkaHttpSwagger,
library.swaggerScala,
library.swaggerJaxrs,
library.swaggerUI,
library.javaxWsRs,
library.sprayJson,
library.slf4j,
library.log4jSlf4j,
Expand Down Expand Up @@ -104,6 +104,8 @@ lazy val library =
val akkaManagement = "0.20.0"
val akkaHttpSwagger = "2.0.1"
val swaggerScala = "2.0.3"
val swaggerUI = "3.20.5"
val javaxWsRs = "2.0.1"
val kamon = "1.1.5"
val kamonAkka = "1.1.3"
val kamonAkkaRemote = "1.1.0"
Expand All @@ -113,11 +115,9 @@ lazy val library =
val kamonSystemMetrics = "1.0.1"
val kamonSigar = "1.6.6-rev002"
val bugsnag = "3.4.1"
val swaggerJaxrs = "1.5.21"
val swaggerUI = "3.20.5"
val sprayJson = "1.3.5"
val slf4j = "1.7.25"
val log4j = "2.11.1"
val log4j = "2.11.2"
val disruptor = "3.4.2"
val scalaLogging = "3.9.0"
val cats = "1.6.0"
Expand All @@ -132,7 +132,7 @@ lazy val library =
val dockerTestKit = "0.9.8"
val diff = "2.0.1"
val acyclic = "0.1.8"
val wokenMessages = "2.9.6"
val wokenMessages = "2.9.11"
val sup = "0.2.0"
val sttpBackend = "1.5.9"
}
Expand Down Expand Up @@ -163,6 +163,8 @@ lazy val library =
val akkaManagementClusterHttp: ModuleID = "com.lightbend.akka.management" %% "akka-management-cluster-http" % Version.akkaManagement excludeAll ExclusionRules.excludeAkkaClusterSharding
val akkaHttpSwagger: ModuleID = "com.github.swagger-akka-http" %% "swagger-akka-http" % Version.akkaHttpSwagger
val swaggerScala: ModuleID = "com.github.swagger-akka-http" %% "swagger-scala-module" % Version.swaggerScala
val swaggerUI: ModuleID = "org.webjars" % "swagger-ui" % Version.swaggerUI
val javaxWsRs: ModuleID = "javax.ws.rs" % "javax.ws.rs-api" % Version.javaxWsRs

// Kamon
val kamon: ModuleID = "io.kamon" %% "kamon-core" % Version.kamon excludeAll ExclusionRules.excludeLogback
Expand All @@ -175,8 +177,6 @@ lazy val library =
val kamonSigar: ModuleID = "io.kamon" % "sigar-loader" % Version.kamonSigar
val bugsnag: ModuleID = "com.bugsnag" % "bugsnag" % Version.bugsnag excludeAll ExclusionRules.excludeJackson

val swaggerJaxrs: ModuleID = "io.swagger" % "swagger-jaxrs" % Version.swaggerJaxrs
val swaggerUI: ModuleID = "org.webjars" % "swagger-ui" % Version.swaggerUI
val sprayJson: ModuleID = "io.spray" %% "spray-json" % Version.sprayJson
val slf4j: ModuleID = "org.slf4j" % "slf4j-api" % Version.slf4j
val log4jSlf4j: ModuleID = "org.apache.logging.log4j" % "log4j-slf4j-impl" % Version.log4j
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/ch/chuv/lren/woken/akka/AkkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import cats.effect._
import ch.chuv.lren.woken.backends.faas.chronos.ChronosExecutor
import ch.chuv.lren.woken.backends.woken.WokenClientService
import ch.chuv.lren.woken.backends.worker.WokenWorker
import ch.chuv.lren.woken.config.{ DatasetsConfiguration, WokenConfiguration }
import ch.chuv.lren.woken.config.WokenConfiguration
import ch.chuv.lren.woken.errors.BugsnagErrorReporter
import ch.chuv.lren.woken.service._
import com.typesafe.scalalogging.Logger
Expand Down Expand Up @@ -64,7 +64,7 @@ class AkkaServer[F[_]: ConcurrentEffect: ContextShift: Timer](
val backendServices: BackendServices[F] = {
val wokenService: WokenClientService = WokenClientService(config.jobs.node)
val dispatcherService: DispatcherService =
DispatcherService(DatasetsConfiguration.datasets(config.config), wokenService)
DispatcherService(databaseServices.datasetService, wokenService)
val wokenWorker = WokenWorker[F](pubSub, cluster)
val algorithmExecutor = ChronosExecutor(system,
chronosHttp,
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/ch/chuv/lren/woken/akka/MasterRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import ch.chuv.lren.woken.dispatch.{
MiningQueriesActor
}
import ch.chuv.lren.woken.messages.{ Ping, Pong }
import ch.chuv.lren.woken.messages.datasets.{ DatasetsQuery, DatasetsResponse }
import ch.chuv.lren.woken.messages.datasets.{ DatasetsQuery, DatasetsResponse, TableId }
import ch.chuv.lren.woken.messages.query._
import ch.chuv.lren.woken.messages.variables._
import ch.chuv.lren.woken.service._
Expand Down Expand Up @@ -76,10 +76,12 @@ class MasterRouter[F[_]: Effect](

case ds: DatasetsQuery =>
mark("DatasetsQueryRequestReceived")
val allDatasets = databaseServices.datasetService.datasets()
val table = ds.table.getOrElse(config.jobs.featuresTable)
val allDatasets = databaseServices.datasetService.datasets().values.toSet
val table: TableId = ds.table
.map(t => TableId(config.jobs.defaultFeaturesDatabase.code, t))
.getOrElse(config.jobs.defaultFeaturesTable)
val datasets =
if (table == "*") allDatasets
if (table.name == "*") allDatasets
else allDatasets.filter(_.tables.contains(table))

sender ! DatasetsResponse(datasets.map(_.withoutAuthenticationDetails))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import ch.chuv.lren.woken.backends.HttpClient.checkHealth
import ch.chuv.lren.woken.backends.faas.AlgorithmExecutor.TaggedS
import ch.chuv.lren.woken.backends.faas.{ AlgorithmExecutor, AlgorithmResults }
import ch.chuv.lren.woken.config.{ DatabaseConfiguration, JobsConfiguration }
import ch.chuv.lren.woken.config.{ DatabaseConfiguration, DatabaseId, JobsConfiguration }
import ch.chuv.lren.woken.core.model.jobs.{ DockerJob, ErrorJobResult }
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.core.fp._
Expand All @@ -48,7 +48,7 @@ case class ChronosExecutor[F[_]: Effect](system: ActorSystem,
dockerBridgeNetwork: Option[String],
jobResultService: JobResultRepository[F],
jobsConf: JobsConfiguration,
jdbcConfF: String => Validation[DatabaseConfiguration])
jdbcConfF: DatabaseId => Validation[DatabaseConfiguration])
extends AlgorithmExecutor[F]
with LazyLogging {

Expand Down Expand Up @@ -89,7 +89,7 @@ case class CoordinatorConfig[F[_]](chronosService: ActorRef,
dockerBridgeNetwork: Option[String],
jobResultService: JobResultRepository[F],
jobsConf: JobsConfiguration,
jdbcConfF: String => Validation[DatabaseConfiguration])
jdbcConfF: DatabaseId => Validation[DatabaseConfiguration])

/**
* We use the companion object to hold all the messages that the ``CoordinatorActor``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package ch.chuv.lren.woken.backends.faas.chronos

import ch.chuv.lren.woken.config.{ DatabaseConfiguration, JobsConfiguration }
import ch.chuv.lren.woken.config.{ DatabaseConfiguration, DatabaseId, JobsConfiguration }
import ch.chuv.lren.woken.core.model.jobs.DockerJob
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import cats.implicits._
Expand All @@ -44,7 +44,7 @@ object JobToChronos {
def apply(job: DockerJob,
dockerBridgeNetwork: Option[String],
jobsConf: JobsConfiguration,
jdbcConfF: String => Validation[DatabaseConfiguration]): Validation[ChronosJob] = {
jdbcConfF: DatabaseId => Validation[DatabaseConfiguration]): Validation[ChronosJob] = {

val container = dockerBridgeNetwork.fold(
Container(`type` = ContainerType.DOCKER, image = job.algorithmDefinition.dockerImage)
Expand Down Expand Up @@ -85,7 +85,7 @@ object JobToChronos {
)
}

val inputDb = jdbcConfF(job.query.dbTable.database)
val inputDb = jdbcConfF(DatabaseId(job.query.dbTable.database))
val outputDb = jdbcConfF(jobsConf.resultDb)

(inputDb, outputDb) mapN buildChronosJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ case class WokenClientService(node: String)(implicit val system: ActorSystem,
Unmarshal(response)
.to[QueryResult])
.mapN((_, _))
.map(rq => (rq._1, rq._2.copy(query = Some(query))))
.map(rq => (rq._1, rq._2.copy(query = query)))
case (url, query, failure) =>
failure.discardEntityBytes()
(url,
QueryResult(None,
QueryResult("",
node,
Set(),
Nil,
Expand All @@ -114,7 +114,7 @@ case class WokenClientService(node: String)(implicit val system: ActorSystem,
Some("dispatch"),
None,
Some(failure.entity.toString),
Some(query))).pure[Future]
query)).pure[Future]
}
.map(identity)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

package ch.chuv.lren.woken.config

import cats.data.NonEmptyList
import doobie._
import doobie.implicits._
import doobie.hikari._
import cats.implicits._
import cats.effect._
import cats.data.Validated._
import com.typesafe.config.Config
import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId }
import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn }
import ch.chuv.lren.woken.cromwell.core.ConfigUtil._
import ch.chuv.lren.woken.messages.datasets.TableId
import ch.chuv.lren.woken.messages.query.UserId
import ch.chuv.lren.woken.messages.variables.SqlType

import scala.language.higherKinds

final case class DatabaseId(code: String)

/**
* Connection configuration for a database
*
Expand All @@ -45,7 +49,8 @@ import scala.language.higherKinds
* @param user Database user
* @param password Database password
*/
final case class DatabaseConfiguration(dbiDriver: String,
final case class DatabaseConfiguration(id: DatabaseId,
dbiDriver: String,
dbApiDriver: String,
jdbcDriver: String,
jdbcUrl: String,
Expand All @@ -56,13 +61,14 @@ final case class DatabaseConfiguration(dbiDriver: String,
user: String,
password: String,
poolSize: Int,
tables: Set[FeaturesTableDescription])
tables: Map[TableId, FeaturesTableDescription])

object DatabaseConfiguration {

def read(config: Config, path: List[String]): Validation[DatabaseConfiguration] = {
def read(config: Config, path: NonEmptyList[String]): Validation[DatabaseConfiguration] = {

val dbConfig = config.validateConfig(path.mkString("."))
val dbConfig = config.validateConfig(path.mkString_("."))
val dbId = path.last.validNel[String].map(DatabaseId)

dbConfig.andThen { db =>
val dbiDriver: Validation[String] =
Expand All @@ -88,18 +94,21 @@ object DatabaseConfiguration {

val tableFactory: String => Validation[FeaturesTableDescription] =
table =>
liftOption(path.lastOption).andThen { tableName =>
readTable(db, List("tables", table), tableName)
dbId.andThen { id =>
readTable(db, List("tables", table), id.code)
}

val tables: Validation[Set[FeaturesTableDescription]] = {
val tables: Validation[Map[TableId, FeaturesTableDescription]] = {
tableNames.andThen { names: Set[String] =>
val m: Set[Validation[FeaturesTableDescription]] = names.map(tableFactory)
m.toList.sequence[Validation, FeaturesTableDescription].map(_.toSet)
m.toList
.sequence[Validation, FeaturesTableDescription]
.map(l => l.map(d => (d.table, d)).toMap)
}
}

(dbiDriver,
(dbId,
dbiDriver,
dbApiDriver,
jdbcDriver,
jdbcUrl,
Expand Down Expand Up @@ -149,7 +158,8 @@ object DatabaseConfiguration {
}
.orElse(None.validNel)

val schema: Validation[Option[String]] = table.validateOptionalString("schema")
val schema: Validation[String] =
table.validateOptionalString("schema").map(_.getOrElse("public"))
val validateSchema: Validation[Boolean] =
table.validateBoolean("validateSchema").orElse(true.validNel[String])
val seed: Validation[Double] = table.validateDouble("seed").orElse(0.67.validNel)
Expand All @@ -160,8 +170,8 @@ object DatabaseConfiguration {
}
}

def factory(config: Config): String => Validation[DatabaseConfiguration] =
dbAlias => read(config, List("db", dbAlias))
def factory(config: Config): DatabaseId => Validation[DatabaseConfiguration] =
dbAlias => read(config, NonEmptyList("db", List(dbAlias.code)))

@SuppressWarnings(Array("org.wartremover.warts.Any"))
def dbTransactor[F[_]: Effect: ContextShift](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package ch.chuv.lren.woken.config

import akka.http.scaladsl.model.Uri
import cats.data.NonEmptyList
import com.typesafe.config.Config
import ch.chuv.lren.woken.cromwell.core.ConfigUtil._
import ch.chuv.lren.woken.messages.datasets.{ AnonymisationLevel, Dataset, DatasetId }
import ch.chuv.lren.woken.messages.datasets.{ AnonymisationLevel, Dataset, DatasetId, TableId }
import ch.chuv.lren.woken.messages.remoting.{ BasicAuthentication, RemoteLocation }

import cats.data.Validated._
import cats.implicits._

Expand All @@ -32,7 +32,7 @@ object DatasetsConfiguration {
private[this] def createDataset(dataset: String,
label: String,
description: String,
tables: List[String],
tables: List[TableId],
anonymisationLevel: String,
location: Option[RemoteLocation]): Dataset = Dataset(
DatasetId(dataset),
Expand All @@ -43,16 +43,18 @@ object DatasetsConfiguration {
location
)

def read(config: Config, path: List[String]): Validation[Dataset] = {
def read(config: Config, path: List[String], database: DatabaseId): Validation[Dataset] = {

val datasetConfig = config.validateConfig(path.mkString("."))

datasetConfig.andThen { f =>
val dataset: Validation[String] =
path.lastOption.map(_.validNel[String]).getOrElse("Empty path".invalidNel[String])
val label = f.validateString("label")
val description = f.validateString("description")
val tables: Validation[List[String]] = f.validateStringList("tables")
val label = f.validateString("label")
val description = f.validateString("description")
val tables: Validation[List[TableId]] = f
.validateStringList("tables")
.map(l => l.map(t => TableId(database.code, t)))
val location: Validation[Option[RemoteLocation]] = f
.validateConfig("location")
.andThen { cl =>
Expand Down Expand Up @@ -87,25 +89,27 @@ object DatasetsConfiguration {

}

def factory(config: Config): DatasetId => Validation[Dataset] =
dataset => read(config, List("datasets", dataset.code))
def factory(config: Config, defaultDatabase: DatabaseId): DatasetId => Validation[Dataset] =
dataset => read(config, List("datasets", dataset.code), defaultDatabase)

def datasetNames(config: Config): Validation[Set[DatasetId]] =
config.validateConfig("datasets").map(_.keys.map(DatasetId))

def datasets(config: Config): Validation[Map[DatasetId, Dataset]] = {
val datasetFactory = factory(config)
datasetNames(config).andThen { names: Set[DatasetId] =>
val m: List[Validation[(DatasetId, Dataset)]] =
names.toList
.map { datasetId =>
val datasetIdV: Validation[DatasetId] = datasetId.validNel[String]
datasetIdV -> datasetFactory(datasetId)
}
.map(_.tupled)
val t: Validation[List[(DatasetId, Dataset)]] = m.sequence[Validation, (DatasetId, Dataset)]
t.map(_.toMap)
}
def datasets(config: Config, defaultDatabase: DatabaseId): Validation[Map[DatasetId, Dataset]] = {
val datasetFactory = factory(config, defaultDatabase)
datasetNames(config)
.andThen { names: Set[DatasetId] =>
val m: List[Validation[(DatasetId, Dataset)]] =
names.toList
.map { datasetId =>
val datasetIdV: Validation[DatasetId] = datasetId.validNel[String]
datasetIdV -> datasetFactory(datasetId)
}
.map(_.tupled)
val t: Validation[List[(DatasetId, Dataset)]] = m.sequence[Validation, (DatasetId, Dataset)]
t.map(_.toMap)
}
.ensure(NonEmptyList("No datasets are configured", Nil))(_.nonEmpty)
}

}
Loading

0 comments on commit 70240dc

Please sign in to comment.