Skip to content

Commit

Permalink
Config runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ningyougang committed Mar 2, 2021
1 parent d8cf172 commit a7be1d4
Show file tree
Hide file tree
Showing 20 changed files with 505 additions and 44 deletions.
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default('controller.user') }}"
password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down Expand Up @@ -209,6 +211,8 @@ invoker:
{% endif %}"
extraEnv: "{{ invoker_extraEnv | default({}) }}"
protocol: "{{ invoker_protocol | default('https') }}"
username: "{{ invoker_username | default('invoker.user') }}"
password: "{{ invoker_password | default('invoker.pass') }}"
ssl:
cn: "openwhisk-invokers"
keyPrefix: "{{ __invoker_ssl_keyPrefix }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"

"CONFIG_whisk_controller_username": "{{ controller.username }}"
"CONFIG_whisk_controller_password": "{{ controller.password }}"

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"

- name: extend invoker dns env
set_fact:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,9 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val whiskClusterName = "whisk.cluster.name"

val whiskControllerUsername = "whisk.controller.username"
val whiskControllerPassword = "whisk.controller.password"
val whiskInvokerUsername = "whisk.invoker.username"
val whiskInvokerPassword = "whisk.invoker.password"
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,30 @@ object StatusData extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}

case class RuntimeMessage(runtime: String) extends Message {
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
}

object RuntimeMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
}

case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
}

object PrewarmContainerData extends DefaultJsonProtocol {
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
}

case class PrewarmContainerDataList(items: List[PrewarmContainerData])

object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
def write(f: PrewarmContainerDataList) = ???
}
}
4 changes: 4 additions & 0 deletions core/controller/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ whisk{
file-system : true
dir-path : "/swagger-ui/"
}
controller {
username: "controller.user"
password: "controller.pass"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand All @@ -32,7 +33,7 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
Expand Down Expand Up @@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
}

// initialize datastores
Expand Down Expand Up @@ -176,6 +177,59 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))

private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)

/**
* config runtime
*/
private val configRuntime = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "runtime") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerUsername && password == controllerPassword) {
entity(as[String]) { runtime =>
val execManifest = ExecManifest.initialize(whiskConfig, Some(runtime))
if (execManifest.isFailure) {
logging.info(this, s"received invalid runtimes manifest")
complete(StatusCodes.BadRequest)
} else {
parameter('limit.?) { limit =>
limit match {
case Some(targetValue) =>
val pattern = """\d+:\d"""
if (targetValue.matches(pattern)) {
val invokerArray = targetValue.split(":")
val beginIndex = invokerArray(0).toInt
val finishIndex = invokerArray(1).toInt
if (finishIndex < beginIndex) {
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
} else {
val targetInvokers = (beginIndex to finishIndex).toList
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
logging.info(this, "config runtime request is already sent to target invokers")
complete(StatusCodes.Accepted)
}
} else {
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
}
case None =>
loadBalancer.sendRuntimeToInvokers(runtime, None)
logging.info(this, "config runtime request is already sent to all managed invokers")
complete(StatusCodes.Accepted)
}
}
}
}
} else {
complete(StatusCodes.Unauthorized, "username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send runtime to invokers
*
* @param runtime
* @param targetInvokers
*/
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
Expand Down Expand Up @@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
}
}

/** send runtime to invokers*/
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
val runtimeMessage = RuntimeMessage(runtime)
schedulingState.managedInvokers.filter { manageInvoker =>
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, runtimeMessage).andThen {
case Success(_) =>
logging.info(this, s"Successfully posted runtime to topic $topic")
case Failure(_) =>
logging.error(this, s"Failed posted runtime to topic $topic")
}
}
}

override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
Expand Down
2 changes: 2 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ whisk {
}

invoker {
username: "invoker.user"
password: "invoker.pass"
protocol: http
}
runtime.delete.timeout = "30 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.util.{Random, Try}

case class ColdStartKey(kind: String, memory: ByteSize)

case class PreWarmConfigList(list: List[PrewarmingConfig])
object PrewarmQuery

case object EmitMetrics

case object AdjustPrewarmedContainer
Expand Down Expand Up @@ -68,6 +72,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
var latestPrewarmConfig = prewarmConfig
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
Expand Down Expand Up @@ -297,6 +302,34 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case prewarmConfigList: PreWarmConfigList =>
logging.info(this, "update prewarm configuration request is send to invoker")
val passedPrewarmConfig = prewarmConfigList.list
var newPrewarmConfig: List[PrewarmingConfig] = List.empty
latestPrewarmConfig foreach { config =>
newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig
.find(passedConfig =>
passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit)
.getOrElse(config)
}
latestPrewarmConfig = newPrewarmConfig
// Delete prewarmedPool firstly
prewarmedPool foreach { element =>
val actor = element._1
actor ! Remove
prewarmedPool = prewarmedPool - actor
}
latestPrewarmConfig foreach { config =>
logging.info(
this,
s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.initialCount).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
}
}
case PrewarmQuery =>
sender() ! getPrewarmContainer()
case EmitMetrics =>
emitMetrics()

Expand Down Expand Up @@ -327,7 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
if (scheduled) {
//on scheduled time, remove expired prewarms
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p =>
prewarmedPool = prewarmedPool - p
p ! Remove
}
Expand All @@ -340,7 +373,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
//fill in missing prewarms (replaces any deletes)
ContainerPool
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
.increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool)
.foreach { c =>
val config = c._1
val currentCount = c._2._1
Expand Down Expand Up @@ -380,7 +413,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

/** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
prewarmConfig
latestPrewarmConfig
.filter { config =>
kind == config.exec.kind && memoryLimit == config.memoryLimit
}
Expand Down Expand Up @@ -421,7 +454,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

//get the appropriate ttl from prewarm configs
val ttl =
prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
latestPrewarmConfig
.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind)
.flatMap(_.reactive.map(_.ttl))
prewarmContainer(action.exec, memory, ttl)
(ref, data)
}
Expand All @@ -434,6 +469,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool - toDelete
}

/**
* get the prewarm container
* @return
*/
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
val containerDataList = prewarmedPool.values.toList

var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
containerDataList.foreach { prewarmData =>
val isInclude = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.size > 0

if (isInclude) {
var resultData = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.head
resultData.number += 1
} else {
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
}
}
resultList
}

/**
* Calculate if there is enough free memory within a given pool.
*
Expand Down
Loading

0 comments on commit a7be1d4

Please sign in to comment.