
Commit

Merge branch 'error-handling' of https://github.com/devsprint/woken into devsprint-error-handling
ludovicc committed Feb 5, 2019
2 parents 926c20b + e2d9cc1 commit 03593a8
Showing 8 changed files with 297 additions and 35 deletions.
2 changes: 2 additions & 0 deletions dev/docker-compose.yml
@@ -28,6 +28,8 @@ services:
mesos_master:
image: axelspringer/mesos-master:1.4.1
hostname: mesosmaster
ports:
- '5050:5050'
volumes:
- ./log/mesos/master:/var/log/mesos/master:rw
- ./tmp/mesos/master:/var/tmp/mesos:rw
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -36,3 +36,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("io.kamon" % "sbt-aspectj-runner" % "1.1.1")

addSbtPlugin("org.ensime" % "sbt-ensime" % "2.5.1")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
@@ -100,7 +100,7 @@ object WebSocketClient
Flow
.fromSinkAndSourceMat(sink, source)(Keep.left)

val (upgradeResponse, closed) =
val (upgradeResponse, _) =
Http().singleWebSocketRequest(
WebSocketRequest(
location.url,
@@ -111,8 +111,6 @@ object WebSocketClient
flow
)

import actorSystem.dispatcher

val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
133 changes: 125 additions & 8 deletions src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala
@@ -17,8 +17,8 @@

package ch.chuv.lren.woken.dao

import cats.{ Applicative, Id }
import cats.effect.Resource
import cats.Id
import cats.effect.{ Effect, Resource }
import cats.implicits._
import ch.chuv.lren.woken.core.features.FeaturesQuery
import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableColumn, TableId }
@@ -155,7 +155,7 @@ object FeaturesTableRepository {

}

class FeaturesInMemoryRepository[F[_]: Applicative](
class FeaturesInMemoryRepository[F[_]: Effect](
override val database: String,
override val tables: Set[FeaturesTableDescription],
val tablesContent: Map[TableId, (Headers, List[JsObject])]
@@ -179,10 +179,10 @@ class FeaturesInMemoryRepository[F[_]: Applicative](

}

class FeaturesTableInMemoryRepository[F[_]: Applicative](val tableId: TableId,
override val columns: List[TableColumn],
val datasetColumn: Option[TableColumn],
val dataFeatures: List[JsObject])
class FeaturesTableInMemoryRepository[F[_]: Effect](val tableId: TableId,
override val columns: List[TableColumn],
val datasetColumn: Option[TableColumn],
val dataFeatures: List[JsObject])
extends FeaturesTableRepository[F] {

import FeaturesTableRepository.Headers
@@ -256,8 +256,125 @@ class FeaturesTableInMemoryRepository[F[_]: Applicative](val tableId: TableId,
newFeatures: List[TableColumn],
otherColumns: List[TableColumn],
prefills: List[PrefillExtendedFeaturesTable]
): Validation[Resource[F, FeaturesTableRepository[F]]] = "not implemented".invalidNel
): Validation[Resource[F, FeaturesTableRepository[F]]] =
ExtendedFeaturesTableInMemoryRepository
.apply(
this,
filters,
newFeatures,
otherColumns,
prefills,
???
)
.map(r => r.map[FeaturesTableRepository[F]](t => t))

override def healthCheck: HealthCheck[F, Id] = HealthCheck.liftFBoolean(true.pure[F])

}

object ExtendedFeaturesTableInMemoryRepository {

def apply[F[_]: Effect](
sourceTable: FeaturesTableInMemoryRepository[F],
filters: Option[FilterRule],
newFeatures: List[TableColumn],
otherColumns: List[TableColumn],
prefills: List[PrefillExtendedFeaturesTable],
nextTableSeqNumber: () => F[Int]
): Validation[Resource[F, ExtendedFeaturesTableInMemoryRepository[F]]] = {
val repository = new ExtendedFeaturesTableInMemoryRepository[F](
sourceTable,
???,
???,
???,
newFeatures,
???
)

Resource.make(Effect[F].delay(repository))(_ => Effect[F].delay(())).valid
}

}

class ExtendedFeaturesTableInMemoryRepository[F[_]: Effect] private (
val sourceTable: FeaturesTableInMemoryRepository[F],
val view: FeaturesTableDescription,
val viewColumns: List[TableColumn],
val extTable: FeaturesTableDescription,
val newFeatures: List[TableColumn],
val rndColumn: TableColumn
) extends FeaturesTableRepository[F] {

/**
* Description of the table
*
* @return the description
*/
override def table: FeaturesTableDescription = view

/**
* Total number of rows in the table
*
* @return number of rows
*/
override def count: F[Int] = sourceTable.count

/**
* Number of rows belonging to the dataset.
*
* @param dataset The dataset used to filter rows
* @return the number of rows in the dataset, 0 if dataset is not associated with the table
*/
override def count(dataset: DatasetId): F[Int] = sourceTable.count(dataset)

/**
* Number of rows matching the filters.
*
* @param filters The filters used to filter rows
* @return the number of rows in the dataset matching the filters, or the total number of rows if there are no filters
*/
override def count(filters: Option[FilterRule]): F[Int] = sourceTable.count(filters)

/**
* Number of rows grouped by a reference column
*
* @return a map containing the number of rows for each value of the group by column
*/
override def countGroupBy(groupByColumn: TableColumn,
filters: Option[FilterRule]): F[Map[String, Int]] =
sourceTable.countGroupBy(groupByColumn, filters)

/**
* @return all headers of the table
*/
override def columns: Headers = viewColumns

override def features(query: FeaturesQuery): F[(Headers, Stream[JsObject])] =
sourceTable.features(query)

/**
*
* @param filters Filters always applied on the queries
* @param newFeatures New features to create, can be used in machine learning tasks
* @param otherColumns Other columns to create, present to support some internal functionalities like cross-validation
* @param prefills List of generators for update statements used to pre-fill the extended features table with data.
* @return
*/
override def createExtendedFeaturesTable(
filters: Option[FilterRule],
newFeatures: List[TableColumn],
otherColumns: List[TableColumn],
prefills: List[PrefillExtendedFeaturesTable]
): Validation[Resource[F, FeaturesTableRepository[F]]] =
"Impossible to extend an extended table".invalidNel

override def healthCheck: HealthCheck[F, Id] = HealthCheck.liftFBoolean(true.pure[F])

/**
* Returns the list of datasets effectively used by a query
*
* @param filters The filters used to filter rows
* @return a set of dataset ids
*/
override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] = ???
}
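With the context bound tightened from Applicative to Effect, the in-memory repositories now require a full cats-effect instance such as IO. A minimal construction sketch; the database, table and column contents below are invented for illustration and are not part of this commit:

import cats.effect.IO
import ch.chuv.lren.woken.core.model.database.TableId

// Hypothetical empty in-memory table; "features_db" and "sample_data" are made-up names
val table = new FeaturesTableInMemoryRepository[IO](
  TableId("features_db", None, "sample_data"),
  List.empty,  // columns
  None,        // datasetColumn
  List.empty   // dataFeatures
)
val rowCount: IO[Int] = table.count  // evaluated lazily inside IO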
@@ -117,7 +117,6 @@ case class CrossValidationFlow[F[_]: Effect](
l :+ r
}
.mapAsync(1) { foldResults =>
if (foldResults.isEmpty) throw new Exception("No fold results received")
runLater(scoreAll(foldResults.sortBy(_.fold).toNel))
}
.map { jobScoreOption =>
@@ -139,6 +138,9 @@ }
}
)
)
case Right(kFoldCrossValidationScore: KFoldCrossValidationScore) =>
crossValidationScore.job -> Right(kFoldCrossValidationScore)

case Left(error) =>
logger.warn(s"Global score failed with message $error")
crossValidationScore.job -> Left(error)
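The explicit empty-check and exception are no longer needed because sortBy(_.fold).toNel already turns an empty list of fold results into a None, which the scoring step can treat as a failure. A small standalone sketch of that pattern, using invented stand-in types rather than the project's own:

import cats.data.NonEmptyList
import cats.syntax.list._  // provides .toNel on List

final case class FoldResult(fold: Int, score: Double)  // hypothetical stand-in type

def averageScore(folds: Option[NonEmptyList[FoldResult]]): Option[Double] =
  folds.map(nel => nel.map(_.score).toList.sum / nel.size)

averageScore(List.empty[FoldResult].sortBy(_.fold).toNel)                        // None, no exception thrown
averageScore(List(FoldResult(1, 0.9), FoldResult(0, 0.8)).sortBy(_.fold).toNel)  // Some(0.85)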
@@ -65,6 +65,35 @@ object AlgorithmExecutorInstances {
HealthCheck.const[IO, Id](Health.healthy).through(mods.tagWith("Health of mock executor"))
}

def dummyAlgorithm: AlgorithmExecutor[IO] =
new AlgorithmExecutor[IO] {

/**
* Name of the current node (or cluster) where Docker containers are executed
*/
override def node: String = "DummyTestNode"

private val pfa =
"""
{
"input": {},
"output": {},
"action": [],
"cells": {}
}
""".stripMargin.parseJson.asJsObject

override def execute(job: DockerJob): IO[AlgorithmResults] =
AlgorithmResults(
job,
List(
PfaJobResult(job.jobId, "testNode", OffsetDateTime.now(), job.algorithmSpec.code, pfa)
)
).pure[IO]

override def healthCheck: HealthCheck[IO, TaggedS] = ???
}

private def errorResponse(job: DockerJob, msg: String) =
AlgorithmResults(
job,
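A minimal sketch of how the new dummy executor above might be exercised in a test; someDockerJob is assumed to be built elsewhere, and unsafeRunSync is used only because this is test code:

// Hypothetical test usage of the dummy executor
val executor: AlgorithmExecutor[IO] = AlgorithmExecutorInstances.dummyAlgorithm
val results: AlgorithmResults = executor.execute(someDockerJob).unsafeRunSync()  // someDockerJob is assumed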
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2017 LREN CHUV for Human Brain Project
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package ch.chuv.lren.woken.mining

import java.util.UUID

import cats.implicits._
import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableId }
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.messages.query._
import ch.chuv.lren.woken.messages.variables.VariableId
import ch.chuv.lren.woken.Predefined.Algorithms.{
anovaDefinition,
anovaFactorial,
knnDefinition,
knnWithK5
}
import ch.chuv.lren.woken.core.model.jobs.ExperimentJob

object ExperimentQuerySupport {

def experimentQuery(algorithm: String, parameters: List[CodeValue]) =
ExperimentQuery(
user = UserId("test1"),
variables = List(VariableId("cognitive_task2")),
covariables = List(VariableId("score_test1"), VariableId("college_math")),
covariablesMustExist = false,
grouping = Nil,
filters = None,
targetTable = Some("Sample"),
algorithms = List(AlgorithmSpec(algorithm, parameters, None)),
validations = List(ValidationSpec("kfold", List(CodeValue("k", "2")))),
trainingDatasets = Set(),
testingDatasets = Set(),
validationDatasets = Set(),
executionPlan = None
)

def experimentQuery(algorithms: List[AlgorithmSpec]) =
ExperimentQuery(
user = UserId("test1"),
variables = List(VariableId("cognitive_task2")),
covariables = List(VariableId("score_test1"), VariableId("college_math")),
covariablesMustExist = false,
grouping = Nil,
filters = None,
targetTable = Some("Sample"),
algorithms = algorithms,
validations = List(ValidationSpec("kfold", List(CodeValue("k", "2")))),
trainingDatasets = Set(),
testingDatasets = Set(),
validationDatasets = Set(),
executionPlan = None
)

def experimentQuery2job(query: ExperimentQuery): Validation[ExperimentJob] =
ExperimentJob.mkValid(
UUID.randomUUID().toString,
query,
FeaturesTableDescription(TableId("features_db", None, query.targetTable.getOrElse("Sample")),
Nil,
None,
validateSchema = false,
None,
0.67),
Nil, { spec =>
Map(knnWithK5 -> knnDefinition, anovaFactorial -> anovaDefinition)
.get(spec)
.toRight("Missing algorithm")
.toValidatedNel[String]
}
)

}
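A sketch of how these helpers might be combined in a test; knnWithK5 is the spec imported in this file, while the println handlers are placeholders for whatever assertions the test would make:

// Hypothetical usage of the query builders above
val query = ExperimentQuerySupport.experimentQuery(List(knnWithK5))
ExperimentQuerySupport.experimentQuery2job(query).fold(
  errors => println(s"Invalid experiment job: ${errors.toList.mkString(", ")}"),
  job => println(s"Created experiment job: $job")
)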