Skip to content

Commit

Permalink
Complete implementation of ExtendedFeaturesTableInMemoryRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
ludovicc committed Feb 5, 2019
1 parent 03593a8 commit 916b9a4
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class MaintenanceWebService[F[_]: Effect](

val routes: Route = prefillMiningCache ~ maintainMiningCache ~ resetMiningCache

// TODO: prefill should return a websocket that tracks progress
override def prefillMiningCache: Route = securePath(
"maintenance" / "mining-cache" / "prefill",
post {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object WebSocketClient
promise.failure(e)
throw e
}
.mapAsync(3)(identity)
.mapAsync(parallelism = 3)(identity)
.map { jsonEncodedString =>
Try {
jsonEncodedString.parseJson.convertTo[QueryResult]
Expand Down
42 changes: 33 additions & 9 deletions src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
package ch.chuv.lren.woken.dao

import cats.Id
import cats.effect.concurrent.Ref
import cats.effect.{ Effect, Resource }
import cats.implicits._
import ch.chuv.lren.woken.core.features.FeaturesQuery
import ch.chuv.lren.woken.core.fp.runNow
import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId }
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers
import ch.chuv.lren.woken.messages.datasets.DatasetId
import spray.json._
import spray.json.DefaultJsonProtocol._
import ch.chuv.lren.woken.messages.query.filters._
import ch.chuv.lren.woken.messages.variables.SqlType
import doobie.util.log.LogHandler
import doobie.util.update.Update0
import sup.HealthCheck
Expand Down Expand Up @@ -251,6 +254,8 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId,
override def features(query: FeaturesQuery): F[(Headers, Stream[JsObject])] =
(columns, dataFeatures.toStream).pure[F]

private val genNumber: F[Ref[F, Int]] = Ref.of[F, Int](0)

override def createExtendedFeaturesTable(
filters: Option[FilterRule],
newFeatures: List[TableColumn],
Expand All @@ -263,8 +268,11 @@ class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId,
filters,
newFeatures,
otherColumns,
prefills,
???
prefills, { () =>
genNumber.flatMap { g =>
g.modify(x => (x + 1, x))
}
}
)
.map(r => r.map[FeaturesTableRepository[F]](t => t))

Expand All @@ -282,22 +290,37 @@ object ExtendedFeaturesTableInMemoryRepository {
prefills: List[PrefillExtendedFeaturesTable],
nextTableSeqNumber: () => F[Int]
): Validation[Resource[F, ExtendedFeaturesTableInMemoryRepository[F]]] = {
val respository = new ExtendedFeaturesTableInMemoryRepository[F](
val pk = sourceTable.table.primaryKey.headOption
.getOrElse(throw new Exception("Expected a single primary key"))
val rndColumn = TableColumn("_rnd", SqlType.numeric)
val newColumns = newFeatures ++ otherColumns
val extTableColumns = newFeatures ++ List(rndColumn)
val extViewColumns = newColumns ++ extTableColumns.filter(_ != pk)
val extendedTableNumber = runNow(nextTableSeqNumber())
val table = sourceTable.table
val extTable = table.copy(
table = table.table.copy(name = s"${table.table.name}__$extendedTableNumber"),
validateSchema = false
)
val extView = extTable.copy(table = extTable.table.copy(name = extTable.table.name + "v"))
val repository = new ExtendedFeaturesTableInMemoryRepository[F](
sourceTable,
???,
???,
???,
filters,
extView,
extViewColumns,
extTable,
newFeatures,
???
rndColumn
)

Resource.make(Effect[F].delay(respository))(_ => Effect[F].delay(())).valid
Resource.make(Effect[F].delay(repository))(_ => Effect[F].delay(())).valid
}

}

class ExtendedFeaturesTableInMemoryRepository[F[_]: Effect] private (
val sourceTable: FeaturesTableInMemoryRepository[F],
val filters: Option[FilterRule],
val view: FeaturesTableDescription,
val viewColumns: List[TableColumn],
val extTable: FeaturesTableDescription,
Expand Down Expand Up @@ -376,5 +399,6 @@ class ExtendedFeaturesTableInMemoryRepository[F[_]: Effect] private (
* @param filters The filters used to filter rows
* @return a set of dataset ids
*/
override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] = ???
override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] =
sourceTable.datasets(filters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ object ExtendedFeaturesTableRepositoryDAO {
stmt.update.run
}

val extTableColumns = newFeatures ++ List(rndColumn: TableColumn)
val extTableColumns = newFeatures ++ List(rndColumn)
val extViewDescription =
extTable.copy(table = table.table.copy(name = s"${extTable.table.name}v"))
val extViewColumns = tableColumns ++ extTableColumns.filter(_ != pk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ object AlgorithmExecutorInstances {
)
).pure[IO]

override def healthCheck: HealthCheck[IO, TaggedS] = ???
override def healthCheck: HealthCheck[IO, TaggedS] =
HealthCheck
.const[IO, Id](Health.healthy)
.through[IO, TaggedS](mods.tagWith("Algorithm executor (Dummy)"))
}

private def errorResponse(job: DockerJob, msg: String) =
Expand Down
5 changes: 4 additions & 1 deletion tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,12 @@ services:
volumes:
- ./woken/config:/opt/woken/config
ports:
# Web API
- "8087:8087"
# Akka cluster
- "8088:8088"
- "9095:9095"
# Remote Java debugging with JDWP
- "8000:8000"
environment:
CLUSTER_PORT: 8088
Expand All @@ -239,7 +242,7 @@ services:
RELEASE_STAGE: "dev"
DATACENTER_LOCATION: "$HOST"
CONTAINER_ORCHESTRATION: "docker-compose"
JAVA_OPTS: "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=n"
JAVA_OPTIONS: "-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=n"
links:
- zipkin
- wokenprometheus
Expand Down

0 comments on commit 916b9a4

Please sign in to comment.