From 916b9a4b774475ba6ccaf6497c8ec6557c6ffebd Mon Sep 17 00:00:00 2001 From: Ludovic Claude Date: Tue, 5 Feb 2019 22:31:48 +0100 Subject: [PATCH] Complete implementation of ExtendedFeaturesTableInMemoryRepository --- .../woken/api/MaintenanceWebService.scala | 1 + .../lren/woken/backends/WebSocketClient.scala | 2 +- .../lren/woken/dao/FeaturesRepository.scala | 42 +++++++++++++++---- .../woken/dao/FeaturesRepositoryDAO.scala | 2 +- .../faas/AlgorithmExecutorInstances.scala | 5 ++- tests/docker-compose.yml | 5 ++- 6 files changed, 44 insertions(+), 13 deletions(-) diff --git a/src/main/scala/ch/chuv/lren/woken/api/MaintenanceWebService.scala b/src/main/scala/ch/chuv/lren/woken/api/MaintenanceWebService.scala index defd761b..7e1c1255 100644 --- a/src/main/scala/ch/chuv/lren/woken/api/MaintenanceWebService.scala +++ b/src/main/scala/ch/chuv/lren/woken/api/MaintenanceWebService.scala @@ -54,6 +54,7 @@ class MaintenanceWebService[F[_]: Effect]( val routes: Route = prefillMiningCache ~ maintainMiningCache ~ resetMiningCache + // TODO: prefill should return a websocket that tracks progress override def prefillMiningCache: Route = securePath( "maintenance" / "mining-cache" / "prefill", post { diff --git a/src/main/scala/ch/chuv/lren/woken/backends/WebSocketClient.scala b/src/main/scala/ch/chuv/lren/woken/backends/WebSocketClient.scala index 036ad74a..695c1602 100644 --- a/src/main/scala/ch/chuv/lren/woken/backends/WebSocketClient.scala +++ b/src/main/scala/ch/chuv/lren/woken/backends/WebSocketClient.scala @@ -80,7 +80,7 @@ object WebSocketClient promise.failure(e) throw e } - .mapAsync(3)(identity) + .mapAsync(parallelism = 3)(identity) .map { jsonEncodedString => Try { jsonEncodedString.parseJson.convertTo[QueryResult] diff --git a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala index 1c7679e1..11b4dd7b 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala @@ -18,9 +18,11 @@ package ch.chuv.lren.woken.dao import cats.Id +import cats.effect.concurrent.Ref import cats.effect.{ Effect, Resource } import cats.implicits._ import ch.chuv.lren.woken.core.features.FeaturesQuery +import ch.chuv.lren.woken.core.fp.runNow import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId } import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers @@ -28,6 +30,7 @@ import ch.chuv.lren.woken.messages.datasets.DatasetId import spray.json._ import spray.json.DefaultJsonProtocol._ import ch.chuv.lren.woken.messages.query.filters._ +import ch.chuv.lren.woken.messages.variables.SqlType import doobie.util.log.LogHandler import doobie.util.update.Update0 import sup.HealthCheck @@ -251,6 +254,8 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId, override def features(query: FeaturesQuery): F[(Headers, Stream[JsObject])] = (columns, dataFeatures.toStream).pure[F] + private val genNumber: F[Ref[F, Int]] = Ref.of[F, Int](0) + override def createExtendedFeaturesTable( filters: Option[FilterRule], newFeatures: List[TableColumn], @@ -263,8 +268,11 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId, filters, newFeatures, otherColumns, - prefills, - ??? + prefills, { () => + genNumber.flatMap { g => + g.modify(x => (x + 1, x)) + } + } ) .map(r => r.map[FeaturesTableRepository[F]](t => t)) @@ -282,22 +290,37 @@ object ExtendedFeaturesTableInMemoryRepository { prefills: List[PrefillExtendedFeaturesTable], nextTableSeqNumber: () => F[Int] ): Validation[Resource[F, ExtendedFeaturesTableInMemoryRepository[F]]] = { - val respository = new ExtendedFeaturesTableInMemoryRepository[F]( + val pk = sourceTable.table.primaryKey.headOption + .getOrElse(throw new Exception("Expected a single primary key")) + val rndColumn = TableColumn("_rnd", SqlType.numeric) + val newColumns = newFeatures ++ otherColumns + val extTableColumns = newFeatures ++ List(rndColumn) + val extViewColumns = newColumns ++ extTableColumns.filter(_ != pk) + val extendedTableNumber = runNow(nextTableSeqNumber()) + val table = sourceTable.table + val extTable = table.copy( + table = table.table.copy(name = s"${table.table.name}__$extendedTableNumber"), + validateSchema = false + ) + val extView = extTable.copy(table = extTable.table.copy(name = extTable.table.name + "v")) + val repository = new ExtendedFeaturesTableInMemoryRepository[F]( sourceTable, - ???, - ???, - ???, + filters, + extView, + extViewColumns, + extTable, newFeatures, - ??? + rndColumn ) - Resource.make(Effect[F].delay(respository))(_ => Effect[F].delay(())).valid + Resource.make(Effect[F].delay(repository))(_ => Effect[F].delay(())).valid } } class ExtendedFeaturesTableInMemoryRepository[F[_]: Effect] private ( val sourceTable: FeaturesTableInMemoryRepository[F], + val filters: Option[FilterRule], val view: FeaturesTableDescription, val viewColumns: List[TableColumn], val extTable: FeaturesTableDescription, @@ -376,5 +399,6 @@ class ExtendedFeaturesTableInMemoryRepository[F[_]: Effect] private ( * @param filters The filters used to filter rows * @return a set of dataset ids */ - override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] = ??? + override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] = + sourceTable.datasets(filters) } diff --git a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala index fe84df95..382b9d06 100644 --- a/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala +++ b/src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepositoryDAO.scala @@ -496,7 +496,7 @@ object ExtendedFeaturesTableRepositoryDAO { stmt.update.run } - val extTableColumns = newFeatures ++ List(rndColumn: TableColumn) + val extTableColumns = newFeatures ++ List(rndColumn) val extViewDescription = extTable.copy(table = table.table.copy(name = s"${extTable.table.name}v")) val extViewColumns = tableColumns ++ extTableColumns.filter(_ != pk) diff --git a/src/test/scala/ch/chuv/lren/woken/backends/faas/AlgorithmExecutorInstances.scala b/src/test/scala/ch/chuv/lren/woken/backends/faas/AlgorithmExecutorInstances.scala index ef8f6b80..5a26676b 100644 --- a/src/test/scala/ch/chuv/lren/woken/backends/faas/AlgorithmExecutorInstances.scala +++ b/src/test/scala/ch/chuv/lren/woken/backends/faas/AlgorithmExecutorInstances.scala @@ -91,7 +91,10 @@ object AlgorithmExecutorInstances { ) ).pure[IO] - override def healthCheck: HealthCheck[IO, TaggedS] = ??? + override def healthCheck: HealthCheck[IO, TaggedS] = + HealthCheck + .const[IO, Id](Health.healthy) + .through[IO, TaggedS](mods.tagWith("Algorithm executor (Dummy)")) } private def errorResponse(job: DockerJob, msg: String) = diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 284b0b78..19fc71eb 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -221,9 +221,12 @@ services: volumes: - ./woken/config:/opt/woken/config ports: + # Web API - "8087:8087" + # Akka cluster - "8088:8088" - "9095:9095" + # Remote Java debugging with JDWP - "8000:8000" environment: CLUSTER_PORT: 8088 @@ -239,7 +242,7 @@ services: RELEASE_STAGE: "dev" DATACENTER_LOCATION: "$HOST" CONTAINER_ORCHESTRATION: "docker-compose" - JAVA_OPTS: "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=n" + JAVA_OPTIONS: "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=n" links: - zipkin - wokenprometheus