Skip to content

Commit

Permalink
Config runtime
Browse files Browse the repository at this point in the history
Sometimes, admin may want to reinitalize the runtime config depend on
the real requirements, e.g. increase some prewarm containers
  • Loading branch information
ningyougang committed Mar 23, 2020
1 parent 7113b73 commit 30a3e64
Show file tree
Hide file tree
Showing 19 changed files with 555 additions and 90 deletions.
2 changes: 2 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default('controller.user') }}"
password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"

"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

case class ControllerCredentials(username: String, password: String)
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,6 @@ object ConfigKeys {
val apacheClientConfig = "whisk.apache-client"

val parameterStorage = "whisk.parameter-storage"

val controllerCredentials = "whisk.credentials.controller"
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,30 @@ object EventMessage extends DefaultJsonProtocol {

def parse(msg: String) = Try(format.read(msg.parseJson))
}

case class RuntimeMessage(runtime: String) extends Message {
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
}

object RuntimeMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
}

case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
}

object PrewarmContainerData extends DefaultJsonProtocol {
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
}

case class PrewarmContainerDataList(items: List[PrewarmContainerData])

object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
def write(f: PrewarmContainerDataList) = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ protected[core] object ExecManifest {
mf
}

/**
* Reads runtimes manifest from runtime string
*
* @param runtime
* @return the manifest if initialized successfully, or an failure
*/
protected[core] def initialize(runtime: String): Try[Runtimes] = {
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
val mf = Try(runtime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
var manifest: Option[Runtimes] = None
mf.foreach(m => manifest = Some(m))
mf
}

/**
* Gets existing runtime manifests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand All @@ -31,8 +32,15 @@ import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.common.{
AkkaLogging,
ConfigMXBean,
ControllerCredentials,
Logging,
LoggingMarkers,
TransactionId
}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
Expand Down Expand Up @@ -97,7 +105,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
}

// initialize datastores
Expand Down Expand Up @@ -176,6 +184,58 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))

private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)

/**
* config runtime
*/
private val configRuntime = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "runtime") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerCredentials.username && password == controllerCredentials.password) {
entity(as[String]) { runtime =>
val execManifest = ExecManifest.initialize(runtime)
if (execManifest.isFailure) {
logging.info(this, s"received invalid runtimes manifest")
complete(StatusCodes.BadRequest)
} else {
parameter('limit.?) { limit =>
limit match {
case Some(targetValue) =>
val pattern = """\d+:\d"""
if (targetValue.matches(pattern)) {
val invokerArray = targetValue.split(":")
val beginIndex = invokerArray(0).toInt
val finishIndex = invokerArray(1).toInt
if (finishIndex < beginIndex) {
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
} else {
val targetInvokers = (beginIndex to finishIndex).toList
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
logging.info(this, "config runtime request is already sent to target invokers")
complete(StatusCodes.Accepted)
}
} else {
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
}
case None =>
loadBalancer.sendRuntimeToInvokers(runtime, None)
logging.info(this, "config runtime request is already sent to all managed invokers")
complete(StatusCodes.Accepted)
}
}
}
}
} else {
complete(StatusCodes.Unauthorized, "username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send runtime to invokers
*
* @param runtime
* @param targetInvokers
*/
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
Expand Down Expand Up @@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
}
}

/** send runtime to invokers*/
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
val runtimeMessage = RuntimeMessage(runtime)
schedulingState.managedInvokers.filter { manageInvoker =>
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, runtimeMessage).andThen {
case Success(_) =>
logging.info(this, s"Successfully posted runtime to topic $topic")
case Failure(_) =>
logging.error(this, s"Failed posted runtime to topic $topic")
}
}
}

override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ package org.apache.openwhisk.core.containerpool
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.util.Try

Expand All @@ -34,6 +36,9 @@ case object Free extends WorkerState

case class WorkerData(data: ContainerData, state: WorkerState)

case class PreWarmConfigList(list: List[PrewarmingConfig])
object PrewarmQuery

case object EmitMetrics

/**
Expand Down Expand Up @@ -70,6 +75,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
var latestPrewarmConfig = prewarmConfig
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
Expand Down Expand Up @@ -279,6 +285,23 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case prewarmConfigList: PreWarmConfigList =>
latestPrewarmConfig = prewarmConfigList.list
// Delete prewarmedPool firstly
prewarmedPool foreach { element =>
val actor = element._1
actor ! Remove
prewarmedPool = prewarmedPool - actor
}
prewarmConfigList.list foreach { config =>
logging.info(this, s"add pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
}
}
case PrewarmQuery =>
sender() ! getPrewarmContainer()
case EmitMetrics =>
emitMetrics()
}
Expand All @@ -304,7 +327,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

/** Install prewarm containers up to the configured requirements for each kind/memory combination. */
def backfillPrewarms(init: Boolean) = {
prewarmConfig.foreach { config =>
latestPrewarmConfig.foreach { config =>
val kind = config.exec.kind
val memory = config.memoryLimit
val currentCount = prewarmedPool.count {
Expand Down Expand Up @@ -375,6 +398,33 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool - toDelete
}

/**
* get the prewarm container
* @return
*/
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
val containerDataList = prewarmedPool.values.toList.map { data =>
data.asInstanceOf[PreWarmedData]
}

var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
containerDataList.foreach { prewarmData =>
val isInclude = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.size > 0

if (isInclude) {
var resultData = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.head
resultData.number += 1
} else {
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
}
}
resultList
}

/**
* Calculate if there is enough free memory within a given pool.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker

import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}

import org.apache.openwhisk.http.BasicRasService

import scala.concurrent.ExecutionContext

/**
* Implements web server to handle certain REST API calls.
*/
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
val actorSystem: ActorSystem,
val logger: Logging)
extends BasicRasService {

override def routes(implicit transid: TransactionId): Route = {
super.routes ~ {
(path("getRuntime") & get) {
invoker.getRuntime()
}
}
}
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new DefaultInvokerServer(invoker)
}
Loading

0 comments on commit 30a3e64

Please sign in to comment.