Skip to content

Commit

Permalink
Add simple window to error tracking before CB is Open
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Mienko committed Jun 12, 2024
1 parent 4e1e68e commit 89cc12e
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 16 deletions.
108 changes: 96 additions & 12 deletions core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ object CircuitBreaker {
)(implicit F: Applicative[F]): Builder[F] =
new Builder[F](
maxFailures = maxFailures,
failureWindow = Duration.Zero,
resetTimeout = resetTimeout,
backoff = Backoff.exponential,
maxResetTimeout = 1.minute,
Expand All @@ -423,6 +424,7 @@ object CircuitBreaker {

final class Builder[F[_]] private[circuit] (
private val maxFailures: Int,
private val failureWindow: FiniteDuration,
private val resetTimeout: FiniteDuration,
private val backoff: FiniteDuration => FiniteDuration,
private val maxResetTimeout: Duration,
Expand All @@ -437,6 +439,7 @@ object CircuitBreaker {

private def copy(
maxFailures: Int = self.maxFailures,
failureWindow: FiniteDuration = self.failureWindow,
resetTimeout: FiniteDuration = self.resetTimeout,
backoff: FiniteDuration => FiniteDuration = self.backoff,
maxResetTimeout: Duration = self.maxResetTimeout,
Expand All @@ -449,6 +452,7 @@ object CircuitBreaker {
): Builder[F] =
new Builder[F](
maxFailures = maxFailures,
failureWindow = failureWindow,
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
Expand All @@ -462,7 +466,9 @@ object CircuitBreaker {

def withMaxFailures(maxFailures: Int): Builder[F] =
copy(maxFailures = maxFailures)
def witResetTimeout(resetTimeout: FiniteDuration): Builder[F] =
def withFailureWindow(failureWindow: FiniteDuration): Builder[F] =
copy(failureWindow = failureWindow)
def withResetTimeout(resetTimeout: FiniteDuration): Builder[F] =
copy(resetTimeout = resetTimeout)
def withBackOff(backoff: FiniteDuration => FiniteDuration): Builder[F] =
copy(backoff = backoff)
Expand Down Expand Up @@ -491,10 +497,11 @@ object CircuitBreaker {
copy(exceptionFilter = exceptionFilter)

def build(implicit F: Temporal[F]): F[CircuitBreaker[F]] =
Concurrent[F].ref[State](ClosedZero).map(ref =>
Concurrent[F].ref[State](Closed(failureWindow)).map(ref =>
new SyncCircuitBreaker[F](
ref,
maxFailures,
failureWindow,
resetTimeout,
backoff,
maxResetTimeout,
Expand All @@ -508,10 +515,11 @@ object CircuitBreaker {
)

def in[G[_]: Sync](implicit F: Async[F]): G[CircuitBreaker[F]] =
Ref.in[G, F, State](ClosedZero).map { ref =>
Ref.in[G, F, State](Closed(failureWindow)).map { ref =>
new SyncCircuitBreaker[F](
ref,
maxFailures,
failureWindow,
resetTimeout,
backoff,
maxResetTimeout,
Expand All @@ -528,6 +536,7 @@ object CircuitBreaker {
new SyncCircuitBreaker[F](
ref,
maxFailures,
failureWindow,
resetTimeout,
backoff,
maxResetTimeout,
Expand Down Expand Up @@ -572,7 +581,73 @@ object CircuitBreaker {
*
* @param failures is the current failures count
*/
final case class Closed(failures: Int) extends State
// final case class Closed(failures: Int) extends State

final case class Failure(timestamp: Timestamp, count: Int) {
def increment: Failure = copy(count = count + 1)
}

object Failure {
def at(timestamp: Timestamp): Failure = Failure(timestamp, count = 1)
}

final case class Closed private (events: Option[List[Failure]], windowSizeInMs: Long, failures: Int) extends State {
def increment(timestamp: Timestamp): Closed =
events match {
case None =>
Closed.noWindow(failures = failures + 1)

case Some(events) =>
val cutoff = timestamp - windowSizeInMs
val (validFailures, expiredFailures) = events.partition(_.timestamp >= cutoff)

validFailures.headOption match {
case Some(latestFailure) =>
// Slow, but simple. Tree would be a better structure for partition while preserving fast append. Note
// that tree should keep a count of all children node counts for total failure to be efficient.
val updatedFailures = failures - expiredFailures.map(_.count).sum + 1

if (latestFailure.timestamp == timestamp)
withWindow(
events = latestFailure.increment :: validFailures.tail,
updatedFailures
)
else
withWindow(
events = Failure.at(timestamp) :: validFailures,
updatedFailures
)

case None =>
withWindow(
events = List(Failure.at(timestamp)),
failures = 1
)
}
}

private def withWindow(events: List[Failure], failures: Int): Closed =
copy(events = events.some, failures = failures)

}

object Closed {
def apply(failureWindow: FiniteDuration): Closed =
if (failureWindow == Duration.Zero)
noWindow(failures = 0)
else
new Closed(
events = List.empty[Failure].some,
failureWindow.toMillis,
failures = 0
)

def noWindow(failures: Int): Closed = Closed(
events = none[List[Failure]],
windowSizeInMs = 0,
failures
)
}

/** [[State]] of the [[CircuitBreaker]] in which the circuit
* breaker rejects all tasks with a [[RejectedExecution]].
Expand Down Expand Up @@ -623,7 +698,7 @@ object CircuitBreaker {
*/
case object HalfOpen extends State with Reason

private val ClosedZero = Closed(0)
// private val ClosedZero = Closed(0)


/** Exception thrown whenever an execution attempt was rejected.
Expand All @@ -634,6 +709,7 @@ object CircuitBreaker {
private final class SyncCircuitBreaker[F[_]] (
ref: Ref[F, CircuitBreaker.State],
maxFailures: Int,
failureWindow: FiniteDuration,
resetTimeout: FiniteDuration,
backoff: FiniteDuration => FiniteDuration,
maxResetTimeout: Duration,
Expand All @@ -651,6 +727,7 @@ object CircuitBreaker {
require(resetTimeout > Duration.Zero, "resetTimeout > 0")
require(maxResetTimeout > Duration.Zero, "maxResetTimeout > 0")

private lazy val ClosedZero = Closed(failureWindow)

def state: F[CircuitBreaker.State] = ref.get

Expand All @@ -659,6 +736,7 @@ object CircuitBreaker {
new SyncCircuitBreaker(
ref = ref,
maxFailures = maxFailures,
failureWindow = failureWindow,
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
Expand All @@ -675,6 +753,7 @@ object CircuitBreaker {
new SyncCircuitBreaker(
ref = ref,
maxFailures = maxFailures,
failureWindow = failureWindow,
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
Expand All @@ -691,6 +770,7 @@ object CircuitBreaker {
new SyncCircuitBreaker(
ref = ref,
maxFailures = maxFailures,
failureWindow = failureWindow,
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
Expand All @@ -708,6 +788,7 @@ object CircuitBreaker {
new SyncCircuitBreaker(
ref = ref,
maxFailures = maxFailures,
failureWindow = failureWindow,
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
Expand All @@ -724,19 +805,22 @@ object CircuitBreaker {
poll(f).guaranteeCase {
case Outcome.Succeeded(_) =>
ref.modify{
case Closed(_) => (ClosedZero, F.unit)
case _: Closed => (ClosedZero, F.unit)
case HalfOpen => (ClosedZero, onClosed.attempt.void)
case Open(_,_) => (ClosedZero, onClosed.attempt.void)
}.flatten
case Outcome.Errored(e) =>
Temporal[F].realTime.map(_.toMillis).flatMap { now =>
ref.modify {
case Closed(failures) =>
case closed: Closed =>
if (exceptionFilter(e)) {
val count = failures + 1
if (count >= maxFailures) (Open(now, resetTimeout), onOpen.attempt.void)
else (Closed(count), Applicative[F].unit)
} else (ClosedZero, Applicative[F].unit)
val updated = closed.increment(now)
if (updated.failures >= maxFailures)
(Open(now, resetTimeout), onOpen.attempt.void)
else
(updated, Applicative[F].unit)
} else
(ClosedZero, Applicative[F].unit)
case open: Open => (open, Applicative[F].unit)
case HalfOpen => (HalfOpen, Applicative[F].unit)
}.flatten
Expand Down Expand Up @@ -810,6 +894,6 @@ object CircuitBreaker {
override def doOnRejected(callback: F[Unit]): CircuitBreaker[F] = self
override def doOnHalfOpen(callback: F[Unit]): CircuitBreaker[F] = self
override def doOnClosed(callback: F[Unit]): CircuitBreaker[F] = self
override def state: F[CircuitBreaker.State] = F.pure(CircuitBreaker.Closed(0))
override def state: F[CircuitBreaker.State] = F.pure(CircuitBreaker.Closed.noWindow(0))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class CircuitBreakerTests extends CatsEffectSuite {
for {
_ <- taskInError.attempt
_ <- taskInError.attempt
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed(2)))
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed.noWindow(2)))
// A successful value should reset the counter
_ <- taskSuccess
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed(0)))
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed.noWindow(0)))

_ <- taskInError.attempt.replicateA(5)
_ <- circuitBreaker.state.map {
Expand Down Expand Up @@ -199,7 +199,7 @@ class CircuitBreakerTests extends CatsEffectSuite {
_ <- fiber.join

// Should re-open on success
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed(0)))
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed.noWindow(0)))
} yield assertEquals( (openedCount, closedCount, halfOpenCount, rejectedCount), (1, 1, 1, 3))

fa
Expand All @@ -220,7 +220,7 @@ class CircuitBreakerTests extends CatsEffectSuite {
val fa =
for {
_ <- taskInError.attempt
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed(1)))
_ <- circuitBreaker.state.map(assertEquals(_, CircuitBreaker.Closed.noWindow(1)))
_ <- taskInError.attempt
_ <- circuitBreaker.state.map{
case CircuitBreaker.Open(_, t) => assertEquals(t, 100.millis)
Expand Down Expand Up @@ -352,6 +352,24 @@ class CircuitBreakerTests extends CatsEffectSuite {
} yield ()
}

test("should only count errors in a Window if windowing is enabled") {
for {
circuitBreaker <- CircuitBreaker.default[IO](maxFailures = 2, resetTimeout = 10.seconds).withFailureWindow(200.millis).build
action = circuitBreaker.protect(IO.raiseError(new RuntimeException("Boom!"))).attempt
_ <- action >> IO.sleep(200.millis) >> action >> IO.sleep(200.millis) >> action
_ <- circuitBreaker.state.map {
case _: CircuitBreaker.Closed => assert(true)
case _ => assert(false)
}
_ <- IO.sleep(200.millis) >> action >> IO.sleep(50.millis) >> action
_ <- circuitBreaker.state.map {
case _: CircuitBreaker.Open => assert(true)
case _ => assert(false)
}
} yield ()

}

test("Validate withUncancelableHalfOpen") {
closesAfterHalfOpenCancelation(cancelable = false).assertEquals(true)
}
Expand Down

0 comments on commit 89cc12e

Please sign in to comment.