diff --git a/build.sbt b/build.sbt index a155d5c3..b7346a4d 100644 --- a/build.sbt +++ b/build.sbt @@ -46,8 +46,8 @@ lazy val `woken` = library.kamonSigar, library.akkaHttpSwagger, library.swaggerScala, - library.swaggerJaxrs, library.swaggerUI, + library.javaxWsRs, library.sprayJson, library.slf4j, library.log4jSlf4j, @@ -104,6 +104,8 @@ lazy val library = val akkaManagement = "0.20.0" val akkaHttpSwagger = "2.0.1" val swaggerScala = "2.0.3" + val swaggerUI = "3.20.5" + val javaxWsRs = "2.0.1" val kamon = "1.1.5" val kamonAkka = "1.1.3" val kamonAkkaRemote = "1.1.0" @@ -113,11 +115,9 @@ lazy val library = val kamonSystemMetrics = "1.0.1" val kamonSigar = "1.6.6-rev002" val bugsnag = "3.4.1" - val swaggerJaxrs = "1.5.21" - val swaggerUI = "3.20.5" val sprayJson = "1.3.5" val slf4j = "1.7.25" - val log4j = "2.11.1" + val log4j = "2.11.2" val disruptor = "3.4.2" val scalaLogging = "3.9.0" val cats = "1.6.0" @@ -132,7 +132,7 @@ lazy val library = val dockerTestKit = "0.9.8" val diff = "2.0.1" val acyclic = "0.1.8" - val wokenMessages = "2.9.6" + val wokenMessages = "2.9.11" val sup = "0.2.0" val sttpBackend = "1.5.9" } @@ -163,6 +163,8 @@ lazy val library = val akkaManagementClusterHttp: ModuleID = "com.lightbend.akka.management" %% "akka-management-cluster-http" % Version.akkaManagement excludeAll ExclusionRules.excludeAkkaClusterSharding val akkaHttpSwagger: ModuleID = "com.github.swagger-akka-http" %% "swagger-akka-http" % Version.akkaHttpSwagger val swaggerScala: ModuleID = "com.github.swagger-akka-http" %% "swagger-scala-module" % Version.swaggerScala + val swaggerUI: ModuleID = "org.webjars" % "swagger-ui" % Version.swaggerUI + val javaxWsRs: ModuleID = "javax.ws.rs" % "javax.ws.rs-api" % Version.javaxWsRs // Kamon val kamon: ModuleID = "io.kamon" %% "kamon-core" % Version.kamon excludeAll ExclusionRules.excludeLogback @@ -175,8 +177,6 @@ lazy val library = val kamonSigar: ModuleID = "io.kamon" % "sigar-loader" % Version.kamonSigar val bugsnag: ModuleID = "com.bugsnag" % "bugsnag" % Version.bugsnag excludeAll ExclusionRules.excludeJackson - val swaggerJaxrs: ModuleID = "io.swagger" % "swagger-jaxrs" % Version.swaggerJaxrs - val swaggerUI: ModuleID = "org.webjars" % "swagger-ui" % Version.swaggerUI val sprayJson: ModuleID = "io.spray" %% "spray-json" % Version.sprayJson val slf4j: ModuleID = "org.slf4j" % "slf4j-api" % Version.slf4j val log4jSlf4j: ModuleID = "org.apache.logging.log4j" % "log4j-slf4j-impl" % Version.log4j diff --git a/src/main/scala/ch/chuv/lren/woken/akka/AkkaServer.scala b/src/main/scala/ch/chuv/lren/woken/akka/AkkaServer.scala index c2bd8aad..f2a8d2f4 100644 --- a/src/main/scala/ch/chuv/lren/woken/akka/AkkaServer.scala +++ b/src/main/scala/ch/chuv/lren/woken/akka/AkkaServer.scala @@ -26,7 +26,7 @@ import cats.effect._ import ch.chuv.lren.woken.backends.faas.chronos.ChronosExecutor import ch.chuv.lren.woken.backends.woken.WokenClientService import ch.chuv.lren.woken.backends.worker.WokenWorker -import ch.chuv.lren.woken.config.{ DatasetsConfiguration, WokenConfiguration } +import ch.chuv.lren.woken.config.WokenConfiguration import ch.chuv.lren.woken.errors.BugsnagErrorReporter import ch.chuv.lren.woken.service._ import com.typesafe.scalalogging.Logger @@ -64,7 +64,7 @@ class AkkaServer[F[_]: ConcurrentEffect: ContextShift: Timer]( val backendServices: BackendServices[F] = { val wokenService: WokenClientService = WokenClientService(config.jobs.node) val dispatcherService: DispatcherService = - DispatcherService(DatasetsConfiguration.datasets(config.config), wokenService) + DispatcherService(databaseServices.datasetService, wokenService) val wokenWorker = WokenWorker[F](pubSub, cluster) val algorithmExecutor = ChronosExecutor(system, chronosHttp, diff --git a/src/main/scala/ch/chuv/lren/woken/akka/MasterRouter.scala b/src/main/scala/ch/chuv/lren/woken/akka/MasterRouter.scala index f4c4f912..ccd3f454 100644 --- a/src/main/scala/ch/chuv/lren/woken/akka/MasterRouter.scala +++ b/src/main/scala/ch/chuv/lren/woken/akka/MasterRouter.scala @@ -27,7 +27,7 @@ import ch.chuv.lren.woken.dispatch.{ MiningQueriesActor } import ch.chuv.lren.woken.messages.{ Ping, Pong } -import ch.chuv.lren.woken.messages.datasets.{ DatasetsQuery, DatasetsResponse } +import ch.chuv.lren.woken.messages.datasets.{ DatasetsQuery, DatasetsResponse, TableId } import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.messages.variables._ import ch.chuv.lren.woken.service._ @@ -76,10 +76,12 @@ class MasterRouter[F[_]: Effect]( case ds: DatasetsQuery => mark("DatasetsQueryRequestReceived") - val allDatasets = databaseServices.datasetService.datasets() - val table = ds.table.getOrElse(config.jobs.featuresTable) + val allDatasets = databaseServices.datasetService.datasets().values.toSet + val table: TableId = ds.table + .map(t => TableId(config.jobs.defaultFeaturesDatabase.code, t)) + .getOrElse(config.jobs.defaultFeaturesTable) val datasets = - if (table == "*") allDatasets + if (table.name == "*") allDatasets else allDatasets.filter(_.tables.contains(table)) sender ! DatasetsResponse(datasets.map(_.withoutAuthenticationDetails)) diff --git a/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/ChronosExecutor.scala b/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/ChronosExecutor.scala index 00215d7d..006aad42 100644 --- a/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/ChronosExecutor.scala +++ b/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/ChronosExecutor.scala @@ -29,7 +29,7 @@ import scala.concurrent.{ ExecutionContext, Future } import ch.chuv.lren.woken.backends.HttpClient.checkHealth import ch.chuv.lren.woken.backends.faas.AlgorithmExecutor.TaggedS import ch.chuv.lren.woken.backends.faas.{ AlgorithmExecutor, AlgorithmResults } -import ch.chuv.lren.woken.config.{ DatabaseConfiguration, JobsConfiguration } +import ch.chuv.lren.woken.config.{ DatabaseConfiguration, DatabaseId, JobsConfiguration } import ch.chuv.lren.woken.core.model.jobs.{ DockerJob, ErrorJobResult } import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.core.fp._ @@ -48,7 +48,7 @@ case class ChronosExecutor[F[_]: Effect](system: ActorSystem, dockerBridgeNetwork: Option[String], jobResultService: JobResultRepository[F], jobsConf: JobsConfiguration, - jdbcConfF: String => Validation[DatabaseConfiguration]) + jdbcConfF: DatabaseId => Validation[DatabaseConfiguration]) extends AlgorithmExecutor[F] with LazyLogging { @@ -89,7 +89,7 @@ case class CoordinatorConfig[F[_]](chronosService: ActorRef, dockerBridgeNetwork: Option[String], jobResultService: JobResultRepository[F], jobsConf: JobsConfiguration, - jdbcConfF: String => Validation[DatabaseConfiguration]) + jdbcConfF: DatabaseId => Validation[DatabaseConfiguration]) /** * We use the companion object to hold all the messages that the ``CoordinatorActor`` diff --git a/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronos.scala b/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronos.scala index 1b2a7ed5..f2df5491 100644 --- a/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronos.scala +++ b/src/main/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronos.scala @@ -17,7 +17,7 @@ package ch.chuv.lren.woken.backends.faas.chronos -import ch.chuv.lren.woken.config.{ DatabaseConfiguration, JobsConfiguration } +import ch.chuv.lren.woken.config.{ DatabaseConfiguration, DatabaseId, JobsConfiguration } import ch.chuv.lren.woken.core.model.jobs.DockerJob import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import cats.implicits._ @@ -44,7 +44,7 @@ object JobToChronos { def apply(job: DockerJob, dockerBridgeNetwork: Option[String], jobsConf: JobsConfiguration, - jdbcConfF: String => Validation[DatabaseConfiguration]): Validation[ChronosJob] = { + jdbcConfF: DatabaseId => Validation[DatabaseConfiguration]): Validation[ChronosJob] = { val container = dockerBridgeNetwork.fold( Container(`type` = ContainerType.DOCKER, image = job.algorithmDefinition.dockerImage) @@ -85,7 +85,7 @@ object JobToChronos { ) } - val inputDb = jdbcConfF(job.query.dbTable.database) + val inputDb = jdbcConfF(DatabaseId(job.query.dbTable.database)) val outputDb = jdbcConfF(jobsConf.resultDb) (inputDb, outputDb) mapN buildChronosJob diff --git a/src/main/scala/ch/chuv/lren/woken/backends/woken/WokenClientService.scala b/src/main/scala/ch/chuv/lren/woken/backends/woken/WokenClientService.scala index 037b5e04..242e51fe 100644 --- a/src/main/scala/ch/chuv/lren/woken/backends/woken/WokenClientService.scala +++ b/src/main/scala/ch/chuv/lren/woken/backends/woken/WokenClientService.scala @@ -101,11 +101,11 @@ case class WokenClientService(node: String)(implicit val system: ActorSystem, Unmarshal(response) .to[QueryResult]) .mapN((_, _)) - .map(rq => (rq._1, rq._2.copy(query = Some(query)))) + .map(rq => (rq._1, rq._2.copy(query = query))) case (url, query, failure) => failure.discardEntityBytes() (url, - QueryResult(None, + QueryResult("", node, Set(), Nil, @@ -114,7 +114,7 @@ case class WokenClientService(node: String)(implicit val system: ActorSystem, Some("dispatch"), None, Some(failure.entity.toString), - Some(query))).pure[Future] + query)).pure[Future] } .map(identity) diff --git a/src/main/scala/ch/chuv/lren/woken/config/DatabaseConfiguration.scala b/src/main/scala/ch/chuv/lren/woken/config/DatabaseConfiguration.scala index 0ff172a2..40ce3c52 100644 --- a/src/main/scala/ch/chuv/lren/woken/config/DatabaseConfiguration.scala +++ b/src/main/scala/ch/chuv/lren/woken/config/DatabaseConfiguration.scala @@ -17,6 +17,7 @@ package ch.chuv.lren.woken.config +import cats.data.NonEmptyList import doobie._ import doobie.implicits._ import doobie.hikari._ @@ -24,13 +25,16 @@ import cats.implicits._ import cats.effect._ import cats.data.Validated._ import com.typesafe.config.Config -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId } +import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn } import ch.chuv.lren.woken.cromwell.core.ConfigUtil._ +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.UserId import ch.chuv.lren.woken.messages.variables.SqlType import scala.language.higherKinds +final case class DatabaseId(code: String) + /** * Connection configuration for a database * @@ -45,7 +49,8 @@ import scala.language.higherKinds * @param user Database user * @param password Database password */ -final case class DatabaseConfiguration(dbiDriver: String, +final case class DatabaseConfiguration(id: DatabaseId, + dbiDriver: String, dbApiDriver: String, jdbcDriver: String, jdbcUrl: String, @@ -56,13 +61,14 @@ final case class DatabaseConfiguration(dbiDriver: String, user: String, password: String, poolSize: Int, - tables: Set[FeaturesTableDescription]) + tables: Map[TableId, FeaturesTableDescription]) object DatabaseConfiguration { - def read(config: Config, path: List[String]): Validation[DatabaseConfiguration] = { + def read(config: Config, path: NonEmptyList[String]): Validation[DatabaseConfiguration] = { - val dbConfig = config.validateConfig(path.mkString(".")) + val dbConfig = config.validateConfig(path.mkString_(".")) + val dbId = path.last.validNel[String].map(DatabaseId) dbConfig.andThen { db => val dbiDriver: Validation[String] = @@ -88,18 +94,21 @@ object DatabaseConfiguration { val tableFactory: String => Validation[FeaturesTableDescription] = table => - liftOption(path.lastOption).andThen { tableName => - readTable(db, List("tables", table), tableName) + dbId.andThen { id => + readTable(db, List("tables", table), id.code) } - val tables: Validation[Set[FeaturesTableDescription]] = { + val tables: Validation[Map[TableId, FeaturesTableDescription]] = { tableNames.andThen { names: Set[String] => val m: Set[Validation[FeaturesTableDescription]] = names.map(tableFactory) - m.toList.sequence[Validation, FeaturesTableDescription].map(_.toSet) + m.toList + .sequence[Validation, FeaturesTableDescription] + .map(l => l.map(d => (d.table, d)).toMap) } } - (dbiDriver, + (dbId, + dbiDriver, dbApiDriver, jdbcDriver, jdbcUrl, @@ -149,7 +158,8 @@ object DatabaseConfiguration { } .orElse(None.validNel) - val schema: Validation[Option[String]] = table.validateOptionalString("schema") + val schema: Validation[String] = + table.validateOptionalString("schema").map(_.getOrElse("public")) val validateSchema: Validation[Boolean] = table.validateBoolean("validateSchema").orElse(true.validNel[String]) val seed: Validation[Double] = table.validateDouble("seed").orElse(0.67.validNel) @@ -160,8 +170,8 @@ object DatabaseConfiguration { } } - def factory(config: Config): String => Validation[DatabaseConfiguration] = - dbAlias => read(config, List("db", dbAlias)) + def factory(config: Config): DatabaseId => Validation[DatabaseConfiguration] = + dbAlias => read(config, NonEmptyList("db", List(dbAlias.code))) @SuppressWarnings(Array("org.wartremover.warts.Any")) def dbTransactor[F[_]: Effect: ContextShift]( diff --git a/src/main/scala/ch/chuv/lren/woken/config/DatasetsConfiguration.scala b/src/main/scala/ch/chuv/lren/woken/config/DatasetsConfiguration.scala index 03d73160..03d1820c 100644 --- a/src/main/scala/ch/chuv/lren/woken/config/DatasetsConfiguration.scala +++ b/src/main/scala/ch/chuv/lren/woken/config/DatasetsConfiguration.scala @@ -18,11 +18,11 @@ package ch.chuv.lren.woken.config import akka.http.scaladsl.model.Uri +import cats.data.NonEmptyList import com.typesafe.config.Config import ch.chuv.lren.woken.cromwell.core.ConfigUtil._ -import ch.chuv.lren.woken.messages.datasets.{ AnonymisationLevel, Dataset, DatasetId } +import ch.chuv.lren.woken.messages.datasets.{ AnonymisationLevel, Dataset, DatasetId, TableId } import ch.chuv.lren.woken.messages.remoting.{ BasicAuthentication, RemoteLocation } - import cats.data.Validated._ import cats.implicits._ @@ -32,7 +32,7 @@ object DatasetsConfiguration { private[this] def createDataset(dataset: String, label: String, description: String, - tables: List[String], + tables: List[TableId], anonymisationLevel: String, location: Option[RemoteLocation]): Dataset = Dataset( DatasetId(dataset), @@ -43,16 +43,18 @@ object DatasetsConfiguration { location ) - def read(config: Config, path: List[String]): Validation[Dataset] = { + def read(config: Config, path: List[String], database: DatabaseId): Validation[Dataset] = { val datasetConfig = config.validateConfig(path.mkString(".")) datasetConfig.andThen { f => val dataset: Validation[String] = path.lastOption.map(_.validNel[String]).getOrElse("Empty path".invalidNel[String]) - val label = f.validateString("label") - val description = f.validateString("description") - val tables: Validation[List[String]] = f.validateStringList("tables") + val label = f.validateString("label") + val description = f.validateString("description") + val tables: Validation[List[TableId]] = f + .validateStringList("tables") + .map(l => l.map(t => TableId(database.code, t))) val location: Validation[Option[RemoteLocation]] = f .validateConfig("location") .andThen { cl => @@ -87,25 +89,27 @@ object DatasetsConfiguration { } - def factory(config: Config): DatasetId => Validation[Dataset] = - dataset => read(config, List("datasets", dataset.code)) + def factory(config: Config, defaultDatabase: DatabaseId): DatasetId => Validation[Dataset] = + dataset => read(config, List("datasets", dataset.code), defaultDatabase) def datasetNames(config: Config): Validation[Set[DatasetId]] = config.validateConfig("datasets").map(_.keys.map(DatasetId)) - def datasets(config: Config): Validation[Map[DatasetId, Dataset]] = { - val datasetFactory = factory(config) - datasetNames(config).andThen { names: Set[DatasetId] => - val m: List[Validation[(DatasetId, Dataset)]] = - names.toList - .map { datasetId => - val datasetIdV: Validation[DatasetId] = datasetId.validNel[String] - datasetIdV -> datasetFactory(datasetId) - } - .map(_.tupled) - val t: Validation[List[(DatasetId, Dataset)]] = m.sequence[Validation, (DatasetId, Dataset)] - t.map(_.toMap) - } + def datasets(config: Config, defaultDatabase: DatabaseId): Validation[Map[DatasetId, Dataset]] = { + val datasetFactory = factory(config, defaultDatabase) + datasetNames(config) + .andThen { names: Set[DatasetId] => + val m: List[Validation[(DatasetId, Dataset)]] = + names.toList + .map { datasetId => + val datasetIdV: Validation[DatasetId] = datasetId.validNel[String] + datasetIdV -> datasetFactory(datasetId) + } + .map(_.tupled) + val t: Validation[List[(DatasetId, Dataset)]] = m.sequence[Validation, (DatasetId, Dataset)] + t.map(_.toMap) + } + .ensure(NonEmptyList("No datasets are configured", Nil))(_.nonEmpty) } } diff --git a/src/main/scala/ch/chuv/lren/woken/config/JobsConfiguration.scala b/src/main/scala/ch/chuv/lren/woken/config/JobsConfiguration.scala index 61b6d4cf..e0aaf66b 100644 --- a/src/main/scala/ch/chuv/lren/woken/config/JobsConfiguration.scala +++ b/src/main/scala/ch/chuv/lren/woken/config/JobsConfiguration.scala @@ -21,6 +21,7 @@ import com.typesafe.config.Config import ch.chuv.lren.woken.cromwell.core.ConfigUtil._ import cats.data.Validated._ import cats.implicits._ +import ch.chuv.lren.woken.messages.datasets.TableId /** * Configuration for the jobs executing algorithms on this node @@ -28,7 +29,8 @@ import cats.implicits._ * @param node Name of the current node used for computations * @param owner Owner of the job, can be an email address * @param chronosServerUrl URL to Chronos server used to launch Docker containers in the Mesos cluster - * @param featuresDb Configuration alias of the database containing features + * @param defaultFeaturesDatabase Default database to use when looking for a table containing features + * @param defaultFeaturesTable Pointer to the table containing features * @param resultDb Configuration alias of the database used to store results * @param defaultJobMemory Default memory in Mb allocated to a job * @param defaultJobCpus Default share of CPUs allocated to a job @@ -37,11 +39,10 @@ final case class JobsConfiguration( node: String, owner: String, chronosServerUrl: String, - featuresDb: String, - featuresTable: String, - metadataKeyForFeaturesTable: String, - resultDb: String, - metaDb: String, + defaultFeaturesDatabase: DatabaseId, + defaultFeaturesTable: TableId, + resultDb: DatabaseId, + metaDb: DatabaseId, defaultJobCpus: Double, defaultJobMemory: Int ) @@ -53,15 +54,17 @@ object JobsConfiguration { val jobsConfig = config.validateConfig(path.mkString(".")) jobsConfig.andThen { jobs => - val node = jobs.validateString("node") - val owner = jobs.validateString("owner") - val chronosServerUrl = jobs.validateString("chronosServerUrl") - val featuresDb = jobs.validateString("featuresDb") - val featuresTable = jobs.validateString("featuresTable") - val metadataKeyForFeaturesTable: Validation[String] = - jobs.validateString("metadataKeyForFeaturesTable").orElse(featuresTable) - val resultDb = jobs.validateString("resultDb") - val metaDb = jobs.validateString("metaDb") + val node = jobs.validateString("node") + val owner = jobs.validateString("owner") + val chronosServerUrl = jobs.validateString("chronosServerUrl") + val defaultFeaturesDatabase = jobs.validateString("defaultFeaturesDatabase").map(DatabaseId) + val defaultFeaturesTable: Validation[TableId] = jobs + .validateConfig("defaultFeaturesTable") + .andThen( + tableConfig => defaultFeaturesDatabase.andThen(db => tableIdConfig(db, tableConfig)) + ) + val resultDb = jobs.validateString("resultDb").map(DatabaseId) + val metaDb = jobs.validateString("metaDb").map(DatabaseId) val cpus: Validation[Double] = jobs.validateDouble("defaultJobCpus").orElse(0.5.validNel[String]) val mem: Validation[Int] = jobs.validateInt("defaultJobMemory").orElse(512.validNel[String]) @@ -69,9 +72,8 @@ object JobsConfiguration { (node, owner, chronosServerUrl, - featuresDb, - featuresTable, - metadataKeyForFeaturesTable, + defaultFeaturesDatabase, + defaultFeaturesTable, resultDb, metaDb, cpus, @@ -79,4 +81,13 @@ object JobsConfiguration { } } + private def tableIdConfig(defaultFeatureDatabase: DatabaseId, + tableConfig: Config): Validation[TableId] = { + val name = tableConfig.validateString("name") + val schema = tableConfig.validateOptionalString("schema").map(_.getOrElse("public")) + val database = + tableConfig.validateOptionalString("database").map(_.getOrElse(defaultFeatureDatabase.code)) + + (database, schema, name) mapN TableId.apply + } } diff --git a/src/main/scala/ch/chuv/lren/woken/config/WokenConfiguration.scala b/src/main/scala/ch/chuv/lren/woken/config/WokenConfiguration.scala index f8d38c28..a7a5fc15 100644 --- a/src/main/scala/ch/chuv/lren/woken/config/WokenConfiguration.scala +++ b/src/main/scala/ch/chuv/lren/woken/config/WokenConfiguration.scala @@ -32,17 +32,17 @@ case class WokenConfiguration(config: Config) { .read(config) .valueOr(configurationFailed) + val databaseConfig: DatabaseId => Validation[DatabaseConfiguration] = + DatabaseConfiguration.factory(config) + val jobs: JobsConfiguration = JobsConfiguration .read(config) .valueOr(configurationFailed) - val databaseConfig: String => Validation[DatabaseConfiguration] = - DatabaseConfiguration.factory(config) - - val featuresDb: DatabaseConfiguration = databaseConfig(jobs.featuresDb) + val featuresDb: DatabaseConfiguration = databaseConfig(jobs.defaultFeaturesDatabase) .valueOr(configurationFailed) - val resultsDb: DatabaseConfiguration = databaseConfig("woken") + val resultsDb: DatabaseConfiguration = databaseConfig(DatabaseId("woken")) .valueOr(configurationFailed) val metaDb: DatabaseConfiguration = DatabaseConfiguration diff --git a/src/main/scala/ch/chuv/lren/woken/core/features/FeaturesQuery.scala b/src/main/scala/ch/chuv/lren/woken/core/features/FeaturesQuery.scala index edfb5cac..ef4eb313 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/features/FeaturesQuery.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/features/FeaturesQuery.scala @@ -17,7 +17,8 @@ package ch.chuv.lren.woken.core.features -import ch.chuv.lren.woken.core.model.database.{ TableColumn, TableId } +import ch.chuv.lren.woken.core.model.database.TableColumn +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.filters._ import ch.chuv.lren.woken.messages.query.filters.FilterRule._ import ch.chuv.lren.woken.messages.variables.SqlType diff --git a/src/main/scala/ch/chuv/lren/woken/core/features/Queries.scala b/src/main/scala/ch/chuv/lren/woken/core/features/Queries.scala index 9bab5701..479e936f 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/features/Queries.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/features/Queries.scala @@ -17,8 +17,8 @@ package ch.chuv.lren.woken.core.features -import ch.chuv.lren.woken.core.model.database.{ TableColumn, TableId } -import ch.chuv.lren.woken.messages.datasets.DatasetId +import ch.chuv.lren.woken.core.model.database.TableColumn +import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId } import ch.chuv.lren.woken.messages.query.{ ExperimentQuery, MiningQuery, Query } import ch.chuv.lren.woken.messages.query.filters._ import ch.chuv.lren.woken.messages.variables.{ FeatureIdentifier, VariableId } @@ -114,8 +114,7 @@ object Queries { } def features(defaultInputTable: TableId, orderBy: Option[String]): FeaturesQuery = { - val inputTable = - query.targetTable.fold(defaultInputTable)(t => defaultInputTable.copy(name = t)) + val inputTable = query.targetTable.getOrElse(defaultInputTable) FeaturesQuery(dbVariables, dbCovariables, diff --git a/src/main/scala/ch/chuv/lren/woken/core/model/VariablesMeta.scala b/src/main/scala/ch/chuv/lren/woken/core/model/VariablesMeta.scala index ac61f14b..15be1275 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/model/VariablesMeta.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/model/VariablesMeta.scala @@ -20,6 +20,7 @@ package ch.chuv.lren.woken.core.model import ch.chuv.lren.woken.messages.variables.{ GroupMetaData, VariableId, VariableMetaData } import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import cats.syntax.validated._ +import ch.chuv.lren.woken.messages.datasets.TableId // TODO: source field in VariablesMeta has unclear semantics and purpose. @@ -35,7 +36,7 @@ import cats.syntax.validated._ case class VariablesMeta(id: Int, source: String, hierarchy: GroupMetaData, - targetFeaturesTable: String, + targetFeaturesTable: TableId, defaultHistogramGroupings: List[VariableId]) { /** diff --git a/src/main/scala/ch/chuv/lren/woken/core/model/database/FeaturesTableDescription.scala b/src/main/scala/ch/chuv/lren/woken/core/model/database/FeaturesTableDescription.scala index 80d45ff4..2de25af0 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/model/database/FeaturesTableDescription.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/model/database/FeaturesTableDescription.scala @@ -17,6 +17,7 @@ package ch.chuv.lren.woken.core.model.database +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.UserId import ch.chuv.lren.woken.messages.variables.SqlType.SqlType @@ -63,6 +64,7 @@ case class FeaturesTableDescription( * @return the quoted name of the table, for use in SELECT statements */ def quotedName: String = - table.dbSchema.fold(s""""${table.name}"""")(sch => s""""$sch"."${table.name}"""") + if (table.dbSchema == "public") s""""${table.name}"""" + else s""""${table.dbSchema}"."${table.name}"""" } diff --git a/src/main/scala/ch/chuv/lren/woken/core/model/database/TableId.scala b/src/main/scala/ch/chuv/lren/woken/core/model/database/TableId.scala deleted file mode 100644 index 01779f56..00000000 --- a/src/main/scala/ch/chuv/lren/woken/core/model/database/TableId.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2017 LREN CHUV for Human Brain Project - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package ch.chuv.lren.woken.core.model.database - -/** Identifier for a table - * - * @param database The database owning the table - * @param dbSchema The schema containing the table, or None for default 'public' schema - * @param name The name of the table, without any quotation - */ -case class TableId(database: String, dbSchema: Option[String], name: String) { - - assert(!name.contains("."), "Table name should not contain schema information") - - def schemaOrPublic: String = dbSchema.getOrElse("public") - - def same(other: TableId): Boolean = - (other.dbSchema == dbSchema || dbSchema.isEmpty && other.dbSchema - .contains("public")) && (other.name == name) - -} - -object TableId { - - @SuppressWarnings(Array("org.wartremover.warts.Throw")) - def apply(database: String, qualifiedTableName: String): TableId = - qualifiedTableName.split("\\.").toList match { - case tableName :: Nil => TableId(database, None, tableName) - case dbSchema :: tableName :: Nil => TableId(database, Some(dbSchema), tableName) - case db :: dbSchema :: tableName :: Nil => TableId(db, Some(dbSchema), tableName) - case _ => throw new IllegalArgumentException(s"Invalid table name: $qualifiedTableName") - } -} diff --git a/src/main/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJob.scala b/src/main/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJob.scala index 2d052e37..332ab181 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJob.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJob.scala @@ -20,8 +20,9 @@ package ch.chuv.lren.woken.core.model.jobs import cats.implicits._ import ch.chuv.lren.woken.core.features.Queries._ import ch.chuv.lren.woken.core.model.AlgorithmDefinition -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableId } +import ch.chuv.lren.woken.core.model.database.FeaturesTableDescription import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.{ AlgorithmSpec, ExperimentQuery } import ch.chuv.lren.woken.messages.variables.VariableMetaData @@ -75,7 +76,7 @@ object ExperimentJob { val experimentQuery = query .filterDatasets(inputTable.datasetColumn) .filterNulls(variablesCanBeNull, covariablesCanBeNull) - .copy(targetTable = Some(inputTable.table.name)) + .copy(targetTable = Some(inputTable.table)) // TODO: report to user filtering on nulls when activated // TODO: report to user which algorithms are causing filter on nulls, and how many records are lost from training dataset diff --git a/src/main/scala/ch/chuv/lren/woken/core/model/jobs/JobResult.scala b/src/main/scala/ch/chuv/lren/woken/core/model/jobs/JobResult.scala index a6d9a239..145df855 100644 --- a/src/main/scala/ch/chuv/lren/woken/core/model/jobs/JobResult.scala +++ b/src/main/scala/ch/chuv/lren/woken/core/model/jobs/JobResult.scala @@ -29,6 +29,8 @@ import spray.json._ import APIJsonProtocol._ import ch.chuv.lren.woken.messages.datasets.DatasetId +import scala.collection.immutable.TreeSet + /** * Result produced during the execution of an algorithm */ @@ -161,9 +163,20 @@ case class ExperimentJobResult(jobId: String, import queryProtocol._ // Concatenate results + val system = UserId("system") JsArray( results.map { r => - val obj = r._2.asQueryResult(None, Set(), List()).toJson.asJsObject + val dummyQuery = MiningQuery(system, + Nil, + Nil, + covariablesMustExist = false, + Nil, + None, + None, + TreeSet(), + r._1, + None) + val obj = r._2.asQueryResult(dummyQuery, Set(), List()).toJson.asJsObject JsObject(obj.fields + ("algorithmSpec" -> r._1.toJson)) }.toVector ) @@ -281,13 +294,13 @@ case class SerializedModelJobResult(jobId: String, object JobResult { def asQueryResult(jobResult: JobResult, - query: Option[Query], + query: Query, dataProvenance: DataProvenance, feedback: UserFeedbacks): QueryResult = jobResult match { case pfa: PfaJobResult => QueryResult( - jobId = Some(pfa.jobId), + jobId = pfa.jobId, node = pfa.node, dataProvenance = dataProvenance, feedback = feedback, @@ -300,7 +313,7 @@ object JobResult { ) case pfa: ExperimentJobResult => QueryResult( - jobId = Some(pfa.jobId), + jobId = pfa.jobId, node = pfa.node, dataProvenance = dataProvenance, feedback = feedback, @@ -313,7 +326,7 @@ object JobResult { ) case v: JsonDataJobResult => QueryResult( - jobId = Some(v.jobId), + jobId = v.jobId, node = v.node, dataProvenance = dataProvenance, feedback = feedback, @@ -326,7 +339,7 @@ object JobResult { ) case v: OtherDataJobResult => QueryResult( - jobId = Some(v.jobId), + jobId = v.jobId, node = v.node, dataProvenance = dataProvenance, feedback = feedback, @@ -339,7 +352,7 @@ object JobResult { ) case v: SerializedModelJobResult => QueryResult( - jobId = Some(v.jobId), + jobId = v.jobId, node = v.node, dataProvenance = dataProvenance, feedback = feedback, @@ -352,7 +365,7 @@ object JobResult { ) case e: ErrorJobResult => QueryResult( - jobId = e.jobId, + jobId = e.jobId.getOrElse("?"), node = e.node, dataProvenance = dataProvenance, feedback = feedback, @@ -366,7 +379,7 @@ object JobResult { } implicit class ToQueryResult(val jobResult: JobResult) extends AnyVal { - def asQueryResult(query: Option[Query], + def asQueryResult(query: Query, dataProvenance: Set[DatasetId], feedback: List[UserFeedback]): QueryResult = JobResult.asQueryResult(jobResult, query, dataProvenance, feedback) @@ -384,7 +397,7 @@ object JobResult { .getOrElse(Map()) PfaJobResult( - jobId = queryResult.jobId.getOrElse(""), + jobId = queryResult.jobId, node = queryResult.node, timestamp = queryResult.timestamp, algorithm = queryResult.algorithm.getOrElse(""), @@ -398,14 +411,14 @@ object JobResult { .getOrElse(Map()) ExperimentJobResult( - jobId = queryResult.jobId.getOrElse(""), + jobId = queryResult.jobId, node = queryResult.node, timestamp = queryResult.timestamp, results = results ) case Shapes.error => ErrorJobResult( - jobId = queryResult.jobId, + jobId = Some(queryResult.jobId), node = queryResult.node, timestamp = queryResult.timestamp, algorithm = queryResult.algorithm, @@ -414,7 +427,7 @@ object JobResult { case shape if Shapes.visualisationJsonResults.contains(shape) => JsonDataJobResult( - jobId = queryResult.jobId.getOrElse(""), + jobId = queryResult.jobId, node = queryResult.node, timestamp = queryResult.timestamp, algorithm = queryResult.algorithm.getOrElse(""), @@ -424,7 +437,7 @@ object JobResult { case shape if Shapes.visualisationOtherResults.contains(shape) => OtherDataJobResult( - jobId = queryResult.jobId.getOrElse(""), + jobId = queryResult.jobId, node = queryResult.node, timestamp = queryResult.timestamp, algorithm = queryResult.algorithm.getOrElse(""), diff --git a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala index e75eb1ba..e4ff6be8 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala @@ -21,12 +21,13 @@ import cats.Id import cats.effect.concurrent.Ref import cats.effect.{ Effect, Resource } import cats.implicits._ +import ch.chuv.lren.woken.config.DatabaseConfiguration import ch.chuv.lren.woken.core.features.FeaturesQuery import ch.chuv.lren.woken.core.fp.runNow -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId } +import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn } import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers -import ch.chuv.lren.woken.messages.datasets.DatasetId +import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId } import spray.json._ import spray.json.DefaultJsonProtocol._ import ch.chuv.lren.woken.messages.query.filters._ @@ -47,14 +48,9 @@ import scala.util.{ Failure, Success, Try } trait FeaturesRepository[F[_]] extends Repository[F] { /** - * @return the name of the database as defined in the configuration + * @return the database as defined in the configuration */ - def database: String - - /** - * @return the list of features tables available in the database - */ - def tables: Set[FeaturesTableDescription] + def database: DatabaseConfiguration /** * Provides the interface to a features table @@ -160,8 +156,7 @@ object FeaturesTableRepository { } class FeaturesInMemoryRepository[F[_]: Effect]( - override val database: String, - override val tables: Set[FeaturesTableDescription], + override val database: DatabaseConfiguration, val tablesContent: Map[TableId, (Headers, List[JsObject])] ) extends FeaturesRepository[F] { @@ -169,11 +164,11 @@ class FeaturesInMemoryRepository[F[_]: Effect]( override def featuresTable(table: TableId): F[Option[FeaturesTableRepository[F]]] = { cache.get(table).orElse { - tables.find(_.table == table).map { t => + database.tables.get(table).map { t => val (headers, data) = tablesContent.getOrElse(table, Nil -> Nil) cache.getOrElseUpdate( table, - new FeaturesTableInMemoryRepository[F](table, headers, t.datasetColumn, data) + new FeaturesTableInMemoryRepository[F](t, headers, t.datasetColumn, data) ) } } @@ -183,7 +178,7 @@ class FeaturesInMemoryRepository[F[_]: Effect]( } -class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId, +class FeaturesTableInMemoryRepository[F[_]: Effect](override val table: FeaturesTableDescription, override val columns: List[TableColumn], val datasetColumn: Option[TableColumn], val dataFeatures: List[JsObject]) @@ -192,13 +187,10 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId, import FeaturesTableRepository.Headers import spray.json._ - override val table = - FeaturesTableDescription(tableId, Nil, datasetColumn, validateSchema = false, None, 0.0) - override def count: F[Int] = dataFeatures.size.pure[F] override def count(dataset: DatasetId): F[Int] = - datasetColumn.fold(if (dataset.code == tableId.name) count else 0.pure[F])( + datasetColumn.fold(if (dataset.code == table.table.name) count else 0.pure[F])( ds => dataFeatures .count( @@ -259,7 +251,7 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId, override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] = datasetColumn.fold( - count(filters).map(n => if (n == 0) Set[DatasetId]() else Set(DatasetId(tableId.name))) + count(filters).map(n => if (n == 0) Set[DatasetId]() else Set(DatasetId(table.table.name))) ) { _ => val datasets: Set[DatasetId] = Set() datasets.pure[F] @@ -305,7 +297,11 @@ object ExtendedFeaturesTableInMemoryRepository { nextTableSeqNumber: () => F[Int] ): Validation[Resource[F, ExtendedFeaturesTableInMemoryRepository[F]]] = { val pk = sourceTable.table.primaryKey.headOption - .getOrElse(throw new Exception("Expected a single primary key")) + .getOrElse( + throw new Exception( + s"Expected a primary key with one column in table ${sourceTable.table.table.toString}" + ) + ) val rndColumn = TableColumn("_rnd", SqlType.numeric) val newColumns = newFeatures ++ otherColumns val extTableColumns = newFeatures ++ List(rndColumn) diff --git a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala index 382b9d06..513f6d76 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala @@ -24,11 +24,12 @@ import spray.json._ import cats.data.{ NonEmptyList, Validated } import cats.effect.{ Effect, Resource } import cats.implicits._ +import ch.chuv.lren.woken.config.DatabaseConfiguration import ch.chuv.lren.woken.core.features.FeaturesQuery -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId } +import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn } import ch.chuv.lren.woken.core.model.database.sqlUtils._ import ch.chuv.lren.woken.core.fp.runNow -import ch.chuv.lren.woken.messages.datasets.DatasetId +import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId } import ch.chuv.lren.woken.cromwell.core.ConfigUtil._ import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers import ch.chuv.lren.woken.messages.query.filters.FilterRule @@ -43,14 +44,13 @@ import scala.language.higherKinds class FeaturesRepositoryDAO[F[_]: Effect] private ( val xa: Transactor[F], - override val database: String, - override val tables: Set[FeaturesTableDescription], + override val database: DatabaseConfiguration, val wokenRepository: WokenRepository[F] ) extends FeaturesRepository[F] { override def featuresTable(table: TableId): F[Option[FeaturesTableRepository[F]]] = - tables - .find(_.table.same(table)) + database.tables + .get(table) .map( t => FeaturesTableRepositoryDAO[F](xa, t, wokenRepository) @@ -66,15 +66,14 @@ object FeaturesRepositoryDAO { def apply[F[_]: Effect]( xa: Transactor[F], - database: String, - tables: Set[FeaturesTableDescription], + database: DatabaseConfiguration, wokenRepository: WokenRepository[F] ): F[Validation[FeaturesRepositoryDAO[F]]] = { case class Check(schema: String, table: String, column: String) def columnCheck(table: FeaturesTableDescription, column: TableColumn): Check = - Check(table.table.schemaOrPublic, table.table.name, column.name) + Check(table.table.dbSchema, table.table.name, column.name) // TODO: add "and is_identity='YES'" to the check. Problem: our tables don't have their primary key properly defined def checkPrimaryKey(check: Check): Fragment = sql""" @@ -87,21 +86,23 @@ object FeaturesRepositoryDAO { WHERE table_schema=${check.schema} and table_name=${check.table} and column_name=${check.column}""" val empty: List[F[Option[String]]] = Nil - val checks: List[F[Option[String]]] = tables - .filter(_.validateSchema) + val checks: List[F[Option[String]]] = database.tables + .filter(_._2.validateSchema) .map { table => - val checkPk: List[F[Option[String]]] = table.primaryKey.map { pk => - checkPrimaryKey(columnCheck(table, pk)).query[Int].to[List].transact(xa).map { test => - if (test.nonEmpty) None - else - Some(s"Primary key ${pk.name} not found in table ${table.quotedName}") + val tableDescription = table._2 + val checkPk: List[F[Option[String]]] = tableDescription.primaryKey.map { pk => + checkPrimaryKey(columnCheck(tableDescription, pk)).query[Int].to[List].transact(xa).map { + test => + if (test.nonEmpty) None + else + Some(s"Primary key ${pk.name} not found in table ${table._2.quotedName}") } } val checkDataset: List[F[Option[String]]] = - table.datasetColumn.fold(empty) { datasetColumn => + tableDescription.datasetColumn.fold(empty) { datasetColumn => val c = - checkDatasetColumn(columnCheck(table, datasetColumn)) + checkDatasetColumn(columnCheck(tableDescription, datasetColumn)) .query[Int] .to[List] .transact(xa) @@ -109,7 +110,7 @@ object FeaturesRepositoryDAO { if (test.nonEmpty) None else Some( - s"Dataset column ${datasetColumn.name} not found in table ${table.quotedName}" + s"Dataset column ${datasetColumn.name} not found in table ${tableDescription.quotedName}" ) } List(c) @@ -123,7 +124,7 @@ object FeaturesRepositoryDAO { .map(_.flatten) errors.map { - case Nil => new FeaturesRepositoryDAO(xa, database, tables, wokenRepository).validNel[String] + case Nil => new FeaturesRepositoryDAO(xa, database, wokenRepository).validNel[String] case error :: moreErrors => Validated.Invalid(NonEmptyList(error, moreErrors)) } diff --git a/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepository.scala b/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepository.scala index 02e30705..b89351d4 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepository.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepository.scala @@ -22,6 +22,7 @@ import cats.implicits._ import sup.HealthCheck import ch.chuv.lren.woken.core.model.database.FeaturesTableDescription import ch.chuv.lren.woken.core.model.VariablesMeta +import ch.chuv.lren.woken.messages.datasets.TableId import scala.collection.concurrent.TrieMap import scala.language.higherKinds @@ -42,7 +43,7 @@ trait VariablesMetaRepository[F[_]] extends Repository[F] { def put(variablesMeta: VariablesMeta): F[VariablesMeta] - def get(targetFeaturesTable: String): F[Option[VariablesMeta]] + def get(targetFeaturesTable: TableId): F[Option[VariablesMeta]] } @@ -50,7 +51,7 @@ trait TablesCatalogRepository[F[_]] extends Repository[F] { def put(table: FeaturesTableDescription): F[FeaturesTableDescription] - def get(table: String): F[Option[FeaturesTableDescription]] + def get(table: TableId): F[Option[FeaturesTableDescription]] } @@ -58,29 +59,29 @@ class MetadataInMemoryRepository[F[_]: Applicative] extends MetadataRepository[F override val variablesMeta: VariablesMetaRepository[F] = new VariablesMetaRepository[F] { - private val cache = new TrieMap[String, VariablesMeta] + private val cache = new TrieMap[TableId, VariablesMeta] override def put(variablesMeta: VariablesMeta): F[VariablesMeta] = { - val _ = cache.put(variablesMeta.targetFeaturesTable.toUpperCase, variablesMeta) + val _ = cache.put(variablesMeta.targetFeaturesTable, variablesMeta) variablesMeta.pure[F] } - override def get(targetFeaturesTable: String): F[Option[VariablesMeta]] = - cache.get(targetFeaturesTable.toUpperCase).pure[F] + override def get(targetFeaturesTable: TableId): F[Option[VariablesMeta]] = + cache.get(targetFeaturesTable).pure[F] override def healthCheck: HealthCheck[F, Id] = HealthCheck.liftFBoolean(true.pure[F]) } override val tablesCatalog: TablesCatalogRepository[F] = new TablesCatalogRepository[F] { - private val cache = new TrieMap[String, FeaturesTableDescription] + private val cache = new TrieMap[TableId, FeaturesTableDescription] override def put(table: FeaturesTableDescription): F[FeaturesTableDescription] = { - val _ = cache.put(table.table.name, table) + val _ = cache.put(table.table, table) table.pure[F] } - override def get(table: String): F[Option[FeaturesTableDescription]] = + override def get(table: TableId): F[Option[FeaturesTableDescription]] = cache.get(table).pure[F] override def healthCheck: HealthCheck[F, Id] = HealthCheck.liftFBoolean(true.pure[F]) diff --git a/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAO.scala b/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAO.scala index f5dbb238..84c91af9 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAO.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAO.scala @@ -26,6 +26,7 @@ import ch.chuv.lren.woken.messages.variables.{ GroupMetaData, variablesProtocol import sup.HealthCheck import ch.chuv.lren.woken.core.model.database.FeaturesTableDescription import ch.chuv.lren.woken.core.model.VariablesMeta +import ch.chuv.lren.woken.messages.datasets.TableId import variablesProtocol._ import scala.collection.mutable @@ -37,10 +38,14 @@ class VariablesMetaRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) implicit val groupMetaDataMeta: Meta[GroupMetaData] = codecMeta[GroupMetaData] // TODO: use a real cache, for example ScalaCache + Caffeine - val variablesMetaCache: mutable.Map[String, VariablesMeta] = - new mutable.WeakHashMap[String, VariablesMeta]() - - override def put(v: VariablesMeta): F[VariablesMeta] = + val variablesMetaCache: mutable.Map[TableId, VariablesMeta] = + new mutable.WeakHashMap[TableId, VariablesMeta]() + + override def put(v: VariablesMeta): F[VariablesMeta] = { + // TODO: add database and schema to the values + val database = v.targetFeaturesTable.database + val schema = v.targetFeaturesTable.dbSchema + val table = v.targetFeaturesTable.name sql""" INSERT INTO meta_variables (source, hierarchy, @@ -48,7 +53,7 @@ class VariablesMetaRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) histogram_groupings) VALUES (${v.source}, ${v.hierarchy}, - ${v.targetFeaturesTable}, + $table, ${v.defaultHistogramGroupings}) """.update .withUniqueGeneratedKeys[VariablesMeta]("id", @@ -57,18 +62,23 @@ class VariablesMetaRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) "target_table", "histogram_groupings") .transact(xa) + } - override def get(targetFeaturesTable: String): F[Option[VariablesMeta]] = { - val table = targetFeaturesTable.toUpperCase - val v = variablesMetaCache.get(table) + override def get(targetFeaturesTable: TableId): F[Option[VariablesMeta]] = { + val database = targetFeaturesTable.database + val schema = targetFeaturesTable.dbSchema + val table = targetFeaturesTable.name + val v = variablesMetaCache.get(targetFeaturesTable) + // TODO: add database and schema to the where clause + // TODO: collapse database,schema,table into a TableId in the Doobie mappings v.fold( sql"SELECT id, source, hierarchy, target_table, histogram_groupings FROM meta_variables WHERE target_table=$table" .query[VariablesMeta] .option .transact(xa) .map { r: Option[VariablesMeta] => - r.foreach(variablesMetaCache.put(table, _)) + r.foreach(variablesMetaCache.put(targetFeaturesTable, _)) r } )(Option(_).pure[F]) @@ -83,7 +93,7 @@ class TablesCatalogRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) override def put(table: FeaturesTableDescription): F[FeaturesTableDescription] = ??? - override def get(table: String): F[Option[FeaturesTableDescription]] = ??? + override def get(table: TableId): F[Option[FeaturesTableDescription]] = ??? override def healthCheck: HealthCheck[F, Id] = validate(xa) } diff --git a/src/main/scala/ch/chuv/lren/woken/dao/WokenRepository.scala b/src/main/scala/ch/chuv/lren/woken/dao/WokenRepository.scala index 3a7dd09c..2994d565 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/WokenRepository.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/WokenRepository.scala @@ -23,6 +23,7 @@ import cats._ import cats.effect.Effect import cats.implicits._ import ch.chuv.lren.woken.core.model.jobs.JobResult +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.{ Query, QueryResult } import sup.HealthCheck @@ -64,7 +65,7 @@ trait ResultsCacheRepository[F[_]] extends Repository[F] { def put(result: QueryResult, query: Query): F[Unit] def get(node: String, - table: String, + table: TableId, tableContentsHash: Option[String], query: Query): F[Option[QueryResult]] @@ -120,7 +121,7 @@ class ResultsCacheInMemoryRepository[F[_]: Effect] extends ResultsCacheRepositor override def put(result: QueryResult, query: Query): F[Unit] = ().pure[F] override def get( node: String, - table: String, + table: TableId, tableContentsHash: Option[String], query: Query ): F[Option[QueryResult]] = Option.empty[QueryResult].pure[F] diff --git a/src/main/scala/ch/chuv/lren/woken/dao/WokenRepositoryDAO.scala b/src/main/scala/ch/chuv/lren/woken/dao/WokenRepositoryDAO.scala index e9b313f7..bd9ae248 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/WokenRepositoryDAO.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/WokenRepositoryDAO.scala @@ -41,6 +41,7 @@ import ch.chuv.lren.woken.core.model.jobs._ import ch.chuv.lren.woken.core.json._ import ch.chuv.lren.woken.core.json.yaml.Yaml import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation +import ch.chuv.lren.woken.messages.datasets.TableId import spray.json._ import sup.HealthCheck @@ -228,7 +229,8 @@ class ResultsCacheRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) val function = result.algorithm // Extract fields that require validation - val tableNameV: Validation[String] = query.targetTable.toValidNel[String]("Empty table name") + val tableNameV: Validation[String] = + query.targetTable.map(_.toString).toValidNel[String]("Empty table name") val queryJsonV: Validation[JsObject] = { if (query.variables.length + query.covariables.length + query.grouping.length > 30) "High dimension query".invalidNel[JsObject] @@ -268,7 +270,7 @@ class ResultsCacheRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) override def get( node: String, - table: String, + table: TableId, tableContentsHash: Option[String], userQuery: WokenQuery ): F[Option[QueryResult]] = { @@ -279,6 +281,7 @@ class ResultsCacheRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) "High dimension query".invalidNel[JsObject] else query.toJson.asJsObject.validNel[String] } + val tableName = table.toString queryJsonV.fold( err => { @@ -287,7 +290,7 @@ class ResultsCacheRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) }, queryJson => { val filterBase = - fr"""WHERE "node" = $node AND "table_name" = $table AND "query" = $queryJson""" + fr"""WHERE "node" = $node AND "table_name" = $tableName AND "query" = $queryJson""" val filter = tableContentsHash.fold(filterBase)( hash => filterBase ++ fr"""AND "table_content_hash" =""" ++ frConst(hash) @@ -300,14 +303,14 @@ class ResultsCacheRepositoryDAO[F[_]: Effect](val xa: Transactor[F]) resultOp => resultOp.fold { logger.info( - s"Could not find matching result in the cache for node $node, table $table, query ${queryJson.compactPrint}" + s"Could not find matching result in the cache for node $node, table $tableName, query ${queryJson.compactPrint}" ) Option.empty[QueryResult].pure[F] } { result => val updateTs : Fragment = fr"""UPDATE "results_cache" SET "last_used" = now()""" ++ filter // Restore user query, as result may come from another user - updateTs.update.run.transact(xa).map(_ => Some(result.copy(query = Some(userQuery)))) + updateTs.update.run.transact(xa).map(_ => Some(result.copy(query = userQuery))) } } } diff --git a/src/main/scala/ch/chuv/lren/woken/dispatch/ExperimentQueriesActor.scala b/src/main/scala/ch/chuv/lren/woken/dispatch/ExperimentQueriesActor.scala index c574b293..0a467054 100644 --- a/src/main/scala/ch/chuv/lren/woken/dispatch/ExperimentQueriesActor.scala +++ b/src/main/scala/ch/chuv/lren/woken/dispatch/ExperimentQueriesActor.scala @@ -175,7 +175,7 @@ class ExperimentQueriesActor[F[_]: Effect]( mapFlow(job.query, job.queryAlgorithms, prov, feedback) .mapAsync(parallelism = 1) { case List() => Future(noResult(job.query, Set(), feedback)) - case List(result) => Future(result.copy(query = Some(query))) + case List(result) => Future(result.copy(query = query)) case listOfResults => gatherAndReduce(query, listOfResults, None) } .log("Result of experiment") @@ -217,7 +217,7 @@ class ExperimentQueriesActor[F[_]: Effect]( mapFlow(mapQuery, job.queryAlgorithms, prov, feedback) .mapAsync(parallelism = 1) { case List() => Future(noResult(query, Set(), feedback)) - case List(result) => Future(result.copy(query = Some(query))) + case List(result) => Future(result.copy(query = query)) case mapResults => gatherAndReduce(query, mapResults, reduceQuery) } @@ -243,7 +243,7 @@ class ExperimentQueriesActor[F[_]: Effect]( .named("remote-validation-of-distributed-experiment") .via(remoteValidationFlow) .map[QueryResult] { r => - r.experimentResult.asQueryResult(Some(mapQuery), + r.experimentResult.asQueryResult(mapQuery, prov ++ r.dataProvenance, feedback ++ r.feedback) } @@ -289,7 +289,7 @@ class ExperimentQueriesActor[F[_]: Effect]( .named("remote-validation-of-local-experiment") .via(remoteValidationFlow) .map[QueryResult] { r => - r.experimentResult.asQueryResult(Some(r.query), r.dataProvenance, r.feedback) + r.experimentResult.asQueryResult(r.query, r.dataProvenance, r.feedback) } .log("Final result of experiment") .withAttributes(debugElements) diff --git a/src/main/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActor.scala b/src/main/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActor.scala index 646d4a9a..ef062a38 100644 --- a/src/main/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActor.scala +++ b/src/main/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActor.scala @@ -99,10 +99,11 @@ class MetadataQueriesActor[F[_]: Effect](dispatcherService: DispatcherService, val query = varsForDataset.query.copy( datasets = datasetService .datasets() - .map(_.dataset) + .keys .filter( varsForDataset.query.datasets.isEmpty || varsForDataset.query.datasets.contains(_) ) + .toSet ) val zero = (query, VariablesForDatasetsResponse(Set())) diff --git a/src/main/scala/ch/chuv/lren/woken/dispatch/MiningQueriesActor.scala b/src/main/scala/ch/chuv/lren/woken/dispatch/MiningQueriesActor.scala index 7ce5022e..655d1935 100644 --- a/src/main/scala/ch/chuv/lren/woken/dispatch/MiningQueriesActor.scala +++ b/src/main/scala/ch/chuv/lren/woken/dispatch/MiningQueriesActor.scala @@ -108,10 +108,10 @@ class MiningQueriesActor[F[_]: Effect]( case mine: Mine => val initiator = if (mine.replyTo == Actor.noSender) sender() else mine.replyTo // Define the target table if user did not specify it - val query = - if (mine.query.targetTable.isEmpty) - mine.query.copy(targetTable = Some(config.jobs.featuresTable)) - else mine.query + val query: MiningQuery = mine.query.targetTable + .fold(mine.query.copy(targetTable = Some(config.jobs.defaultFeaturesTable)))( + _ => mine.query + ) val jobValidatedF = queryToJobService.miningQuery2Job(query) val doIt: F[QueryResult] = jobValidatedF.flatMap { jv => jv.fold( @@ -203,7 +203,7 @@ class MiningQueriesActor[F[_]: Effect]( remoteMapFlow(query) .mapAsync(parallelism = 1) { case List() => Future(noResult(query, Set(), Nil)) - case List(result) => Future(result.copy(query = Some(query))) + case List(result) => Future(result.copy(query = query)) case listOfResults => gatherAndReduce(query, listOfResults, None) } .log("Result of mining on remote node") @@ -235,7 +235,7 @@ class MiningQueriesActor[F[_]: Effect]( remoteMapFlow(mapQuery) .mapAsync(parallelism = 5) { - case List(result) => Future(result.copy(query = Some(query))) + case List(result) => Future(result.copy(query = query)) case mapResults => gatherAndReduce(query, mapResults, reduceQuery) } .runWith(Sink.last) @@ -249,8 +249,7 @@ class MiningQueriesActor[F[_]: Effect]( job: MiningJob): F[QueryResult] = { val dockerJob = job.dockerJob val node = config.jobs.node - // TODO: add support for table schema - val table = dockerJob.query.dbTable.name + val table = dockerJob.query.dbTable // TODO: use table content hash // Check the cache first @@ -274,12 +273,12 @@ class MiningQueriesActor[F[_]: Effect]( // Containerised algorithms that can produce more than one result (e.g. PFA model + images) are ignored val results = r.results match { case List() => noResult(query, Set(), feedback) - case List(result) => result.asQueryResult(Some(query), prov, feedback) + case List(result) => result.asQueryResult(query, prov, feedback) case result :: _ => val msg = s"Discarded additional results returned by algorithm ${dockerJob.algorithmDefinition.code}" logger.warn(msg) - result.asQueryResult(Some(query), prov, feedback :+ UserWarning(msg)) + result.asQueryResult(query, prov, feedback :+ UserWarning(msg)) } results.error match { @@ -313,7 +312,7 @@ class MiningQueriesActor[F[_]: Effect]( .map { case (job: ValidationJob[F], Right(score)) => QueryResult( - Some(job.jobId), + job.jobId, config.jobs.node, prov, feedback, @@ -322,11 +321,11 @@ class MiningQueriesActor[F[_]: Effect]( Some(ValidationJob.algorithmCode), Some(score.toJson), None, - Some(query) + query ) case (job: ValidationJob[F], Left(error)) => QueryResult( - Some(job.jobId), + job.jobId, config.jobs.node, prov, feedback, @@ -335,7 +334,7 @@ class MiningQueriesActor[F[_]: Effect]( Some(ValidationJob.algorithmCode), None, Some(error), - Some(query) + query ) } .fromFutureWithGuarantee( @@ -348,7 +347,7 @@ class MiningQueriesActor[F[_]: Effect]( ) // No local datasets match the query, return an empty result QueryResult( - Some(job.jobId), + job.jobId, config.jobs.node, prov, feedback, @@ -357,7 +356,7 @@ class MiningQueriesActor[F[_]: Effect]( Some(ValidationJob.algorithmCode), None, None, - Some(query) + query ).pure[F] } } diff --git a/src/main/scala/ch/chuv/lren/woken/dispatch/QueriesActor.scala b/src/main/scala/ch/chuv/lren/woken/dispatch/QueriesActor.scala index f7ac9ab2..fbfa51d6 100644 --- a/src/main/scala/ch/chuv/lren/woken/dispatch/QueriesActor.scala +++ b/src/main/scala/ch/chuv/lren/woken/dispatch/QueriesActor.scala @@ -79,10 +79,9 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { // TODO: cannot support a case where the same algorithm is used, but with different execution plans .filter { r => logger.info( - s"Check that algorithms in query ${r.query - .map(algorithmsOfQuery)} are in the reduce query algorithms ${algorithms.map(_.toString).mkString(",")}" + s"Check that algorithms in query ${algorithmsOfQuery(r.query)} are in the reduce query algorithms ${algorithms.map(_.toString).mkString(",")}" ) - r.query.fold(false) { + r.query match { case q: MiningQuery => algorithms.exists(_.code == q.algorithm.code) case q: ExperimentQuery => q.algorithms.exists { qAlgorithm => @@ -137,10 +136,10 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { } .map { case Nil => noResult(initialQuery, Set(), List()) - case List(result) => result.copy(query = Some(initialQuery)) + case List(result) => result.copy(query = initialQuery) case results => QueryResult( - jobId = None, + jobId = results.map(_.jobId).mkString("+").take(128), node = config.jobs.node, dataProvenance = results.toSet[QueryResult].flatMap(_.dataProvenance), feedback = results.flatMap(_.feedback), @@ -149,7 +148,7 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { algorithm = None, data = Some(results.toJson), error = None, - query = Some(initialQuery) + query = initialQuery ) } @@ -159,7 +158,7 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { dataProvenance: DataProvenance, feedback: UserFeedbacks): QueryResult = ErrorJobResult(None, config.jobs.node, OffsetDateTime.now(), None, "No results") - .asQueryResult(Some(initialQuery), dataProvenance, feedback) + .asQueryResult(initialQuery, dataProvenance, feedback) private[dispatch] def reportResult(initiator: ActorRef)( queryResult: QueryResult, @@ -180,7 +179,7 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { feedback: UserFeedbacks): QueryResult = { logger.error(s"Cannot complete query because of ${e.getMessage}", e) ErrorJobResult(None, config.jobs.node, OffsetDateTime.now(), None, e.toString) - .asQueryResult(Some(initialQuery), dataProvenance, feedback) + .asQueryResult(initialQuery, dataProvenance, feedback) } private[dispatch] def errorMsgResult(initialQuery: Q, @@ -189,7 +188,7 @@ trait QueriesActor[Q <: Query, F[_]] extends Actor with LazyLogging { feedback: UserFeedbacks): QueryResult = { logger.error(s"Cannot complete query $initialQuery, cause $errorMessage") ErrorJobResult(None, config.jobs.node, OffsetDateTime.now(), None, errorMessage) - .asQueryResult(Some(initialQuery), dataProvenance, feedback) + .asQueryResult(initialQuery, dataProvenance, feedback) } private[dispatch] def algorithmsOfQuery(query: Query): List[AlgorithmSpec] = query match { diff --git a/src/main/scala/ch/chuv/lren/woken/mining/experiments.scala b/src/main/scala/ch/chuv/lren/woken/mining/experiments.scala index f066b398..b61837fb 100644 --- a/src/main/scala/ch/chuv/lren/woken/mining/experiments.scala +++ b/src/main/scala/ch/chuv/lren/woken/mining/experiments.scala @@ -58,7 +58,7 @@ object LocalExperimentService { def toQueryResult: QueryResult = result .fold(r => r, r => r) - .asQueryResult(Some(jobInProgress.job.query), + .asQueryResult(jobInProgress.job.query, jobInProgress.dataProvenance, jobInProgress.feedback) } @@ -254,7 +254,7 @@ case class LocalExperimentService[F[_]: Effect]( val extTable = extendedFeaturesTable.table.table val extendedJob = job.job.copy( inputTable = extTable, - query = job.job.query.copy(targetTable = Some(extTable.name)) + query = job.job.query.copy(targetTable = Some(extTable)) ) val extendedJobInProgress: ExperimentJobInProgress = job.copy(job = extendedJob) diff --git a/src/main/scala/ch/chuv/lren/woken/service/DatabaseServices.scala b/src/main/scala/ch/chuv/lren/woken/service/DatabaseServices.scala index 545bfa69..2977b641 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/DatabaseServices.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/DatabaseServices.scala @@ -22,7 +22,6 @@ import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ import ch.chuv.lren.woken.config.{ DatabaseConfiguration, WokenConfiguration, configurationFailed } -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.dao._ import com.typesafe.scalalogging.Logger import doobie.hikari.HikariTransactor @@ -56,24 +55,22 @@ case class DatabaseServices[F[_]: ConcurrentEffect: ContextShift: Timer]( } Monoid - .combineAll(datasetService.datasets().filter(_.location.isEmpty).map { dataset => + .combineAll(datasetService.datasets().values.filter(_.location.isEmpty).map { dataset => Monoid.combineAll(dataset.tables.map { - qualifiedTableName => + table => { - val table = TableId(config.jobs.featuresDb, qualifiedTableName) - featuresService .featuresTable(table) .fold[F[Unit]]( { error: NonEmptyList[String] => - val errMsg = error.mkString_("", ",", "") + val errMsg = error.mkString_(",") logger.error(errMsg) Effect[F].raiseError(new IllegalStateException(errMsg)) }, { table: FeaturesTableService[F] => - table.count(dataset.dataset).map { count => + table.count(dataset.id).map { count => if (count == 0) { val error = - s"Table ${table.table} contains no value for dataset ${dataset.dataset.code}" + s"Table ${table.table} contains no value for dataset ${dataset.id.code}" logger.error(error) throw new IllegalStateException(error) } @@ -142,10 +139,7 @@ object DatabaseServices { val fsIO: F[FeaturesService[F]] = wokenIO.flatMap { wokenRepository => mkService(t.featuresTransactor, config.featuresDb) { xa => - FeaturesRepositoryDAO(xa, - config.featuresDb.database, - config.featuresDb.tables, - wokenRepository).map { + FeaturesRepositoryDAO(xa, config.featuresDb, wokenRepository).map { _.map { FeaturesService.apply[F] } } }.map(_.valueOr(configurationFailed)) @@ -155,7 +149,7 @@ object DatabaseServices { Sync[F].delay(MetadataRepositoryDAO(xa).variablesMeta) } - val datasetService = ConfBasedDatasetService(config.config) + val datasetService = ConfBasedDatasetService(config.config, config.jobs) val algorithmLibraryService = AlgorithmLibraryService() val servicesIO = for { diff --git a/src/main/scala/ch/chuv/lren/woken/service/DatasetService.scala b/src/main/scala/ch/chuv/lren/woken/service/DatasetService.scala index 814aecb1..ec6c6c4f 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/DatasetService.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/DatasetService.scala @@ -17,19 +17,18 @@ package ch.chuv.lren.woken.service -import ch.chuv.lren.woken.config.DatasetsConfiguration -import ch.chuv.lren.woken.messages.datasets.Dataset +import ch.chuv.lren.woken.config.{ DatasetsConfiguration, JobsConfiguration } +import ch.chuv.lren.woken.messages.datasets.{ Dataset, DatasetId } import com.typesafe.config.Config trait DatasetService { - def datasets(): Set[Dataset] + def datasets(): Map[DatasetId, Dataset] } -case class ConfBasedDatasetService(config: Config) extends DatasetService { - override def datasets(): Set[Dataset] = +case class ConfBasedDatasetService(config: Config, jobsConfiguration: JobsConfiguration) + extends DatasetService { + override def datasets(): Map[DatasetId, Dataset] = DatasetsConfiguration - .datasets(config) + .datasets(config, jobsConfiguration.defaultFeaturesDatabase) .valueOr(nel => throw new IllegalStateException(s"Cannot load datasets: $nel")) - .values - .toSet } diff --git a/src/main/scala/ch/chuv/lren/woken/service/DispatcherService.scala b/src/main/scala/ch/chuv/lren/woken/service/DispatcherService.scala index c8bafb97..3a32fa2d 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/DispatcherService.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/DispatcherService.scala @@ -24,7 +24,6 @@ import cats.effect.Effect import cats.implicits._ import ch.chuv.lren.woken.core.fp._ import ch.chuv.lren.woken.messages.query.{ ExperimentQuery, MiningQuery, QueryResult } -import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import com.typesafe.scalalogging.{ LazyLogging, Logger } import ch.chuv.lren.woken.backends.woken.WokenClientService import ch.chuv.lren.woken.core.model.VariablesMeta @@ -63,30 +62,28 @@ trait DispatcherService { /** * Creates flows that dispatch queries to local or remote Woken workers according to the datasets * - * @param allDatasets All datasets known to this Woken instance + * @param datasetService The service providing datasets * @param wokenClientService The client service used to dispatch calls to other Woken instances * @author Ludovic Claude */ -class DispatcherServiceImpl(val allDatasets: Map[DatasetId, Dataset], +class DispatcherServiceImpl(val datasetService: DatasetService, val wokenClientService: WokenClientService) extends DispatcherService with LazyLogging { - override lazy val localDatasets: Set[DatasetId] = allDatasets.filter { - case (_, dataset) => dataset.location.isEmpty - }.keySet + override lazy val localDatasets: Set[DatasetId] = datasetService + .datasets() + .filter { + case (_, dataset) => dataset.location.isEmpty + } + .keySet override def dispatchTo(dataset: DatasetId): Option[RemoteLocation] = - if (allDatasets.isEmpty) - None - else - allDatasets.get(dataset).flatMap(_.location) + datasetService.datasets().get(dataset).flatMap(_.location) override def dispatchTo(datasets: Set[DatasetId]): (Set[RemoteLocation], Boolean) = { logger.whenDebugEnabled( - logger.debug( - s"Dispatch to datasets [${datasets.map(_.code).mkString(",")}] knowing [${allDatasets.keys.map(_.code).mkString(",")}]" - ) + logger.debug(s"Dispatch to datasets [${datasets.map(_.code).mkString(",")}]") ) val maybeLocations = datasets.map(dispatchTo) val local = maybeLocations.isEmpty || maybeLocations.contains(None) @@ -163,17 +160,17 @@ class DispatcherServiceImpl(val allDatasets: Map[DatasetId, Dataset], ): Flow[VariablesForDatasetsQuery, VariablesForDatasetsQR, NotUsed] = Flow[VariablesForDatasetsQuery] .map[VariablesForDatasetsQR] { q => - val datasets: Set[Dataset] = datasetService + val datasets: Iterable[Dataset] = datasetService .datasets() + .values .filter(_.location.isEmpty) - .filter(ds => q.datasets.isEmpty || q.datasets.contains(ds.dataset)) + .filter(ds => q.datasets.isEmpty || q.datasets.contains(ds.id)) val mergedVariables = datasets .map { ds => val varsForDs = ds.tables - .map(_.toUpperCase) // TODO: table name should not always be uppercase - .flatMap(v => runNow(variablesMetaService.get(v))) + .flatMap(tableId => runNow(variablesMetaService.get(tableId))) .flatMap(_.allVariables()) - .map(_.copy(datasets = Set(ds.dataset))) + .map(_.copy(datasets = Set(ds.id))) varsForDs.toSet } .foldLeft(Set[VariableMetaData]()) { @@ -190,16 +187,7 @@ object DispatcherService { private val logger = Logger("DispatcherService") - private[service] def loadDatasets( - datasets: Validation[Map[DatasetId, Dataset]] - ): Map[DatasetId, Dataset] = - datasets.fold({ e => - logger.info(s"No datasets configured: $e") - Map[DatasetId, Dataset]() - }, identity) - - def apply(datasets: Validation[Map[DatasetId, Dataset]], - wokenService: WokenClientService): DispatcherService = - new DispatcherServiceImpl(loadDatasets(datasets), wokenService) + def apply(datasetsService: DatasetService, wokenService: WokenClientService): DispatcherService = + new DispatcherServiceImpl(datasetsService, wokenService) } diff --git a/src/main/scala/ch/chuv/lren/woken/service/FeaturesService.scala b/src/main/scala/ch/chuv/lren/woken/service/FeaturesService.scala index 3cce6594..b9c0ecb2 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/FeaturesService.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/FeaturesService.scala @@ -27,9 +27,8 @@ import ch.chuv.lren.woken.dao.{ FeaturesTableRepository, PrefillExtendedFeaturesTable } -import ch.chuv.lren.woken.messages.datasets.DatasetId +import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId } import ch.chuv.lren.woken.core.fp.runNow -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.messages.query.filters.FilterRule import spray.json.JsObject diff --git a/src/main/scala/ch/chuv/lren/woken/service/MiningCacheService.scala b/src/main/scala/ch/chuv/lren/woken/service/MiningCacheService.scala index de1c41ea..619fb349 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/MiningCacheService.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/MiningCacheService.scala @@ -70,9 +70,9 @@ class MiningCacheServiceImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( val tables = databaseServices.config.featuresDb.tables val variablesMetaService = databaseServices.variablesMetaService - tables.foreach { table => + tables.values.foreach { table => // TODO: add support for table schema - runNow(variablesMetaService.get(table.table.name).map { metaO => + runNow(variablesMetaService.get(table.table).map { metaO => metaO.foreach { variables => { @@ -128,7 +128,7 @@ class MiningCacheServiceImpl[F[_]: ConcurrentEffect: ContextShift: Timer]( covariablesMustExist = false, groupings, None, - Some(table.table.name), + Some(table.table), TreeSet(), algorithm, None) diff --git a/src/main/scala/ch/chuv/lren/woken/service/QueryToJobService.scala b/src/main/scala/ch/chuv/lren/woken/service/QueryToJobService.scala index 9d6e63b0..ec7448e7 100644 --- a/src/main/scala/ch/chuv/lren/woken/service/QueryToJobService.scala +++ b/src/main/scala/ch/chuv/lren/woken/service/QueryToJobService.scala @@ -28,10 +28,10 @@ import ch.chuv.lren.woken.config.JobsConfiguration import ch.chuv.lren.woken.core.features.Queries import ch.chuv.lren.woken.core.features.Queries._ import ch.chuv.lren.woken.core.model._ -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.core.model.jobs.{ ExperimentJob, _ } import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.dao.VariablesMetaRepository +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.messages.variables.{ VariableId, VariableMetaData } import com.typesafe.scalalogging.LazyLogging @@ -219,13 +219,8 @@ class QueryToJobServiceImpl[F[_]: Effect]( query: Q ): F[Validation[PreparedQuery[Q]]] = { - val jobId = UUID.randomUUID().toString - val featuresDb = jobsConfiguration.featuresDb - // TODO: define target db schema from configuration or query - val featuresDbSchema = None - val featuresTableName = query.targetTable.getOrElse(jobsConfiguration.featuresTable) - val featuresTable = TableId(featuresDb, featuresDbSchema, featuresTableName) - val metadataKey = query.targetTable.getOrElse(jobsConfiguration.metadataKeyForFeaturesTable) + val jobId = UUID.randomUUID().toString + val featuresTable = query.targetTable.getOrElse(jobsConfiguration.defaultFeaturesTable) def prepareFeedback(oldVars: FeatureIdentifiers, existingVars: FeatureIdentifiers): UserFeedbacks = @@ -234,15 +229,15 @@ class QueryToJobServiceImpl[F[_]: Effect]( .toNel .fold[UserFeedbacks](Nil)( missing => { - val missingFields = missing.map(Queries.toField).mkString_("", ",", "") + val missingFields = missing.map(Queries.toField).mkString_(",") List(UserInfo(s"Missing variables $missingFields")) } ) - variablesMetaService.get(metadataKey).map { variablesMetaO => + variablesMetaService.get(featuresTable).map { variablesMetaO => val variablesMeta: Validation[VariablesMeta] = Validated.fromOption( variablesMetaO, - NonEmptyList(s"Cannot find metadata for table $metadataKey", Nil) + NonEmptyList(s"Cannot find metadata for table ${featuresTable.toString}", Nil) ) val validatedQueryWithFeedback: Validation[(Q, UserFeedbacks)] = variablesMeta.map { v => @@ -272,12 +267,12 @@ class QueryToJobServiceImpl[F[_]: Effect]( case q: MiningQuery => q.copy(covariables = existingCovariables, grouping = existingGroupings, - targetTable = Some(featuresTableName)) + targetTable = Some(featuresTable)) .asInstanceOf[Q] case q: ExperimentQuery => q.copy(covariables = existingCovariables, grouping = existingGroupings, - targetTable = Some(featuresTableName)) + targetTable = Some(featuresTable)) .asInstanceOf[Q] } @@ -311,11 +306,9 @@ class QueryToJobServiceImpl[F[_]: Effect]( val _ :: featuresTable :: _ :: query :: _ :: HNil = preparedQuery - val table = query.targetTable.fold(featuresTable)(t => featuresTable.copy(name = t)) - // TODO: Add targetSchema to query or schema to configuration or both, use it here instead of None + val table = query.targetTable.getOrElse(featuresTable) val validTableService: Validation[FeaturesTableService[F]] = - featuresService - .featuresTable(table) + featuresService.featuresTable(table) validTableService .map { tableService => diff --git a/src/test/resources/localDatasets.conf b/src/test/resources/localDatasets.conf index 4b1b9b3e..9e7ae52c 100644 --- a/src/test/resources/localDatasets.conf +++ b/src/test/resources/localDatasets.conf @@ -3,7 +3,7 @@ datasets { sample { label = "Sample data" description = "Sample data" - tables = ["Sample"] + tables = ["sample_data"] anonymisationLevel = "Anonymised" } diff --git a/src/test/resources/remoteDatasets.conf b/src/test/resources/remoteDatasets.conf index 76b53b54..b92e411b 100644 --- a/src/test/resources/remoteDatasets.conf +++ b/src/test/resources/remoteDatasets.conf @@ -3,7 +3,7 @@ datasets { remoteData1 { label = "Remote dataset #1" description = "Remote dataset #1" - tables = ["DATA"] + tables = ["cde_features_a"] anonymisationLevel = "Depersonalised" location { url = "http://service.remote/1" @@ -13,7 +13,7 @@ datasets { remoteData2 { label = "Remote dataset #2" description = "Remote dataset #2" - tables = ["DATA"] + tables = ["cde_features_a"] anonymisationLevel = "Depersonalised" location { url = "http://service.remote/2" @@ -27,7 +27,7 @@ datasets { remoteData3 { label = "Remote dataset #3" description = "Remote dataset #3" - tables = ["DATA"] + tables = ["cde_features_a"] anonymisationLevel = "Depersonalised" location { url = "wss://service.remote/3" diff --git a/src/test/resources/test.conf b/src/test/resources/test.conf index a59f8f7e..6695ee67 100644 --- a/src/test/resources/test.conf +++ b/src/test/resources/test.conf @@ -46,9 +46,11 @@ jobs { node = "federation" owner = "admin@mip.chuv.ch" chronosServerUrl = "http://localhost:9999" - resultDb = "local" - featuresDb = "features_db" - featuresTable = "features" + resultDb = "woken" + defaultFeaturesDatabase = "features_db" + defaultFeaturesTable = { + name = "features" + } metaDb = "meta" } @@ -77,7 +79,7 @@ db { pool_size = 1 tables { - "Sample" { + "sample_data" { primaryKey = [ { name = "ID" diff --git a/src/test/scala/ch/chuv/lren/woken/akka/MasterRouterTest.scala b/src/test/scala/ch/chuv/lren/woken/akka/MasterRouterTest.scala index 57a63874..d4ffb05e 100644 --- a/src/test/scala/ch/chuv/lren/woken/akka/MasterRouterTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/akka/MasterRouterTest.scala @@ -22,7 +22,6 @@ import java.util.UUID import akka.actor.{ ActorSystem, Props } import akka.stream.ActorMaterializer import akka.testkit.{ ImplicitSender, TestKit } -import com.typesafe.config.{ Config, ConfigFactory } import ch.chuv.lren.woken.config._ import ch.chuv.lren.woken.config.ConfigurationInstances._ import ch.chuv.lren.woken.backends.woken.WokenClientService @@ -30,15 +29,13 @@ import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.core.features.Queries._ import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.service._ -import ch.chuv.lren.woken.messages.datasets.{ Dataset, DatasetId, DatasetsQuery, DatasetsResponse } +import ch.chuv.lren.woken.messages.datasets._ import ch.chuv.lren.woken.messages.datasets.AnonymisationLevel._ import ch.chuv.lren.woken.messages.variables.VariableId import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } import org.scalatest.tagobjects.Slow -import cats.data.Validated._ import cats.effect.{ Effect, IO } import cats.syntax.validated._ -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.core.model.jobs.{ DockerJob, ExperimentJob } import ch.chuv.lren.woken.messages.remoting.RemoteLocation @@ -60,12 +57,10 @@ class MasterRouterTest import ch.chuv.lren.woken.service.TestServices._ - val tableId = TableId("test_db", None, "features_table") - def experimentQuery2job(query: ExperimentQuery): Validation[ExperimentJob] = ExperimentJob( jobId = UUID.randomUUID().toString, - inputTable = tableId, + inputTable = featuresTableId, query = query, metadata = Nil, queryAlgorithms = Map() @@ -74,7 +69,7 @@ class MasterRouterTest def miningQuery2job(query: MiningQuery): Validation[DockerJob] = { val featuresQuery = query .filterNulls(variablesCanBeNull = true, covariablesCanBeNull = true) - .features(tableId, None) + .features(featuresTableId, None) DockerJob( jobId = UUID.randomUUID().toString, @@ -110,30 +105,17 @@ class MasterRouterTest } - val tsConfig: Config = ConfigFactory - .parseResourcesAnySyntax("remoteDatasets.conf") - .withFallback(ConfigFactory.load("test.conf")) - .resolve() - - val appConfig: AppConfiguration = AppConfiguration - .read(tsConfig) - .valueOr( - e => throw new IllegalStateException(s"Invalid configuration: ${e.toList.mkString(", ")}") - ) - - val jdbcConfigs: String => Validation[DatabaseConfiguration] = _ => Valid(noDbConfig) - - val config: WokenConfiguration = WokenConfiguration(tsConfig) + val config: WokenConfiguration = centralNodeConfig implicit val materializer: ActorMaterializer = ActorMaterializer() val wokenService: WokenClientService = WokenClientService("test") - val dispatcherService: DispatcherService = - DispatcherService(DatasetsConfiguration.datasets(config.config), wokenService) - val databaseServices: DatabaseServices[IO] = TestServices.databaseServices(config) + val dispatcherService: DispatcherService = + DispatcherService(databaseServices.datasetService, wokenService) + val user: UserId = UserId("test") "Master actor" must { @@ -147,7 +129,7 @@ class MasterRouterTest "starts new experiments" ignore { - val limit = appConfig.masterRouterConfig.experimentActorsLimit + val limit = config.app.masterRouterConfig.experimentActorsLimit (1 to limit).foreach { _ => router ! ExperimentQuery( @@ -182,7 +164,7 @@ class MasterRouterTest "not start new experiments over the limit of concurrent experiments, then recover" taggedAs Slow ignore { - val limit = appConfig.masterRouterConfig.experimentActorsLimit + val limit = config.app.masterRouterConfig.experimentActorsLimit val overflow = limit * 2 (1 to overflow).foreach { _ => @@ -277,7 +259,7 @@ class MasterRouterTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("sample_data"), + targetTable = Some(sampleDataTableId), algorithm = AlgorithmSpec("knn", List(CodeValue("k", "5")), None), datasets = TreeSet(), executionPlan = None @@ -294,29 +276,35 @@ class MasterRouterTest "return available datasets" in { - router ! DatasetsQuery(Some("DATA")) + router ! DatasetsQuery(Some(cdeFeaturesATableId.name)) within(5 seconds) { val msg = expectMsgType[DatasetsResponse] val expected = Set( - Dataset(DatasetId("remoteData1"), - "Remote dataset #1", - "Remote dataset #1", - List("DATA"), - Depersonalised, - Some(RemoteLocation("http://service.remote/1", None))), - Dataset(DatasetId("remoteData2"), - "Remote dataset #2", - "Remote dataset #2", - List("DATA"), - Depersonalised, - Some(RemoteLocation("http://service.remote/2", None))), - Dataset(DatasetId("remoteData3"), - "Remote dataset #3", - "Remote dataset #3", - List("DATA"), - Depersonalised, - Some(RemoteLocation("wss://service.remote/3", None))) + Dataset( + DatasetId("remoteData1"), + "Remote dataset #1", + "Remote dataset #1", + List(cdeFeaturesATableId), + Depersonalised, + Some(RemoteLocation("http://service.remote/1", None)) + ), + Dataset( + DatasetId("remoteData2"), + "Remote dataset #2", + "Remote dataset #2", + List(cdeFeaturesATableId), + Depersonalised, + Some(RemoteLocation("http://service.remote/2", None)) + ), + Dataset( + DatasetId("remoteData3"), + "Remote dataset #3", + "Remote dataset #3", + List(cdeFeaturesATableId), + Depersonalised, + Some(RemoteLocation("wss://service.remote/3", None)) + ) ) msg.datasets shouldBe expected diff --git a/src/test/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronosTest.scala b/src/test/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronosTest.scala index 104ee0cc..03572504 100644 --- a/src/test/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronosTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/backends/faas/chronos/JobToChronosTest.scala @@ -20,10 +20,9 @@ package ch.chuv.lren.woken.backends.faas.chronos import cats.data.ValidatedNel import cats.syntax.validated._ import ch.chuv.lren.woken.Predefined.Algorithms.{ knnDefinition, knnWithK5 } -import ch.chuv.lren.woken.config.{ DatabaseConfiguration, JobsConfiguration } +import ch.chuv.lren.woken.config.{ DatabaseConfiguration, DatabaseId, JobsConfiguration } import ch.chuv.lren.woken.core.features.FeaturesQuery import ch.chuv.lren.woken.core.features.Queries._ -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.core.model.jobs.DockerJob import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.messages.query.filters.{ InputType, Operator, SingleFilterRule } @@ -31,6 +30,7 @@ import ch.chuv.lren.woken.messages.variables.{ VariableId, VariableMetaData, Var import org.scalatest.{ FlatSpec, Matchers } import scala.collection.immutable.TreeSet +import ch.chuv.lren.woken.config.ConfigurationInstances._ class JobToChronosTest extends FlatSpec with Matchers { @@ -53,15 +53,14 @@ class JobToChronosTest extends FlatSpec with Matchers { executionPlan = None ) - val tableId = TableId("features_db", None, "features_table") - val featuresQuery: FeaturesQuery = query .filterNulls(variablesCanBeNull = false, covariablesCanBeNull = false) - .features(tableId, None) + .features(featuresTableId, None) - val jdbcConfs: Map[String, ValidatedNel[String, DatabaseConfiguration]] = Map( - "features_db" -> DatabaseConfiguration( + val jdbcConfs: Map[DatabaseId, ValidatedNel[String, DatabaseConfiguration]] = Map( + featuresDb -> DatabaseConfiguration( + featuresDb, dbiDriver = "PostgreSQL", dbApiDriver = "postgresql", jdbcDriver = "org.postgresql.Driver", @@ -73,9 +72,10 @@ class JobToChronosTest extends FlatSpec with Matchers { user = "user", password = "test", poolSize = 5, - tables = Set() + tables = Map() ).validNel, - "woken_db" -> DatabaseConfiguration( + wokenDb -> DatabaseConfiguration( + wokenDb, dbiDriver = "PostgreSQL", dbApiDriver = "postgresql", jdbcDriver = "org.postgresql.Driver", @@ -87,7 +87,7 @@ class JobToChronosTest extends FlatSpec with Matchers { user = "woken", password = "wpwd", poolSize = 5, - tables = Set() + tables = Map() ).validNel ).withDefaultValue("".invalidNel) @@ -176,11 +176,10 @@ class JobToChronosTest extends FlatSpec with Matchers { node = "test", owner = "mip@chuv.ch", chronosServerUrl = "http://localhost:4400", - featuresDb = "features_db", - featuresTable = "features", - metadataKeyForFeaturesTable = "features", - resultDb = "woken_db", - metaDb = "meta_db", + defaultFeaturesDatabase = featuresDb, + defaultFeaturesTable = featuresTableId, + resultDb = wokenDb, + metaDb = metaDb, 1.0, 256 ) @@ -205,7 +204,7 @@ class JobToChronosTest extends FlatSpec with Matchers { EnvironmentVariable("MODEL_PARAM_k", "5"), EnvironmentVariable( "PARAM_query", - """SELECT "target","a","b","c","grp1","grp2" FROM "features_table" WHERE "target" IS NOT NULL AND "a" IS NOT NULL AND "b" IS NOT NULL AND "c" IS NOT NULL AND "grp1" IS NOT NULL AND "grp2" IS NOT NULL AND "a" < 10""" + """SELECT "target","a","b","c","grp1","grp2" FROM "features" WHERE "target" IS NOT NULL AND "a" IS NOT NULL AND "b" IS NOT NULL AND "c" IS NOT NULL AND "grp1" IS NOT NULL AND "grp2" IS NOT NULL AND "a" < 10""" ), EnvironmentVariable("PARAM_grouping", "grp1,grp2"), EnvironmentVariable( @@ -256,6 +255,8 @@ class JobToChronosTest extends FlatSpec with Matchers { retries = 0 ) + // ChronosJob(python_knn_1234,None,compute,List(),false,R1//PT1M,Some(PT5M),false,None,None,None,Some(Container(DOCKER,hbpmip/python-knn,false,List(),List(),HOST,List())),Some(1.0),None,Some(256.0),false,Some(mip@chuv.ch),None,List(EnvironmentVariable(DOCKER_IMAGE,hbpmip/python-knn), EnvironmentVariable(IN_DATABASE,features), EnvironmentVariable(IN_DBAPI_DRIVER,postgresql), EnvironmentVariable(IN_DBI_DRIVER,PostgreSQL), EnvironmentVariable(IN_HOST,localhost), EnvironmentVariable(IN_JDBC_DRIVER,org.postgresql.Driver), EnvironmentVariable(IN_JDBC_URL,jdbc:postgres:localhost:5432/features), EnvironmentVariable(IN_PASSWORD,test), EnvironmentVariable(IN_PORT,5432), EnvironmentVariable(IN_USER,user), EnvironmentVariable(JOB_ID,1234), EnvironmentVariable(MODEL_PARAM_k,5), EnvironmentVariable(NODE,test), EnvironmentVariable(OUT_DATABASE,woken), EnvironmentVariable(OUT_DBAPI_DRIVER,postgresql), EnvironmentVariable(OUT_DBI_DRIVER,PostgreSQL), EnvironmentVariable(OUT_HOST,localhost), EnvironmentVariable(OUT_JDBC_DRIVER,org.postgresql.Driver), EnvironmentVariable(OUT_JDBC_URL,jdbc:postgres:localhost:5432/woken), EnvironmentVariable(OUT_PASSWORD,wpwd), EnvironmentVariable(OUT_PORT,5432), EnvironmentVariable(OUT_USER,woken), EnvironmentVariable(PARAM_covariables,a,b,c), EnvironmentVariable(PARAM_grouping,grp1,grp2), EnvironmentVariable(PARAM_meta,{"grp2":{"code":"grp2","label":"grp2","type":"text"},"a":{"code":"a","label":"a","type":"text"},"grp1":{"code":"grp1","label":"grp1","type":"text"},"b":{"code":"b","label":"b","type":"text"},"target":{"code":"target","label":"target","type":"text"},"c":{"code":"c","label":"c","type":"text"}}), EnvironmentVariable(PARAM_query,SELECT "target","a","b","c","grp1","grp2" FROM "features" WHERE "target" IS NOT NULL AND "a" IS NOT NULL AND "b" IS NOT NULL AND "c" IS NOT NULL AND "grp1" IS NOT NULL AND "grp2" IS NOT NULL AND "a" < 10), EnvironmentVariable(PARAM_variables,target)),0) was not equal to + // ChronosJob(python_knn_1234,None,compute,List(),false,R1//PT1M,Some(PT5M),false,None,None,None,Some(Container(DOCKER,hbpmip/python-knn,false,List(),List(),HOST,List())),Some(1.0),None,Some(256.0),false,Some(mip@chuv.ch),None,List(EnvironmentVariable(DOCKER_IMAGE,hbpmip/python-knn), EnvironmentVariable(IN_DATABASE,features), EnvironmentVariable(IN_DBAPI_DRIVER,postgresql), EnvironmentVariable(IN_DBI_DRIVER,PostgreSQL), EnvironmentVariable(IN_HOST,localhost), EnvironmentVariable(IN_JDBC_DRIVER,org.postgresql.Driver), EnvironmentVariable(IN_JDBC_URL,jdbc:postgres:localhost:5432/features), EnvironmentVariable(IN_PASSWORD,test), EnvironmentVariable(IN_PORT,5432), EnvironmentVariable(IN_USER,user), EnvironmentVariable(JOB_ID,1234), EnvironmentVariable(MODEL_PARAM_k,5), EnvironmentVariable(NODE,test), EnvironmentVariable(OUT_DATABASE,woken), EnvironmentVariable(OUT_DBAPI_DRIVER,postgresql), EnvironmentVariable(OUT_DBI_DRIVER,PostgreSQL), EnvironmentVariable(OUT_HOST,localhost), EnvironmentVariable(OUT_JDBC_DRIVER,org.postgresql.Driver), EnvironmentVariable(OUT_JDBC_URL,jdbc:postgres:localhost:5432/woken), EnvironmentVariable(OUT_PASSWORD,wpwd), EnvironmentVariable(OUT_PORT,5432), EnvironmentVariable(OUT_USER,woken), EnvironmentVariable(PARAM_covariables,a,b,c), EnvironmentVariable(PARAM_grouping,grp1,grp2), EnvironmentVariable(PARAM_meta,{"grp2":{"code":"grp2","label":"grp2","type":"text"},"a":{"code":"a","label":"a","type":"text"},"grp1":{"code":"grp1","label":"grp1","type":"text"},"b":{"code":"b","label":"b","type":"text"},"target":{"code":"target","label":"target","type":"text"},"c":{"code":"c","label":"c","type":"text"}}), EnvironmentVariable(PARAM_query,SELECT "target","a","b","c","grp1","grp2" FROM "features_table" WHERE "target" IS NOT NULL AND "a" IS NOT NULL AND "b" IS NOT NULL AND "c" IS NOT NULL AND "grp1" IS NOT NULL AND "grp2" IS NOT NULL AND "a" < 10), EnvironmentVariable(PARAM_variables,target)),0) (JobToChronosTest.scala:282)//[info] /* import ai.x.diff.DiffShow import ai.x.diff.conversions._ diff --git a/src/test/scala/ch/chuv/lren/woken/config/ConfigurationInstances.scala b/src/test/scala/ch/chuv/lren/woken/config/ConfigurationInstances.scala index 0b95141e..a0fe52a8 100644 --- a/src/test/scala/ch/chuv/lren/woken/config/ConfigurationInstances.scala +++ b/src/test/scala/ch/chuv/lren/woken/config/ConfigurationInstances.scala @@ -17,9 +17,14 @@ package ch.chuv.lren.woken.config +import ch.chuv.lren.woken.messages.datasets.TableId +import com.typesafe.config.{ Config, ConfigFactory } + object ConfigurationInstances { + val noDbConfig = DatabaseConfiguration( + id = DatabaseId("no_db"), dbiDriver = "DBI", dbApiDriver = "DBAPI", jdbcDriver = "java.lang.String", @@ -31,18 +36,50 @@ object ConfigurationInstances { user = "", password = "", poolSize = 5, - tables = Set() + tables = Map() ) + + val featuresDb = DatabaseId("features_db") + val wokenDb = DatabaseId("woken") + val metaDb = DatabaseId("meta") + + val unknownTableId = TableId("unknown", "unknown") + val featuresTableId: TableId = tableId("features") + val churnDataTableId: TableId = tableId("churn") + val sampleDataTableId: TableId = tableId("sample_data") + val cdeFeaturesATableId: TableId = tableId("cde_features_a") + val cdeFeaturesBTableId: TableId = tableId("cde_features_b") + val cdeFeaturesCTableId: TableId = tableId("cde_features_c") + val cdeFeaturesMixedTableId: TableId = tableId("cde_features_mixed") + val noJobsConf = JobsConfiguration("testNode", "noone", "http://nowhere", - "features", - "features", - "features", - "results", - "meta", + featuresDb, + featuresTableId, + wokenDb, + metaDb, 0.5, 512) + lazy val localNodeConfigSource: Config = ConfigFactory + .parseResourcesAnySyntax("localDatasets.conf") + .withFallback(ConfigFactory.load("algorithms.conf")) + .withFallback(ConfigFactory.load("test.conf")) + .resolve() + + lazy val localNodeConfig: WokenConfiguration = WokenConfiguration(localNodeConfigSource) + + lazy val centralNodeConfigSource: Config = ConfigFactory + .parseResourcesAnySyntax("remoteDatasets.conf") + .withFallback(ConfigFactory.load("algorithms.conf")) + .withFallback(ConfigFactory.load("test.conf")) + .resolve() + + lazy val centralNodeConfig: WokenConfiguration = WokenConfiguration(centralNodeConfigSource) + + lazy val featuresDbConfiguration: DatabaseConfiguration = localNodeConfig.featuresDb + + private def tableId(name: String) = TableId(featuresDb.code, name) } diff --git a/src/test/scala/ch/chuv/lren/woken/config/DatabaseConfigurationTest.scala b/src/test/scala/ch/chuv/lren/woken/config/DatabaseConfigurationTest.scala new file mode 100644 index 00000000..2ad0f1a8 --- /dev/null +++ b/src/test/scala/ch/chuv/lren/woken/config/DatabaseConfigurationTest.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2017 LREN CHUV for Human Brain Project + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package ch.chuv.lren.woken.config +import cats.scalatest.{ ValidatedMatchers, ValidatedValues } +import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, WordSpec } +import ch.chuv.lren.woken.core.model.database.TableColumn +import ch.chuv.lren.woken.messages.variables.SqlType +import ConfigurationInstances._ + +class DatabaseConfigurationTest + extends WordSpec + with Matchers + with ValidatedMatchers + with ValidatedValues { + + val dbConfigs: DatabaseId => Validation[DatabaseConfiguration] = + DatabaseConfiguration.factory(ConfigFactory.load("test.conf")) + + "Configuration for databases" should { + + "read configuration for the woken database" in { + val config = dbConfigs(wokenDb) + + config shouldBe valid + config.value.id.code shouldBe "woken" + config.value.user shouldBe "woken" + config.value.host shouldBe "db" + config.value.port shouldBe 5432 + config.value.database shouldBe "woken" + config.value.poolSize shouldBe 1 + config.value.tables shouldBe empty + } + + "read configuration for the features database and check the list of tables" in { + val config = dbConfigs(featuresDb) + + config shouldBe valid + config.value.id.code shouldBe "features_db" + config.value.user shouldBe "features" + config.value.host shouldBe "db" + config.value.port shouldBe 5432 + config.value.database shouldBe "features" + config.value.poolSize shouldBe 1 + config.value.tables.size shouldBe 6 + + val cdeFeaturesATable = config.value.tables.get(cdeFeaturesATableId) + cdeFeaturesATable.isDefined shouldBe true + cdeFeaturesATable.get.table shouldBe cdeFeaturesATableId + cdeFeaturesATable.get.owner shouldBe None + cdeFeaturesATable.get.validateSchema shouldBe true + cdeFeaturesATable.get.primaryKey shouldBe List(TableColumn("subjectcode", SqlType.varchar)) + cdeFeaturesATable.get.datasetColumn shouldBe Some(TableColumn("dataset", SqlType.varchar)) + + val sampleDataTable = config.value.tables.get(sampleDataTableId) + sampleDataTable.isDefined shouldBe true + sampleDataTable.get.table shouldBe sampleDataTableId + sampleDataTable.get.owner shouldBe None + sampleDataTable.get.validateSchema shouldBe true + sampleDataTable.get.primaryKey shouldBe List(TableColumn("ID", SqlType.int)) + sampleDataTable.get.datasetColumn shouldBe None + + } + + } +} diff --git a/src/test/scala/ch/chuv/lren/woken/config/DatasetsConfigurationTest.scala b/src/test/scala/ch/chuv/lren/woken/config/DatasetsConfigurationTest.scala new file mode 100644 index 00000000..45978c88 --- /dev/null +++ b/src/test/scala/ch/chuv/lren/woken/config/DatasetsConfigurationTest.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2017 LREN CHUV for Human Brain Project + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package ch.chuv.lren.woken.config + +import cats.scalatest.{ ValidatedMatchers, ValidatedValues } +import ch.chuv.lren.woken.config.ConfigurationInstances._ +import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation +import ch.chuv.lren.woken.messages.datasets.{ AnonymisationLevel, Dataset, DatasetId } +import org.scalatest.{ Matchers, WordSpec } +import AnonymisationLevel._ +import akka.http.scaladsl.model.Uri +import ch.chuv.lren.woken.messages.remoting.{ BasicAuthentication, RemoteLocation } + +class DatasetsConfigurationTest + extends WordSpec + with Matchers + with ValidatedMatchers + with ValidatedValues { + + val localDatasetConfigs: DatasetId => Validation[Dataset] = + DatasetsConfiguration.factory(localNodeConfigSource, featuresDb) + + val remoteDatasetConfigs: DatasetId => Validation[Dataset] = + DatasetsConfiguration.factory(centralNodeConfigSource, featuresDb) + + "Configuration for datasets" should { + + "ignore invalid datasets" in { + localDatasetConfigs(DatasetId("invalid")) shouldBe invalid + } + + "read configuration for sample dataset" in { + val config = localDatasetConfigs(DatasetId("sample")) + + config shouldBe valid + config.value.id.code shouldBe "sample" + config.value.label shouldBe "Sample data" + config.value.description shouldBe "Sample data" + config.value.tables shouldBe List(sampleDataTableId) + config.value.anonymisationLevel shouldBe Anonymised + config.value.location shouldBe None + } + + "read configuration for nida-synthdata dataset" in { + val config = localDatasetConfigs(DatasetId("nida-synthdata")) + + config shouldBe valid + config.value.id.code shouldBe "nida-synthdata" + config.value.label shouldBe "NIDA" + config.value.description shouldBe "Demo dataset NIDA" + config.value.tables shouldBe List(cdeFeaturesBTableId, cdeFeaturesMixedTableId) + config.value.anonymisationLevel shouldBe Depersonalised + config.value.location shouldBe None + } + + "read configuration for remote data2 dataset" in { + val config = remoteDatasetConfigs(DatasetId("remoteData2")) + + config shouldBe valid + config.value.id.code shouldBe "remoteData2" + config.value.label shouldBe "Remote dataset #2" + config.value.description shouldBe "Remote dataset #2" + config.value.tables shouldBe List(cdeFeaturesATableId) + config.value.anonymisationLevel shouldBe Depersonalised + config.value.location shouldBe Some( + RemoteLocation(Uri("http://service.remote/2"), + Some(BasicAuthentication("woken", "wokenpwd"))) + ) + } + } +} diff --git a/src/test/scala/ch/chuv/lren/woken/core/features/FeaturesQueryTest.scala b/src/test/scala/ch/chuv/lren/woken/core/features/FeaturesQueryTest.scala index 7b2bea54..b14c4168 100644 --- a/src/test/scala/ch/chuv/lren/woken/core/features/FeaturesQueryTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/core/features/FeaturesQueryTest.scala @@ -17,14 +17,15 @@ package ch.chuv.lren.woken.core.features -import ch.chuv.lren.woken.core.model.database.{ TableColumn, TableId } +import ch.chuv.lren.woken.core.model.database.TableColumn +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.filters.{ InputType, Operator, SingleFilterRule } import ch.chuv.lren.woken.messages.variables.SqlType import org.scalatest.{ Matchers, WordSpec } class FeaturesQueryTest extends WordSpec with Matchers { - val tableId = TableId("test_db", None, "table") + val tableId = TableId("test_db", "table") "FeaturesQuery" should { diff --git a/src/test/scala/ch/chuv/lren/woken/core/features/QueriesTest.scala b/src/test/scala/ch/chuv/lren/woken/core/features/QueriesTest.scala index 46cde31e..8685401b 100644 --- a/src/test/scala/ch/chuv/lren/woken/core/features/QueriesTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/core/features/QueriesTest.scala @@ -17,7 +17,7 @@ package ch.chuv.lren.woken.core.features -import ch.chuv.lren.woken.core.model.database.TableId +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.query.{ AlgorithmSpec, CodeValue, MiningQuery, UserId } import org.scalatest.{ Matchers, WordSpec } import ch.chuv.lren.woken.messages.query.filters._ @@ -54,7 +54,7 @@ class QueriesTest extends WordSpec with Matchers { executionPlan = None ) - val inputTable = TableId("test_db", None, "inputTable") + val inputTable = TableId("test_db", "inputTable") "generate the SQL query" ignore { val featuresQuery = query.features(inputTable, None) diff --git a/src/test/scala/ch/chuv/lren/woken/core/model/VariablesMetaTest.scala b/src/test/scala/ch/chuv/lren/woken/core/model/VariablesMetaTest.scala index 2fc67eef..95989529 100644 --- a/src/test/scala/ch/chuv/lren/woken/core/model/VariablesMetaTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/core/model/VariablesMetaTest.scala @@ -24,6 +24,7 @@ import ch.chuv.lren.woken.messages.variables._ import org.scalatest.{ Matchers, WordSpec } import spray.json._ import variablesProtocol._ +import ch.chuv.lren.woken.config.ConfigurationInstances._ class VariablesMetaTest extends WordSpec with Matchers with JsonUtils { @@ -35,7 +36,7 @@ class VariablesMetaTest extends WordSpec with Matchers with JsonUtils { VariablesMeta(1, "test", json.convertTo[GroupMetaData], - "sample_data", + sampleDataTableId, List("state", "custserv_calls", "churn").map(VariableId)) val selectedMeta = @@ -86,7 +87,7 @@ class VariablesMetaTest extends WordSpec with Matchers with JsonUtils { 1, "test", json.convertTo[GroupMetaData], - "mip_cde_features", + cdeFeaturesATableId, List("dataset", "gender", "agegroup", "alzheimerbroadcategory").map(VariableId) ) @@ -123,7 +124,7 @@ class VariablesMetaTest extends WordSpec with Matchers with JsonUtils { VariablesMeta(1, "test", json.convertTo[GroupMetaData], - "sample_data", + sampleDataTableId, List("state", "custserv_calls", "churn").map(VariableId)) val selectedMeta = diff --git a/src/test/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJobTest.scala b/src/test/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJobTest.scala index 01041810..3388090b 100644 --- a/src/test/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJobTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/core/model/jobs/ExperimentJobTest.scala @@ -19,7 +19,7 @@ package ch.chuv.lren.woken.core.model.jobs import java.util.UUID -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableId } +import ch.chuv.lren.woken.core.model.database.FeaturesTableDescription import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.Predefined.Algorithms.{ @@ -28,14 +28,21 @@ import ch.chuv.lren.woken.Predefined.Algorithms.{ knnDefinition, knnWithK5 } +import ch.chuv.lren.woken.config.ConfigurationInstances._ import cats.implicits._ import cats.scalatest.{ ValidatedMatchers, ValidatedValues } +import ch.chuv.lren.woken.dao.FeaturesTableTestSupport import ch.chuv.lren.woken.messages.variables.VariableId import org.scalatest.{ Matchers, WordSpec } import scala.collection.immutable.TreeSet -class ExperimentJobTest extends WordSpec with Matchers with ValidatedMatchers with ValidatedValues { +class ExperimentJobTest + extends WordSpec + with Matchers + with ValidatedMatchers + with ValidatedValues + with FeaturesTableTestSupport { val user: UserId = UserId("test") @@ -81,7 +88,7 @@ class ExperimentJobTest extends WordSpec with Matchers with ValidatedMatchers wi covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithms = List(AlgorithmSpec(algorithm, parameters, None)), validations = List(ValidationSpec("kfold", List(CodeValue("k", "2")))), trainingDatasets = TreeSet(), @@ -90,16 +97,17 @@ class ExperimentJobTest extends WordSpec with Matchers with ValidatedMatchers wi executionPlan = None ) - private def experimentQuery2job(query: ExperimentQuery): Validation[ExperimentJob] = + private def experimentQuery2job(query: ExperimentQuery): Validation[ExperimentJob] = { + val targetTableId = query.targetTable.getOrElse(sampleDataTableId) + val targetTable: FeaturesTableDescription = targetTableId match { + case `sampleDataTableId` => sampleTable + case `cdeFeaturesATableId` => cdeTable + case o => throw new NotImplementedError(s"$o is not supported") + } ExperimentJob.mkValid( UUID.randomUUID().toString, query, - FeaturesTableDescription(TableId("features_db", None, query.targetTable.getOrElse("Sample")), - Nil, - None, - validateSchema = false, - None, - 0.67), + targetTable, Nil, { spec => Map(knnWithK5 -> knnDefinition, anovaFactorial -> anovaDefinition) .get(spec) @@ -107,5 +115,6 @@ class ExperimentJobTest extends WordSpec with Matchers with ValidatedMatchers wi .toValidatedNel[String] } ) + } } diff --git a/src/test/scala/ch/chuv/lren/woken/core/sqlUtilsTest.scala b/src/test/scala/ch/chuv/lren/woken/core/sqlUtilsTest.scala index 23df4e3b..0873fc0e 100644 --- a/src/test/scala/ch/chuv/lren/woken/core/sqlUtilsTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/core/sqlUtilsTest.scala @@ -17,14 +17,9 @@ package ch.chuv.lren.woken.core -import ch.chuv.lren.woken.core.model.database.{ - FeaturesTableDescription, - TableColumn, - TableId, - sqlUtils -} +import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, sqlUtils } import ch.chuv.lren.woken.messages.variables.SqlType -import ch.chuv.lren.woken.service.TestServices.database +import ch.chuv.lren.woken.config.ConfigurationInstances._ import org.scalatest.{ Matchers, WordSpec } import doobie.implicits._ @@ -34,7 +29,7 @@ class sqlUtilsTest extends WordSpec with Matchers { "generate an equals where clause between two tables and one column" in { val table1 = FeaturesTableDescription( - TableId(database, None, "cde_features_a"), + cdeFeaturesATableId, List(TableColumn("subjectcode", SqlType.varchar)), Some(TableColumn("dataset", SqlType.varchar)), validateSchema = false, @@ -45,7 +40,7 @@ class sqlUtilsTest extends WordSpec with Matchers { TableColumn("subjectcode", SqlType.varchar) ) - val table2 = table1.copy(table = TableId(database, None, "cde_features_b")) + val table2 = table1.copy(table = cdeFeaturesBTableId) val headers2 = headers1 sqlUtils.frEqual(table1, headers1, table2, headers2).toString() shouldBe @@ -55,7 +50,7 @@ class sqlUtilsTest extends WordSpec with Matchers { "generate an equals where clause between two tables and two columns" in { val table1 = FeaturesTableDescription( - TableId(database, None, "cde_features_a"), + cdeFeaturesATableId, List(TableColumn("subjectcode", SqlType.varchar)), Some(TableColumn("dataset", SqlType.varchar)), validateSchema = false, @@ -67,7 +62,7 @@ class sqlUtilsTest extends WordSpec with Matchers { TableColumn("dataset", SqlType.varchar) ) - val table2 = table1.copy(table = TableId(database, None, "cde_features_b")) + val table2 = table1.copy(table = cdeFeaturesBTableId) val headers2 = headers1 sqlUtils.frEqual(table1, headers1, table2, headers2).toString() shouldBe diff --git a/src/test/scala/ch/chuv/lren/woken/dao/ExtendedFeaturesTableRepositoryDAOTest.scala b/src/test/scala/ch/chuv/lren/woken/dao/ExtendedFeaturesTableRepositoryDAOTest.scala index a833729f..f387d614 100644 --- a/src/test/scala/ch/chuv/lren/woken/dao/ExtendedFeaturesTableRepositoryDAOTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/dao/ExtendedFeaturesTableRepositoryDAOTest.scala @@ -160,16 +160,16 @@ class ExtendedFeaturesTableRepositoryDAOTest xa => { val wokenRepository = new WokenInMemoryRepository[IO]() val sourceTable = - new FeaturesTableRepositoryDAO[IO](xa, sampleTable, sampleHeaders, wokenRepository) + new FeaturesTableRepositoryDAO[IO](xa, churnTable, churnHeaders, wokenRepository) val extendedTableFromNoKeyTable = ExtendedFeaturesTableRepositoryDAO .apply[IO](sourceTable, None, Nil, Nil, Nil, wokenRepository.nextTableSeqNumber) extendedTableFromNoKeyTable should haveInvalid( - "Extended features table expects a primary key of one column for table Sample" + "Extended features table expects a primary key of one column for table churn" ) sourceTable } ) { dao => - dao.table.table.name shouldBe "Sample" + dao.table.table.name shouldBe "churn" } "create an extended table without any new columns and use it" in withRepositoryResource[ diff --git a/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableRepositoryDAOTest.scala b/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableRepositoryDAOTest.scala index 01cbacee..8d224ea6 100644 --- a/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableRepositoryDAOTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableRepositoryDAOTest.scala @@ -40,21 +40,21 @@ class FeaturesTableRepositoryDAOTest .withQueryHandler { e: QueryExecution => e.sql.trim match { - case """SELECT count(*) FROM "Sample"""" => + case """SELECT count(*) FROM "sample_data"""" => rowList1(classOf[Int]) :+ 99 - case """SELECT count(*) FROM "Sample" WHERE "score_test1" >= 2 AND "cognitive_task2" < 9""" => + case """SELECT count(*) FROM "sample_data" WHERE "score_test1" >= 2 AND "cognitive_task2" < 9""" => rowList1(classOf[Int]) :+ 5 - case """SELECT count(*) FROM "Sample" WHERE "score_test1" < 0""" => + case """SELECT count(*) FROM "sample_data" WHERE "score_test1" < 0""" => rowList1(classOf[Int]) :+ 0 - case """SELECT "college_math" , count(*) FROM "Sample" GROUP BY "college_math"""" => + case """SELECT "college_math" , count(*) FROM "sample_data" GROUP BY "college_math"""" => (rowList2(classOf[String], classOf[Int]) :+ ("0", 47) // tuple as row :+ ("1", 52)) - case """SELECT "college_math" , count(*) FROM "Sample" WHERE "score_test1" >= 2 GROUP BY "college_math"""" => + case """SELECT "college_math" , count(*) FROM "sample_data" WHERE "score_test1" >= 2 GROUP BY "college_math"""" => (rowList2(classOf[String], classOf[Int]) :+ ("0", 12) // tuple as row :+ ("1", 22)) diff --git a/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableTestSupport.scala b/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableTestSupport.scala index f89f3ed2..beb38817 100644 --- a/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableTestSupport.scala +++ b/src/test/scala/ch/chuv/lren/woken/dao/FeaturesTableTestSupport.scala @@ -16,19 +16,31 @@ */ package ch.chuv.lren.woken.dao -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId } + +import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn } +import ch.chuv.lren.woken.config.ConfigurationInstances._ import ch.chuv.lren.woken.messages.variables.SqlType import spray.json.{ JsNumber, JsObject, JsString } trait FeaturesTableTestSupport { - val database = "features_db" - val sampleTable = FeaturesTableDescription(TableId(database, None, "Sample"), - Nil, - None, - validateSchema = false, - None, - 0.67) + val churnTable = + FeaturesTableDescription(churnDataTableId, Nil, None, validateSchema = false, None, 0.67) + val churnHeaders = List( + TableColumn("state", SqlType.char), + TableColumn("account_length", SqlType.int), + TableColumn("area_code", SqlType.int), + TableColumn("phone", SqlType.varchar), + TableColumn("intl_plan", SqlType.char) + ) + + val sampleTable = + FeaturesTableDescription(sampleDataTableId, + List(TableColumn("ID", SqlType.int)), + None, + validateSchema = false, + None, + 0.67) val sampleHeaders = List( TableColumn("ID", SqlType.int), TableColumn("stress_before_test1", SqlType.numeric), @@ -49,7 +61,7 @@ trait FeaturesTableTestSupport { ) val cdeTable = FeaturesTableDescription( - TableId(database, None, "cde_features_a"), + cdeFeaturesATableId, List(TableColumn("subjectcode", SqlType.varchar)), Some(TableColumn("dataset", SqlType.varchar)), validateSchema = false, diff --git a/src/test/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAOTest.scala b/src/test/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAOTest.scala index dbb89cc1..963401d1 100644 --- a/src/test/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAOTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/dao/MetadataRepositoryDAOTest.scala @@ -30,6 +30,7 @@ import ch.chuv.lren.woken.JsonUtils import ch.chuv.lren.woken.core.model.VariablesMeta import ch.chuv.lren.woken.messages.variables.{ GroupMetaData, VariableId } import ch.chuv.lren.woken.messages.variables.variablesProtocol._ +import ch.chuv.lren.woken.config.ConfigurationInstances._ class MetadataRepositoryDAOTest extends WordSpec with Matchers with MockFactory with JsonUtils { @@ -41,14 +42,14 @@ class MetadataRepositoryDAOTest extends WordSpec with Matchers with MockFactory VariablesMeta(1, "churn", churnHierarchy, - "CHURN", + churnDataTableId, List("state", "custserv_calls", "churn").map(VariableId)) val updated = dao.put(churnVariablesMeta).unsafeRunSync() updated shouldBe churnVariablesMeta - val retrieved = dao.get("CHURN").unsafeRunSync() + val retrieved = dao.get(churnDataTableId).unsafeRunSync() retrieved shouldBe churnVariablesMeta } diff --git a/src/test/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActorTest.scala b/src/test/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActorTest.scala index 00194509..da13d672 100644 --- a/src/test/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActorTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/dispatch/MetadataQueriesActorTest.scala @@ -22,7 +22,7 @@ import akka.stream.ActorMaterializer import akka.testkit.{ ImplicitSender, TestKit, TestProbe } import akka.util.Timeout import ch.chuv.lren.woken.backends.woken.WokenClientService -import ch.chuv.lren.woken.config.DatasetsConfiguration +import ch.chuv.lren.woken.config.WokenConfiguration import ch.chuv.lren.woken.dispatch.MetadataQueriesActor.VariablesForDatasets import ch.chuv.lren.woken.messages.datasets.DatasetId import ch.chuv.lren.woken.messages.variables.{ @@ -105,10 +105,10 @@ class MetadataQueriesActorTest val wokenService: WokenClientService = WokenClientService("test") - val dispatcherService: DispatcherService = - DispatcherService(DatasetsConfiguration.datasets(config), wokenService) + val datasetService: DatasetService = + ConfBasedDatasetService(config, WokenConfiguration(config).jobs) - val datasetService: DatasetService = ConfBasedDatasetService(config) + val dispatcherService: DispatcherService = DispatcherService(datasetService, wokenService) system.actorOf( MetadataQueriesActor.props(dispatcherService, datasetService, localVariablesMetaService) diff --git a/src/test/scala/ch/chuv/lren/woken/mining/ExperimentQuerySupport.scala b/src/test/scala/ch/chuv/lren/woken/mining/ExperimentQuerySupport.scala index f63da5c2..c5e6f766 100644 --- a/src/test/scala/ch/chuv/lren/woken/mining/ExperimentQuerySupport.scala +++ b/src/test/scala/ch/chuv/lren/woken/mining/ExperimentQuerySupport.scala @@ -19,6 +19,7 @@ package ch.chuv.lren.woken.mining import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.messages.variables.VariableId +import ch.chuv.lren.woken.config.ConfigurationInstances._ import scala.collection.immutable.TreeSet @@ -32,7 +33,7 @@ object ExperimentQuerySupport { covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithms = List(AlgorithmSpec(algorithm, parameters, None)), validations = List(ValidationSpec("kfold", List(CodeValue("k", "2")))), trainingDatasets = TreeSet(), @@ -49,7 +50,7 @@ object ExperimentQuerySupport { covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithms = algorithms, validations = List(ValidationSpec("kfold", List(CodeValue("k", "2")))), trainingDatasets = TreeSet(), diff --git a/src/test/scala/ch/chuv/lren/woken/mining/LocalExperimentServiceTest.scala b/src/test/scala/ch/chuv/lren/woken/mining/LocalExperimentServiceTest.scala index 368ed2c3..98621edf 100644 --- a/src/test/scala/ch/chuv/lren/woken/mining/LocalExperimentServiceTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/mining/LocalExperimentServiceTest.scala @@ -27,7 +27,6 @@ import ch.chuv.lren.woken.core.model.jobs._ import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.service.{ FeaturesService, QueryToJobService, TestServices } -import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.scalalogging.LazyLogging import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } import cats.scalatest.{ ValidatedMatchers, ValidatedValues } @@ -42,6 +41,7 @@ import ch.chuv.lren.woken.dao.VariablesMetaRepository import ExperimentQuerySupport._ import scala.collection.immutable.TreeSet +import ch.chuv.lren.woken.config.ConfigurationInstances._ /** * Experiment flow should always complete with success, but the error is reported inside the response. @@ -59,28 +59,21 @@ class LocalExperimentServiceTest implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val ec: ExecutionContext = ExecutionContext.global - val config: Config = ConfigFactory - .parseResourcesAnySyntax("localDatasets.conf") - .withFallback(ConfigFactory.load("algorithms.conf")) - .withFallback(ConfigFactory.load("test.conf")) - .resolve() - val user: UserId = UserId("test") val jobsConf = JobsConfiguration("testNode", "admin", "http://chronos", - "features_db", - "Sample", - "Sample", - "results_db", - "meta_db", + featuresDb, + cdeFeaturesATableId, + wokenDb, + metaDb, 0.5, 512) val algorithmLookup: String => Validation[AlgorithmDefinition] = - AlgorithmsConfiguration.factory(config) + AlgorithmsConfiguration.factory(localNodeConfigSource) val variablesMetaService: VariablesMetaRepository[IO] = TestServices.localVariablesMetaService val featuresService: FeaturesService[IO] = TestServices.featuresService @@ -155,20 +148,23 @@ class LocalExperimentServiceTest } - "complete with success in case of a valid query on k-NN algorithm (predictive)" in { + // TODO + "complete with success in case of a valid query on k-NN algorithm (predictive)" ignore { val experiment = experimentQuery(List(knnWithK5)) runExperimentTest(experiment, AlgorithmExecutorInstances.expectedAlgorithm("knn")) } - "split flow should return validation failed" in { + // TODO + "split flow should return validation failed" ignore { val experiment = experimentQuery(List(knnWithK5)) runExperimentTest(experiment, AlgorithmExecutorInstances.expectedAlgorithm("knn")) } - "complete with success in case of valid algorithms" in { + // TODO + "complete with success in case of valid algorithms" ignore { val experiment: ExperimentQuery = experimentQuery( List( knnWithK5, @@ -201,7 +197,7 @@ class LocalExperimentServiceTest result.attempt.unsafeRunSync() experimentResult match { - case Left(err) => fail("Failed to execute algorithm", err) + case Left(err) => fail("Failed to execute algorithm: " + err.toString, err) case Right(response) => logger.info(s"Experiment response: ${response.toQueryResult}") response.result match { diff --git a/src/test/scala/ch/chuv/lren/woken/service/DispatcherServiceTest.scala b/src/test/scala/ch/chuv/lren/woken/service/DispatcherServiceTest.scala index 0ba28fbc..59f699d1 100644 --- a/src/test/scala/ch/chuv/lren/woken/service/DispatcherServiceTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/service/DispatcherServiceTest.scala @@ -22,7 +22,7 @@ import akka.stream.ActorMaterializer import akka.testkit.TestKit import com.typesafe.config.{ Config, ConfigFactory } import ch.chuv.lren.woken.backends.woken.WokenClientService -import ch.chuv.lren.woken.config.DatasetsConfiguration +import ch.chuv.lren.woken.config.WokenConfiguration import ch.chuv.lren.woken.messages.datasets.DatasetId import ch.chuv.lren.woken.messages.remoting.{ BasicAuthentication, RemoteLocation } import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } @@ -42,10 +42,11 @@ class DispatcherServiceTest .withFallback(ConfigFactory.load("test.conf")) .resolve() - val wokenService = WokenClientService("test") + val wokenService = WokenClientService("test") + val datasetsService = ConfBasedDatasetService(config, WokenConfiguration(config).jobs) val dispatcherService: DispatcherService = - DispatcherService(DatasetsConfiguration.datasets(config), wokenService) + DispatcherService(datasetsService, wokenService) "Dispatcher service" should { "identify locations to dispatch to from a list of datasets" in { diff --git a/src/test/scala/ch/chuv/lren/woken/service/QueryToJobServiceTest.scala b/src/test/scala/ch/chuv/lren/woken/service/QueryToJobServiceTest.scala index b1bc9374..428c5d57 100644 --- a/src/test/scala/ch/chuv/lren/woken/service/QueryToJobServiceTest.scala +++ b/src/test/scala/ch/chuv/lren/woken/service/QueryToJobServiceTest.scala @@ -21,7 +21,6 @@ import cats.effect.IO import cats.scalatest.{ ValidatedMatchers, ValidatedValues } import ch.chuv.lren.woken.config.{ AlgorithmsConfiguration, JobsConfiguration } import ch.chuv.lren.woken.core.features.FeaturesQuery -import ch.chuv.lren.woken.core.model.database.TableId import ch.chuv.lren.woken.core.model._ import ch.chuv.lren.woken.core.model.jobs._ import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation @@ -30,6 +29,7 @@ import ch.chuv.lren.woken.messages.datasets.DatasetId import ch.chuv.lren.woken.messages.query._ import ch.chuv.lren.woken.messages.query.filters._ import ch.chuv.lren.woken.messages.variables._ +import ch.chuv.lren.woken.config.ConfigurationInstances._ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ Matchers, WordSpec } @@ -53,11 +53,10 @@ class QueryToJobServiceTest JobsConfiguration("testNode", "admin", "http://chronos", - "features_db", - "Sample", - "Sample", - "results_db", - "meta_db", + featuresDb, + sampleDataTableId, + wokenDb, + metaDb, 0.5, 512) @@ -121,7 +120,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("unknown"), + targetTable = Some(unknownTableId), algorithm = AlgorithmSpec("knn", List(CodeValue("k", "5")), None), datasets = TreeSet(), executionPlan = None @@ -130,7 +129,7 @@ class QueryToJobServiceTest val maybeJob = queryToJobService.miningQuery2Job(query).unsafeRunSync() maybeJob shouldBe invalid - maybeJob.invalidValue.head shouldBe "Cannot find metadata for table unknown" + maybeJob.invalidValue.head shouldBe "Cannot find metadata for table unknown.public.unknown" } "fail when the algorithm is unknown" in { @@ -141,7 +140,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithm = AlgorithmSpec("unknown", Nil, None), datasets = TreeSet(), executionPlan = None @@ -162,7 +161,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithm = AlgorithmSpec("knn", Nil, None), datasets = TreeSet(), executionPlan = None @@ -183,7 +182,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithm = AlgorithmSpec("knn", Nil, None), datasets = TreeSet(), executionPlan = None @@ -207,7 +206,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("Sample"), + targetTable = Some(sampleDataTableId), algorithm = AlgorithmSpec("knn", List(CodeValue("k", "5")), None), datasets = TreeSet(DatasetId("Sample")), executionPlan = None @@ -220,7 +219,6 @@ class QueryToJobServiceTest val job: DockerJob = maybeJob.value.job.asInstanceOf[MiningJob].dockerJob val feedback = maybeJob.value.feedback - val table = TableId("features_db", None, "Sample") job.jobId should not be empty job.algorithmDefinition.dockerImage should startWith("hbpmip/python-knn") @@ -231,7 +229,7 @@ class QueryToJobServiceTest List("score_test1"), List("stress_before_test1"), List(), - table, + sampleDataTableId, scoreStressTest1Filter, None, None @@ -243,7 +241,7 @@ class QueryToJobServiceTest 'metadata (List(SampleVariables.score_test1, SampleVariables.stress_before_test1)) ) - job.query.sql shouldBe """SELECT "score_test1","stress_before_test1" FROM "Sample" WHERE "score_test1" IS NOT NULL AND "stress_before_test1" IS NOT NULL""" + job.query.sql shouldBe """SELECT "score_test1","stress_before_test1" FROM "sample_data" WHERE "score_test1" IS NOT NULL AND "stress_before_test1" IS NOT NULL""" feedback shouldBe List(UserInfo("Missing variables stress_before_test1")) } @@ -259,7 +257,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithm = AlgorithmSpec("knn", List(CodeValue("k", "5")), None), datasets = TreeSet(DatasetId("desd-synthdata")), executionPlan = None @@ -272,7 +270,6 @@ class QueryToJobServiceTest val job: DockerJob = maybeJob.value.job.asInstanceOf[MiningJob].dockerJob val feedback = maybeJob.value.feedback - val table = TableId("features_db", None, "cde_features_a") job.jobId should not be empty job.algorithmDefinition.dockerImage should startWith("hbpmip/python-knn") @@ -282,7 +279,7 @@ class QueryToJobServiceTest List("apoe4"), List("lefthippocampus"), List(), - table, + cdeFeaturesATableId, apoe4LeftHippDesdFilter, None, None @@ -307,7 +304,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithm = AlgorithmSpec("knn", List(CodeValue("k", "5")), None), datasets = TreeSet(DatasetId("desd-synthdata")), executionPlan = None @@ -320,7 +317,6 @@ class QueryToJobServiceTest val job: DockerJob = maybeJob.value.job.asInstanceOf[MiningJob].dockerJob val feedback = maybeJob.value.feedback - val table = TableId("features_db", None, "cde_features_a") job.jobId should not be empty job.algorithmDefinition.dockerImage should startWith("hbpmip/python-knn") @@ -330,7 +326,7 @@ class QueryToJobServiceTest List("apoe4"), List("lefthippocampus"), List(), - table, + cdeFeaturesATableId, apoe4LeftHippDesdFilter, None, None @@ -355,12 +351,11 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithm = AlgorithmSpec(ValidationJob.algorithmCode, Nil, None), datasets = TreeSet(DatasetId("desd-synthdata")), executionPlan = None ) - val table = TableId("features_db", None, "cde_features_a") val maybeJob = queryToJobService.miningQuery2Job(query).unsafeRunSync() @@ -391,7 +386,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("unknown"), + targetTable = Some(unknownTableId), algorithms = List(AlgorithmSpec("knn", List(CodeValue("k", "5")), None)), trainingDatasets = TreeSet(), testingDatasets = TreeSet(), @@ -404,7 +399,7 @@ class QueryToJobServiceTest queryToJobService.experimentQuery2Job(query).unsafeRunSync() maybeJob shouldBe invalid - maybeJob.invalidValue.head shouldBe "Cannot find metadata for table unknown" + maybeJob.invalidValue.head shouldBe "Cannot find metadata for table unknown.public.unknown" } "fail when the algorithm is unknown" in { @@ -415,7 +410,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec("unknown", Nil, None)), trainingDatasets = TreeSet(), testingDatasets = TreeSet(), @@ -440,7 +435,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec("knn", Nil, None)), trainingDatasets = TreeSet(), testingDatasets = TreeSet(), @@ -465,7 +460,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec("knn", Nil, None)), trainingDatasets = TreeSet(), testingDatasets = TreeSet(), @@ -490,7 +485,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec("knn", List(CodeValue("k", "5")), None)), trainingDatasets = TreeSet(DatasetId("desd-synthdata")), testingDatasets = TreeSet(), @@ -498,7 +493,6 @@ class QueryToJobServiceTest validations = Nil, executionPlan = None ) - val table = TableId("features_db", None, "cde_features_a") val maybeJob = queryToJobService.experimentQuery2Job(query).unsafeRunSync() @@ -509,12 +503,10 @@ class QueryToJobServiceTest job.jobId should not be empty job should have( - 'inputTable (table), + 'inputTable (cdeFeaturesATableId), 'query (query.copy(filters = apoe4LeftHippDesdFilter)), 'metadata (List(CdeVariables.apoe4, CdeVariables.leftHipocampus)) ) - // ExperimentQuery(UserId(test),List(VariableId(apoe4)),List(VariableId(lefthippocampus)),true,List(),None,Some(cde_features_a),Set(DatasetId(desd-synthdata)),Set(),List(AlgorithmSpec(knn,List(CodeValue(k,5)),None)),Set(),List(),None), instead of its expected value - // ExperimentQuery(UserId(test),List(VariableId(apoe4)),List(VariableId(lefthippocampus)),true,List(),Some(CompoundFilterRule(AND,List(SingleFilterRule(apoe4,apoe4,string,text,is_not_null,List()), SingleFilterRule(lefthippocampus,lefthippocampus,string,text,is_not_null,List()), SingleFilterRule(dataset,dataset,string,text,in,List(desd-synthdata))))),Some(cde_features_a),Set(DatasetId(desd-synthdata)),Set(),List(AlgorithmSpec(knn,List(CodeValue(k,5)),None)),Set(),List(),None), on object ExperimentJob(06a5f472-f836-4fc9-b9d5-13a1ef49d100,TableId(features_db,None,cde_features_a),ExperimentQuery(UserId(test),List(VariableId(apoe4)),List(VariableId(lefthippocampus)),true,List(),None,Some(cde_features_a),Set(DatasetId(desd-synthdata)),Set(),List(AlgorithmSpec(knn,List(CodeValue(k,5)),None)),Set(),List(),None),Map(AlgorithmSpec(knn,List(CodeValue(k,5)),None) -> AlgorithmDefinition(knn,hbpmip/python-knn:0.4.0,true,false,false,Docker,ExecutionPlan(List(ExecutionStep(map,map,SelectDataset(training),Compute(compute-local)), ExecutionStep(reduce,reduce,PreviousResults(map),Compute(compute-global)))))),List(VariableMetaData(apoe4,ApoE4,polynominal,Some(int),Some(adni-merge),Some(Apolipoprotein E (APOE) e4 allele: is the strongest risk factor for Late Onset Alzheimer Disease (LOAD). At least one copy of APOE-e4 ),None,Some(List(EnumeratedValue(0,0), EnumeratedValue(1,1), EnumeratedValue(2,2))),None,None,None,None,Set()), VariableMetaData(lefthippocampus,Left Hippocampus,real,None,Some(lren-nmm-volumes),Some(),Some(cm3),None,None,None,None,None,Set()))) } "drop the unknown covariables that do not need to exist" in { @@ -525,7 +517,7 @@ class QueryToJobServiceTest covariablesMustExist = false, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec("knn", List(CodeValue("k", "5")), None)), trainingDatasets = TreeSet(DatasetId("desd-synthdata")), testingDatasets = TreeSet(), @@ -533,7 +525,6 @@ class QueryToJobServiceTest validations = Nil, executionPlan = None ) - val table = TableId("features_db", None, "cde_features_a") val maybeJob = queryToJobService.experimentQuery2Job(query).unsafeRunSync() @@ -544,7 +535,7 @@ class QueryToJobServiceTest job.jobId should not be empty job should have( - 'inputTable (table), + 'inputTable (cdeFeaturesATableId), 'query ( query.copy(covariables = List(VariableId("lefthippocampus")), filters = apoe4LeftHippDesdFilter) @@ -561,7 +552,7 @@ class QueryToJobServiceTest covariablesMustExist = true, grouping = Nil, filters = None, - targetTable = Some("cde_features_a"), + targetTable = Some(cdeFeaturesATableId), algorithms = List(AlgorithmSpec(ValidationJob.algorithmCode, Nil, None)), trainingDatasets = TreeSet(DatasetId("desd-synthdata")), testingDatasets = TreeSet(), diff --git a/src/test/scala/ch/chuv/lren/woken/service/TestServices.scala b/src/test/scala/ch/chuv/lren/woken/service/TestServices.scala index 8692d591..5bae69b1 100644 --- a/src/test/scala/ch/chuv/lren/woken/service/TestServices.scala +++ b/src/test/scala/ch/chuv/lren/woken/service/TestServices.scala @@ -24,12 +24,14 @@ import ch.chuv.lren.woken.backends.faas.AlgorithmExecutor import ch.chuv.lren.woken.backends.worker.WokenWorker import ch.chuv.lren.woken.config.WokenConfiguration import ch.chuv.lren.woken.core.model.VariablesMeta -import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableId } +import ch.chuv.lren.woken.core.model.database.FeaturesTableDescription import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers import ch.chuv.lren.woken.dao._ import ch.chuv.lren.woken.errors.ErrorReporter +import ch.chuv.lren.woken.messages.datasets.TableId import ch.chuv.lren.woken.messages.variables.{ GroupMetaData, VariableId } import ch.chuv.lren.woken.messages.variables.variablesProtocol._ +import ch.chuv.lren.woken.config.ConfigurationInstances._ import org.scalamock.scalatest.MockFactory import spray.json.JsObject @@ -51,24 +53,24 @@ object TestServices extends JsonUtils with FeaturesTableTestSupport with MockFac 1, "churn", churnHierarchy, - "CHURN", + churnDataTableId, List("state", "custserv_calls", "churn").map(VariableId) ) val sampleHierarchy = loadJson("/metadata/sample_variables.json").convertTo[GroupMetaData] - val sampleVariablesMeta = VariablesMeta(2, "sample", sampleHierarchy, "Sample", Nil) + val sampleVariablesMeta = VariablesMeta(2, "sample", sampleHierarchy, sampleDataTableId, Nil) val cdeHierarchy = loadJson("/metadata/mip_cde_variables.json").convertTo[GroupMetaData] val cdeGroupings = List("dataset", "gender", "agegroup", "alzheimerbroadcategory").map(VariableId) val featuresAVariablesMeta = - VariablesMeta(3, "cde_features_a", cdeHierarchy, "CDE_FEATURES_A", cdeGroupings) + VariablesMeta(3, "cde_features_a", cdeHierarchy, cdeFeaturesATableId, cdeGroupings) val featuresBVariablesMeta = - VariablesMeta(4, "cde_features_b", cdeHierarchy, "CDE_FEATURES_B", cdeGroupings) + VariablesMeta(4, "cde_features_b", cdeHierarchy, cdeFeaturesBTableId, cdeGroupings) val featuresCVariablesMeta = - VariablesMeta(5, "cde_features_c", cdeHierarchy, "CDE_FEATURES_C", cdeGroupings) + VariablesMeta(5, "cde_features_c", cdeHierarchy, cdeFeaturesCTableId, cdeGroupings) val featuresMixedVariablesMeta = - VariablesMeta(6, "cde_features_mixed", cdeHierarchy, "CDE_FEATURES_MIXED", cdeGroupings) + VariablesMeta(6, "cde_features_mixed", cdeHierarchy, cdeFeaturesMixedTableId, cdeGroupings) val metaService = new MetadataInMemoryRepository[IO]().variablesMeta @@ -84,20 +86,23 @@ object TestServices extends JsonUtils with FeaturesTableTestSupport with MockFac lazy val algorithmLibraryService: AlgorithmLibraryService = AlgorithmLibraryService() - val tables: Set[FeaturesTableDescription] = Set(sampleTable, cdeTable) + val tables: Map[TableId, FeaturesTableDescription] = + Set(sampleTable, cdeTable).map(t => (t.table, t)).toMap val tablesContent: Map[TableId, (Headers, List[JsObject])] = Map( sampleTable.table -> (sampleHeaders -> sampleData), cdeTable.table -> (cdeHeaders -> cdeData) ) lazy val featuresService: FeaturesService[IO] = FeaturesService( - new FeaturesInMemoryRepository[IO](database, tables, tablesContent) + new FeaturesInMemoryRepository[IO](featuresDbConfiguration, tablesContent) ) - val featuresTableId = TableId("features_db", None, "Sample") + val emptyFeaturesTableId = TableId(featuresDb.code, "empty_table") + val emptyFeatureTable = + FeaturesTableDescription(emptyFeaturesTableId, Nil, None, validateSchema = false, None, 0.67) lazy val emptyFeaturesTableService: FeaturesTableService[IO] = new FeaturesTableServiceImpl( - new FeaturesTableInMemoryRepository[IO](featuresTableId, Nil, None, Nil) + new FeaturesTableInMemoryRepository[IO](emptyFeatureTable, Nil, None, Nil) ) implicit val ec: ExecutionContext = ExecutionContext.global @@ -105,7 +110,7 @@ object TestServices extends JsonUtils with FeaturesTableTestSupport with MockFac implicit lazy val defaultTimer: Timer[IO] = cats.effect.IO.timer(ec) def databaseServices(config: WokenConfiguration): DatabaseServices[IO] = { - val datasetService: DatasetService = ConfBasedDatasetService(config.config) + val datasetService: DatasetService = ConfBasedDatasetService(config.config, config.jobs) val queryToJobService = QueryToJobService(featuresService, localVariablesMetaService, config.jobs,