Skip to content
This repository has been archived by the owner on Mar 11, 2019. It is now read-only.

WIP: Fix CPU sampling #92

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ class LibpfmCoreProcessSensor(eventBus: MessageBus, muid: UUID, target: Target,
(core, event, Await.result(actor.?(msg.tick)(timeout), timeout.duration).asInstanceOf[HWCounter])
}

publishPCReport(muid, target, allValues.groupBy(tuple3 => (tuple3._1, tuple3._2)).map {
case ((core, event), values) => Map[Int, Map[String, Seq[HWCounter]]](core -> Map(event -> values.map(_._3).toSeq))
}.foldLeft(Map[Int, Map[String, Seq[HWCounter]]]())((acc, elt) => acc ++ elt), msg.tick)(eventBus)
publishPCReport(muid, target, allValues.groupBy(_._1).mapValues(_.map { case (_, event, hpc) => Map(event -> hpc) }.reduce(_ ++ _)), msg.tick)(eventBus)

context.become(sense(newIdentifiers) orElse sensorDefault)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ class LibpfmCoreSensor(eventBus: MessageBus, muid: UUID, target: Target, libpfmH
(core, event, Await.result(actor.?(msg.tick)(timeout), timeout.duration).asInstanceOf[HWCounter])
}

publishPCReport(muid, target, allValues.groupBy(tuple3 => (tuple3._1, tuple3._2)).map {
case ((core, event), values) => Map[Int, Map[String, Seq[HWCounter]]](core -> Map(event -> values.map(_._3).toSeq))
}.foldLeft(Map[Int, Map[String, Seq[HWCounter]]]())((acc, elt) => acc ++ elt), msg.tick)(eventBus)
publishPCReport(muid, target, allValues.groupBy(_._1).mapValues(_.map { case (_, event, hpc) => Map(event -> hpc) }.reduce(_ ++ _)), msg.tick)(eventBus)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ package org.powerapi.module.libpfm
import java.util.UUID

import scala.concurrent.duration.FiniteDuration

import akka.actor.Actor

import org.powerapi.core.MessageBus
import org.powerapi.core.power._
import org.powerapi.core.target.Target
import org.powerapi.module.Formula
import org.powerapi.module.PowerChannel.publishRawPowerReport
import org.powerapi.module.libpfm.PerformanceCounterChannel.{PCReport, subscribePCReport, unsubscribePCReport}
import org.powerapi.module.libpfm.PerformanceCounterChannel.{HWCounter, PCReport, subscribePCReport, unsubscribePCReport}

/**
* This formula is designed to fit a multivariate power model.
Expand All @@ -57,8 +55,8 @@ class LibpfmFormula(eventBus: MessageBus, muid: UUID, target: Target, formula: M
if (now - old <= 0) 0
else {
val value = msg.values.values.flatten.collect {
case (ev, counters) if ev == event => counters.map(_.value)
}.foldLeft(Seq[Long]())((acc, value) => acc ++ value).sum
case (hpcEvent, hpcValue) if hpcEvent == event => hpcValue.value
}.sum

coeff * math.round(value * (samplingInterval.toNanos / (now - old).toDouble))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object PerformanceCounterChannel extends Channel {
/**
* Publish a PerformanceCounterReport in the event bus.
*/
def publishPCReport(muid: UUID, target: Target, values: Map[Int, Map[String, Seq[HWCounter]]], tick: Tick): MessageBus => Unit = {
def publishPCReport(muid: UUID, target: Target, values: Map[Int, Map[String, HWCounter]], tick: Tick): MessageBus => Unit = {
publish(PCReport(pcReportToTopic(muid, target), muid, target, values, tick))
}

Expand Down Expand Up @@ -109,7 +109,7 @@ object PerformanceCounterChannel extends Channel {
case class PCReport(topic: String,
muid: UUID,
target: Target,
values: Map[Int, Map[String, Seq[HWCounter]]],
values: Map[Int, Map[String, HWCounter]],
tick: Tick) extends Message

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,10 @@ class LibpfmCoreCyclesFormula(eventBus: MessageBus, muid: UUID, target: Target,
val now = System.nanoTime()

val powers = for (value <- msg.values) yield {
val cycles = value._2.getOrElse(cyclesThreadName, Seq(HWCounter(0)))
val refs = value._2.getOrElse(cyclesRefName, Seq(HWCounter(0)))
val cyclesVal = cycles.map(_.value).sum
val scaledCycles = if (now - old <= 0) 0l else math.round(cyclesVal * (samplingInterval.toNanos / (now - old).toDouble))

val refsVal = refs.map(_.value).sum
val scaledRefs = if (now - old <= 0) 0l else math.round(refsVal * (samplingInterval.toNanos / (now - old).toDouble))
val cycles = value._2.getOrElse(cyclesThreadName, HWCounter(0))
val refs = value._2.getOrElse(cyclesRefName, HWCounter(0))
val scaledCycles = if (now - old <= 0) 0l else math.round(cycles.value * (samplingInterval.toNanos / (now - old).toDouble))
val scaledRefs = if (now - old <= 0) 0l else math.round(refs.value * (samplingInterval.toNanos / (now - old).toDouble))

var coefficient: Double = math.round(scaledCycles / scaledRefs.toDouble)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,17 @@ object Application extends App {
}

if (samplingOption._1) {
println("Starting sampling phase...")
Sampling(samplingOption._2, configuration, libpfmHelper).run()
}

if (processingOption._1) {
println("Starting processing phase...")
Processing(samplingOption._2, processingOption._2, configuration).run()
}

if (computingOption._1) {
println("Starting computation phase...")
PolynomialCyclesRegression(processingOption._2, computingOption._2, configuration).run()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,18 @@ class CountersDisplay(basepath: String, events: Set[String]) extends Actor with
}

def receive: Actor.Receive = {
case msg: PCReport => report(msg)
case msg: String => append(msg)
case msg: PCReport =>
report(msg)
case msg: String =>
append(msg)
}

def report(msg: PCReport): Unit = {

for (event <- events) {
val counter = msg.values.values.flatten.collect {
case (ev, counters) if ev == event => counters.map(_.value)
}.foldLeft(Seq[Long]())((acc, value) => acc ++ value).sum
case (hpcEvent, hpcValue) if hpcEvent == event => hpcValue.value
}.sum

outputs(event).append(s"$counter\n")
outputs(event).flush()
Expand Down