Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify sampling rate for actors #56

Merged
merged 21 commits into from
Nov 6, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.Option;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;

/**
* Contains advices for monitoring behaviour of an actor; typically imprisoned in an {@code ActorCell}.
Expand All @@ -30,6 +32,7 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl
private AkkaAgentConfiguration agentConfiguration;
private final CounterInterface counterInterface;
private final Option<String> noActorClazz = Option.empty();
private final ConcurrentHashMap<Option<String>, AtomicLong> concurrentCounters;

/**
* Constructs this aspect
Expand All @@ -38,6 +41,7 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl
AgentConfiguration<AkkaAgentConfiguration> configuration = getAgentConfiguration("akka", AkkaAgentConfigurationJapi.apply());
this.agentConfiguration = configuration.agent();
this.counterInterface = createCounterInterface(configuration.common());
this.concurrentCounters = new ConcurrentHashMap<Option<String>, AtomicLong>();
}

/**
Expand All @@ -50,16 +54,31 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl
}

// decide whether to include this ActorCell in our measurements
private boolean includeActorPath(final ActorPath actorPath, final Option<String> actorClassName) {
if (!this.agentConfiguration.incuded().accept(actorPath, actorClassName)) return false;
if (this.agentConfiguration.excluded().accept(actorPath, actorClassName)) return false;
private boolean includeActorPath(final PathAndClass pathAndClass) {
if (!this.agentConfiguration.incuded().accept(pathAndClass)) return false;
if (this.agentConfiguration.excluded().accept(pathAndClass)) return false;

if (this.agentConfiguration.includeSystemAgents()) return true;

String userOrSystem = actorPath.getElements().iterator().next();
String userOrSystem = pathAndClass.actorPath().getElements().iterator().next();
return "user".equals(userOrSystem);
}

// get the sample rate for an actor
private int getSampleRate(final PathAndClass pathAndClass) {
return this.agentConfiguration.sampling().getRate(pathAndClass);
}

// decide whether to sample an actor on a particular occasion
private final boolean sampleMessage(final PathAndClass pathAndClass) {
int sampleRate = getSampleRate(pathAndClass);
if (sampleRate == 1) return true;

this.concurrentCounters.putIfAbsent(pathAndClass.actorClassName(), new AtomicLong(0));
long timesSeenSoFar = this.concurrentCounters.get(pathAndClass.actorClassName()).incrementAndGet();
return (timesSeenSoFar % sampleRate == 1); // == 1 to log first value (incrementAndGet returns updated value)
}

// get the tags for the cell
private String[] getTags(final ActorPath actorPath, final Actor actor) {
List<String> tags = new ArrayList<String>();
Expand All @@ -83,22 +102,30 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl

/**
* Advises the {@code ActorCell.receiveMessage(message: Object): Unit}
* We proceed with the pointcut if the actor is to be included in the monitoring *and* this is
* the 'multiple-of-n'th time we've seen a message for an actor with a sample rate of n.
*
* Currently, we sample queue size, the fact that the message is delivered, the simple name of the class of the
* message, and the time taken to complete the actor's reactive action.
*
* @param actorCell the ActorCell where the actor that receives the message "lives"
* @param msg the incoming message
*/
Object around(ActorCell actorCell, Object msg) : Pointcuts.actorCellReceiveMessage(actorCell, msg) {
final ActorPath actorPath = actorCell.self().path();
if (!includeActorPath(actorPath, Option.apply(actorCell.actor().getClass().getCanonicalName()))) return proceed(actorCell, msg);
final PathAndClass pathAndClass = new PathAndClass(actorPath, Option.apply(actorCell.actor().getClass().getCanonicalName()));
if (!includeActorPath(pathAndClass) || !sampleMessage(pathAndClass)) return proceed(actorCell, msg);

int samplingRate = getSampleRate(pathAndClass);

// we tag by actor name
final String[] tags = getTags(actorPath, actorCell.actor());

// record the queue size
this.counterInterface.recordGaugeValue("akka.queue.size", actorCell.numberOfMessages(), tags);
// record the message, general and specific
this.counterInterface.incrementCounter("akka.actor.delivered", tags);
this.counterInterface.incrementCounter("akka.actor.delivered." + msg.getClass().getSimpleName(), tags);
this.counterInterface.incrementCounter("akka.actor.delivered", samplingRate, tags);
this.counterInterface.incrementCounter("akka.actor.delivered." + msg.getClass().getSimpleName(), samplingRate, tags);

// measure the time. we're using the ``nanoTime`` call to access the high-precision timer.
// since we're not really interested in wall time, but just some increasing measure of
Expand Down Expand Up @@ -150,7 +177,7 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl
* @param actor the {@code ActorRef} returned from the call
*/
after() returning (ActorRef actor): Pointcuts.anyActorOf() {
if (!includeActorPath(actor.path(), this.noActorClazz)) return;
if (!includeActorPath(new PathAndClass(actor.path(), this.noActorClazz))) return;

final String tag = actor.path().root().toString();
this.counterInterface.incrementCounter("akka.actor.count", tag);
Expand All @@ -162,7 +189,7 @@ public aspect ActorCellMonitoringAspect extends AbstractMonitoringAspect issingl
* @param actor the actor being stopped
*/
after(ActorRef actor) : Pointcuts.actorCellStop(actor) {
if (!includeActorPath(actor.path(), this.noActorClazz)) return;
if (!includeActorPath(new PathAndClass(actor.path(), this.noActorClazz))) return;

final String tag = actor.path().root().toString();
this.counterInterface.decrementCounter("akka.actor.count", tag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.eigengo.monitor.agent.akka

import com.typesafe.config.Config
import com.typesafe.config.{ConfigObject, Config}
import org.eigengo.monitor.agent.akka.ActorFilter._
import org.eigengo.monitor.agent.akka.ActorFilter.NamedActorSystem
import org.eigengo.monitor.agent.akka.ActorFilter.SameType
Expand All @@ -27,8 +27,10 @@ import org.eigengo.monitor.agent.akka.ActorFilter.SameType
* @param includeSystemAgents include the system agents in the monitoring?
* @param incuded the filter that matches the included actors
* @param excluded the filter that matches the excluded actors
* @param sampling defines the sampling rate for any actors where we don't want to log every message received
*/
case class AkkaAgentConfiguration(includeRoutees: Boolean, includeSystemAgents: Boolean, incuded: ActorFilter, excluded: ActorFilter)
case class AkkaAgentConfiguration(includeRoutees: Boolean, includeSystemAgents: Boolean, incuded: ActorFilter,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're missing @param for sampling.

excluded: ActorFilter, sampling: SamplingRates)

/**
* Companion for AkkaAgentConfiguration that provides a method to turn a ``Config`` into
Expand All @@ -50,7 +52,9 @@ object AkkaAgentConfiguration {
val includeRoutees = if (config.hasPath("includeRoutees")) config.getBoolean("includeRoutees") else false
val included = if (config.hasPath("included")) config.getStringList("included").map(parseFilter).toList else Nil
val excluded = if (config.hasPath("excluded")) config.getStringList("included").map(parseFilter).toList else Nil
AkkaAgentConfiguration(includeRoutees, false, AnyAcceptActorFilter(included, true), AnyAcceptActorFilter(excluded, false))
val sampling = if (config.hasPath("sampling")) config.getObjectList("sampling").flatMap(parseSampling).toList else Nil
AkkaAgentConfiguration(includeRoutees, false, AnyAcceptActorFilter(included, true),
AnyAcceptActorFilter(excluded, false), SamplingRates(sampling))
}

private def parseActorSystemFilter(actorSystemName: String): ActorSystemNameFilter =
Expand All @@ -67,6 +71,14 @@ object AkkaAgentConfiguration {
case ActorTypePattern(name, clazz) => ActorTypeFilter(parseActorSystemFilter(name), SameType(clazz))
}

private def parseSampling(samplingObject: ConfigObject): Iterable[SamplingRate] = {
import scala.collection.JavaConversions._
(samplingObject.get("rate").unwrapped(), samplingObject.get("for").unwrapped()) match {
case (rate: Number, filters: java.util.List[String @unchecked]) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I didn't know about that @unchcked annotation.

filters.map(filter => SamplingRate(parseFilter(filter), rate.intValue()))
}
}

}

/**
Expand All @@ -83,5 +95,24 @@ object AkkaAgentConfigurationJapi {

}

//TODO: complete me
//case class SamplingRate(included: ActorFilter, sampleEvery: Int)
/**
* Represents a sampling rate provided by a conf object
*
* @param included the filter over the actors
* @param sampleEvery how often to sample (e.g. every 5 messages)
*/
case class SamplingRate(included: ActorFilter, sampleEvery: Int)

/**
* A wrapper for holding multiple sampling rates
*/
case class SamplingRates(samplingRates: List[SamplingRate]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have this as just a container, you can make it extends AnyVal, which stops boxing; that is, it Scala only considers SamplingRates as compile-time artefact. It saves memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally! A legitimate use for it! Good call
Edit: Alas, the compiler doesn't like the usage of the 'getRate' def inside .aj files, so we can't do this :(

/**
* Gets the sampling rate for a particular actor
*
* @param pathAndClass an object representing the actor's path and (optionally) its class name
* @return the rate as an integer
*/
def getRate(pathAndClass: PathAndClass): Int =
samplingRates.find(_.included.accept(pathAndClass)).map(_.sampleEvery).getOrElse(1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import org.eigengo.monitor.agent.akka.ActorFilter._

import scala.annotation.tailrec

// a case class for holding information about an actor's path and (optionally) its type
case class PathAndClass(actorPath: ActorPath, actorClassName: Option[String])
/**
* ActorFilters always start by filtering the actor system, and then filtering either by looking
* at the actor path, or by looking at the actor type
Expand All @@ -42,11 +44,10 @@ sealed trait ActorFilter {
/**
* Decides whether to accept this ``actorSystemName`` and ``actorPath``
*
* @param actorPath the actor path being examined
* @param actorClassName the type of the actor being examined
* @param pathAndClass the path and type of the actor being examined
* @return ``true`` if the actor system and path are acceptable
*/
def accept(actorPath: ActorPath, actorClassName: Option[String]): Boolean
def accept(pathAndClass: PathAndClass): Boolean

}

Expand All @@ -57,8 +58,8 @@ sealed trait ActorFilter {
* @param zero the zero
*/
case class AnyAcceptActorFilter(filters: List[ActorFilter], zero: Boolean) extends ActorFilter {
override def accept(actorPath: ActorPath, actorClassName: Option[String]): Boolean = {
for (filter <- filters) if (!filter.accept(actorPath, actorClassName)) return false
override def accept(pathAndClass: PathAndClass): Boolean = {
for (filter <- filters) if (!filter.accept(pathAndClass)) return false
zero
}
}
Expand All @@ -71,21 +72,21 @@ case class AnyAcceptActorFilter(filters: List[ActorFilter], zero: Boolean) exten
*/
case class ActorPathFilter(actorSystem: ActorSystemNameFilter, actorPathElements: List[ActorPathElement]) extends ActorFilter {

override def accept(actorPath: ActorPath, actorClassName: Option[String]): Boolean = actorSystem match {
case AnyActorSystem => localAccept(actorPath, actorClassName)
case NamedActorSystem(actorSystemName) => actorPath.root.address.system == actorSystemName && localAccept(actorPath, actorClassName)
override def accept(pathAndClass: PathAndClass): Boolean = actorSystem match {
case AnyActorSystem => localAccept(pathAndClass)
case NamedActorSystem(actorSystemName) => pathAndClass.actorPath.root.address.system == actorSystemName && localAccept(pathAndClass)
case _ => false
}

private def localAccept(actorPath: ActorPath, actorClassName: Option[String]): Boolean = {
private def localAccept(pathAndClass: PathAndClass): Boolean = {

@tailrec
def acceptAll(xs: List[(ActorPathElement, String)]): Boolean = xs match {
case Nil => true
case (ape, e)::t => if (ape.accept(e)) acceptAll(t) else false
}

val elements = actorPath.elements.toList
val elements = pathAndClass.actorPath.elements.toList
actorPathElements.size == elements.size && acceptAll(actorPathElements.zip(elements))
}
}
Expand All @@ -98,14 +99,14 @@ case class ActorPathFilter(actorSystem: ActorSystemNameFilter, actorPathElements
*/
case class ActorTypeFilter(actorSystem: ActorSystemNameFilter, actorType: ActorTypeOperator) extends ActorFilter {

override def accept(actorPath: ActorPath, actorClassName: Option[String]): Boolean = actorSystem match {
case AnyActorSystem => localAccept(actorPath, actorClassName)
case NamedActorSystem(actorSystemName) => actorPath.root.address.system == actorSystemName && localAccept(actorPath, actorClassName)
override def accept(pathAndClass: PathAndClass): Boolean = actorSystem match {
case AnyActorSystem => localAccept(pathAndClass)
case NamedActorSystem(actorSystemName) => pathAndClass.actorPath.root.address.system == actorSystemName && localAccept(pathAndClass)
case _ => false
}

private def localAccept(actorPath: ActorPath, actorClassName: Option[String]): Boolean =
actorClassName.map({ clazz =>
private def localAccept(pathAndClass: PathAndClass): Boolean =
pathAndClass.actorClassName.map({ clazz =>
actorType match {
case SameType(`clazz`) => true
case _ => false
Expand Down
17 changes: 17 additions & 0 deletions agent-akka/src/test/resources/META-INF/monitor/sample.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

{
sampling: [
{
rate: 5
for: [ "akka:*.org.eigengo.monitor.agent.akka.SimpleActor" ]
},
{
rate: 15
for: [ "akka:*.org.eigengo.monitor.agent.akka.WithUnhandledActor" ]
},
{
rate: 4
for: [ "akka://default/user/*" ]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ class ActorPathFilterSpec extends Specification {
"Path filter" should {
val actorSystemName = "default"
val singlePath = ActorPath.fromString(s"akka://$actorSystemName/foo/bar/baz")
val singlePathNullClass = PathAndClass(singlePath, null)

"Match concrete path" in {
ActorPathFilter(AnyActorSystem, List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual true
ActorPathFilter(NamedActorSystem(actorSystemName), List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual true
ActorPathFilter(NamedActorSystem("asdadasdadsad"), List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual false
ActorPathFilter(AnyActorSystem, List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual true
ActorPathFilter(NamedActorSystem(actorSystemName), List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual true
ActorPathFilter(NamedActorSystem("asdadasdadsad"), List("foo", "bar", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual false

ActorPathFilter(AnyActorSystem, List("faa", "bar", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual false
ActorPathFilter(AnyActorSystem, List("faa", "bar", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual false
}

"Match wildcard path" in {
ActorPathFilter(AnyActorSystem, SingleWildcardPathElement :: List("bar", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual true
ActorPathFilter(AnyActorSystem, SingleWildcardPathElement :: List("baa", "baz").map(NamedPathElement)).accept(singlePath, null) mustEqual false
ActorPathFilter(AnyActorSystem, SingleWildcardPathElement :: List("bar", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual true
ActorPathFilter(AnyActorSystem, SingleWildcardPathElement :: List("baa", "baz").map(NamedPathElement)).accept(singlePathNullClass) mustEqual false
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2013 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eigengo.monitor.agent.akka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to add license comment.


import org.eigengo.monitor.{TestCounter, TestCounterInterface}
import org.eigengo.monitor.agent.akka.Aspects._
import akka.testkit.TestActorRef

class ActorSamplingSpec extends ActorCellMonitoringAspectSpec(Some("sample.conf")){

"Actor sampling" should {
val a = TestActorRef[SimpleActor]("a")
val b = TestActorRef[WithUnhandledActor]("b")
val c = TestActorRef[NullTestingActor1]("c")
val d = TestActorRef[NullTestingActor2]("d")

"Sample concrete path" in {
TestCounterInterface.clear()
(0 until 1000) foreach {_ => a ! 1}

Thread.sleep(500) // wait for the messages

// we expect to see (1000/5)*5 messages to actor a
val counter = TestCounterInterface.foldlByAspect(deliveredInteger)(TestCounter.plus)

counter(0).value mustEqual 1000
counter(0).tags must contain(a.path.toString)
counter.size === 200


TestCounterInterface.clear()
(0 until 1000) foreach {_ => b ! 1}
Thread.sleep(500) // wait for the messages

// we expect to see (1000/15 ~=67)*15 = 1005 messages to actor b (we round up, since logging the first message)
val counter2 = TestCounterInterface.foldlByAspect(deliveredInteger)(TestCounter.plus)

counter2(0).value mustEqual 1005
counter2(0).tags must contain(b.path.toString)
counter2.size === 67
}

"Sample wildcard path" in {

TestCounterInterface.clear()
(0 until 497) foreach {_ => c ! 1} // if we weren't incrementing the counters separately for each actor, then we'd
(0 until 501) foreach {_ => d ! 1} // expect 998 messages, and thus 250*4 = 1000 messages logged. But we are -- so
// we expect 125*4 = 500 for actor c, and 126*4 = 504 for actor d
Thread.sleep(500) // wait for the messages

// we expect to see (500/4)*4*2 messages to actor c and d
val counter3 = TestCounterInterface.foldlByAspect(deliveredInteger)(TestCounter.plus)

counter3(0).value mustEqual 1004
counter3(0).tags must contain(d.path.toString)
counter3(125).tags must contain(d.path.toString)
counter3(126).tags must contain(c.path.toString)
counter3.size === 251

}
}

}
Loading