Skip to content

Commit

Permalink
Adaptive retries (#257)
Browse files Browse the repository at this point in the history
Co-authored-by: adamw <[email protected]>
  • Loading branch information
Kamil-Lontkowski and adamw authored Dec 31, 2024
1 parent 7c00633 commit daefc1e
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 49 deletions.
148 changes: 148 additions & 0 deletions core/src/main/scala/ox/resilience/AdaptiveRetry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package ox.resilience

import ox.scheduling.{ScheduleStop, ScheduledConfig, SleepMode, scheduledWithErrorMode}
import ox.*

import scala.util.Try

/** Implements "adaptive" retries: every retry costs [[failureCost]] tokens from the bucket, and every success causes [[successReward]]
* tokens to be added to the bucket. If there are not enough tokens, retry is not attempted.
*
* This way retries don't overload a system that is down due to a systemic failure (such as a bug in the code, excessive load etc.):
* retries will be attempted only as long as there are enough tokens in the bucket, then the load on the downstream system will be reduced
* so that it can recover. For transient failures (component failure, infrastructure issues etc.), retries still work as expected, as the
* bucket has enough tokens to cover the cost of multiple retries.
*
* Instances of this class are thread-safe and are designed to be shared. Typically, a single instance should be used to proxy access to a
* single constrained resource.
*
* An instance with default parameters can be created using [[AdaptiveRetry.default]].
*
* Inspired by:
* - [`AdaptiveRetryStrategy`](https://github.com/aws/aws-sdk-java-v2/blob/master/core/retries/src/main/java/software/amazon/awssdk/retries/AdaptiveRetryStrategy.java)
* from `aws-sdk-java-v2`
* - ["Try again: The tools and techniques behind resilient systems" from re:Invent 2024](https://www.youtube.com/watch?v=rvHd4Y76-fs)
*
* @param tokenBucket
* Instance of [[TokenBucket]]. As a token bucket is thread-safe, it can be shared between different instances of [[AdaptiveRetry]], e.g.
* with a different [[failureCost]].
* @param failureCost
* Number of tokens to take from [[tokenBucket]] when retrying.
* @param successReward
* Number of tokens to add back to [[tokenBucket]] after a successful operation.
*/
case class AdaptiveRetry(
tokenBucket: TokenBucket,
failureCost: Int,
successReward: Int
):
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by
* the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
*
* @param config
* The retry config - See [[RetryConfig]].
* @param shouldPayFailureCost
* Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid
* only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`.
* @param errorMode
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param operation
* The operation to retry.
* @tparam E
* type of error.
* @tparam T
* type of result of an operation.
* @tparam F
* the context inside which [[E]] or [[T]] are returned.
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the config decides to stop.
* @see
* [[scheduledWithErrorMode]]
*/
def retryWithErrorMode[E, T, F[_]](errorMode: ErrorMode[E, F])(
config: RetryConfig[E, T],
shouldPayFailureCost: Either[E, T] => Boolean = (_: Either[E, T]) => true
)(operation: => F[T]): F[T] =

val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) =>
config.onRetry(attemptNum, attempt)
attempt match
case Left(value) =>
// If we want to retry we try to acquire tokens from bucket
if config.resultPolicy.isWorthRetrying(value) then
if shouldPayFailureCost(Left(value)) then ScheduleStop(!tokenBucket.tryAcquire(failureCost))
else ScheduleStop.Yes
else ScheduleStop.Yes
case Right(value) =>
// If we are successful, we release tokens to bucket and end schedule
if config.resultPolicy.isSuccess(value) then
tokenBucket.release(successReward)
ScheduleStop.Yes
// If it is not success we check if we need to acquire tokens, then we check bucket, otherwise we continue
else if shouldPayFailureCost(Right(value)) then ScheduleStop(!tokenBucket.tryAcquire(failureCost))
else ScheduleStop.No
end match
end afterAttempt

val scheduledConfig = ScheduledConfig(
config.schedule,
afterAttempt,
sleepMode = SleepMode.Delay
)

scheduledWithErrorMode(errorMode)(scheduledConfig)(operation)
end retryWithErrorMode

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions
* thrown by the operation aren't caught and don't cause a retry to happen.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param shouldPayFailureCost
* Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid
* only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`.
* @param operation
* The operation to retry.
* @tparam E
* type of error.
* @tparam T
* type of result of an operation.
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last
* attempt.
* @see
* [[scheduledEither]]
*/
def retryEither[E, T](config: RetryConfig[E, T], shouldPayFailureCost: Either[E, T] => Boolean = (_: Either[E, T]) => true)(
operation: => Either[E, T]
): Either[E, T] =
retryWithErrorMode(EitherMode[E])(config, shouldPayFailureCost)(operation)

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param shouldPayFailureCost
* Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid
* only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`.
* @param operation
* The operation to retry.
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the config decides to stop.
* @see
* [[scheduled]]
*/
def retry[T](
config: RetryConfig[Throwable, T],
shouldPayFailureCost: Either[Throwable, T] => Boolean = (_: Either[Throwable, T]) => true
)(operation: => T): T =
retryWithErrorMode(EitherMode[Throwable])(config, shouldPayFailureCost)(Try(operation).toEither).fold(throw _, identity)

end AdaptiveRetry

object AdaptiveRetry:
def default: AdaptiveRetry = AdaptiveRetry(TokenBucket(500), 5, 1)
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/resilience/ResultPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object ResultPolicy:

/** A policy that customizes which errors are retried, and considers every non-erroneous result successful
* @param isWorthRetrying
* A predicate that indicates whether an erroneous result should be retried..
* A predicate that indicates whether an erroneous result should be retried.
*/
def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying)

Expand Down
18 changes: 10 additions & 8 deletions core/src/main/scala/ox/resilience/RetryConfig.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ox.resilience

import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig}
import ox.scheduling.{Jitter, Schedule, ScheduleStop, ScheduledConfig, SleepMode}

import scala.concurrent.duration.*

Expand Down Expand Up @@ -29,13 +29,15 @@ case class RetryConfig[E, T](
resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
):
def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig(
schedule,
onRetry,
shouldContinueOnError = resultPolicy.isWorthRetrying,
shouldContinueOnResult = t => !resultPolicy.isSuccess(t),
sleepMode = SleepMode.Delay
)
def toScheduledConfig: ScheduledConfig[E, T] =
val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) =>
onRetry(attemptNum, attempt)
attempt match
case Left(value) => ScheduleStop(!resultPolicy.isWorthRetrying(value))
case Right(value) => ScheduleStop(resultPolicy.isSuccess(value))

ScheduledConfig(schedule, afterAttempt, SleepMode.Delay)
end toScheduledConfig
end RetryConfig

object RetryConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ object StartTimeRateLimiterAlgorithm:
case class LeakyBucket(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
private val refillInterval = per.toNanos
private val lastRefillTime = new AtomicLong(System.nanoTime())
private val semaphore = new Semaphore(1)
private val bucket = TokenBucket(rate, Some(1))

def acquire(permits: Int): Unit =
semaphore.acquire(permits)
bucket.acquire(permits)

def tryAcquire(permits: Int): Boolean =
semaphore.tryAcquire(permits)
bucket.tryAcquire(permits)

def getNextUpdate: Long =
val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime()
Expand All @@ -117,7 +117,7 @@ object StartTimeRateLimiterAlgorithm:
def update(): Unit =
val now = System.nanoTime()
lastRefillTime.set(now)
if semaphore.availablePermits() < rate then semaphore.release()
bucket.release(1)

def runOperation[T](operation: => T, permits: Int): T = operation

Expand Down
20 changes: 20 additions & 0 deletions core/src/main/scala/ox/resilience/TokenBucket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ox.resilience

import java.util.concurrent.Semaphore

/** Used by the leaky bucket rate limiter & [[AdaptiveRetry]], to limit the rate of operations. */
case class TokenBucket(bucketSize: Int, initSize: Option[Int] = None):
private val semaphore = Semaphore(initSize.getOrElse(bucketSize))

def tryAcquire(permits: Int): Boolean =
semaphore.tryAcquire(permits)

def acquire(permits: Int): Unit =
semaphore.acquire(permits)

def release(permits: Int): Unit =
val availablePermits = semaphore.availablePermits()
val toRelease = if availablePermits + permits >= bucketSize then bucketSize - availablePermits else permits
semaphore.release(toRelease)

end TokenBucket
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/resilience/retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.util.Try

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details.
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]].
*
* @param config
* The retry config - see [[RetryConfig]].
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/ox/scheduling/RepeatConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import scala.concurrent.duration.DurationInt
* the specified duration after the previous operations has finished. If the previous operation takes longer than the interval, the next
* operation will start immediately after the previous one has finished.
*
* It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Interval]] and
* [[ScheduledConfig.shouldContinueOnError]] always returning `false`.
* It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Interval]] and a
* [[ScheduledConfig.afterAttempt]] callback which checks if the result was successful.
*
* @param schedule
* The repeat schedule which determines the maximum number of invocations and the interval between subsequent invocations. See
Expand All @@ -28,11 +28,14 @@ case class RepeatConfig[E, T](
schedule: Schedule,
shouldContinueOnResult: T => Boolean = (_: T) => true
):
def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig(
schedule,
shouldContinueOnResult = shouldContinueOnResult,
sleepMode = SleepMode.Interval
)
def toScheduledConfig: ScheduledConfig[E, T] =
val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, attempt) =>
attempt match
case Left(_) => ScheduleStop.Yes
case Right(value) => ScheduleStop(!shouldContinueOnResult(value))

ScheduledConfig(schedule, afterAttempt, SleepMode.Interval)
end toScheduledConfig
end RepeatConfig

object RepeatConfig:
Expand Down
36 changes: 18 additions & 18 deletions core/src/main/scala/ox/scheduling/scheduled.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,27 @@ enum SleepMode:
*/
case Interval

/** Delay (since the end of the last operation), i.e. sleeps the duration provided by the schedule before the next operation starts.
*/
/** Delay (since the end of the last operation), i.e. sleeps the duration provided by the schedule before the next operation starts. */
case Delay
end SleepMode

/** @see [[ScheduleConfig.afterAttempt]] */
enum ScheduleStop(val stop: Boolean):
case Yes extends ScheduleStop(true)
case No extends ScheduleStop(false)

object ScheduleStop:
def apply(stop: Boolean): ScheduleStop = if stop then Yes else No

/** A config that defines how to schedule an operation.
*
* @param schedule
* The schedule which determines the maximum number of invocations and the duration between subsequent invocations. See [[Schedule]] for
* more details.
* @param onOperationResult
* A function that is invoked after each invocation. The callback receives the number of the current invocations number (starting from 1)
* and the result of the operation. The result is either a successful value or an error.
* @param shouldContinueOnError
* A function that determines whether to continue the loop after an error. The function receives the error that was emitted by the last
* invocation. Defaults to [[_ => false]].
* @param shouldContinueOnResult
* A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last
* invocation. Defaults to [[_ => true]].
* @param afterAttempt
* A callback invoked after every attempt, with the current invocation number (starting from 1) and the result of the operation. Might
* decide to short-curcuit further attempts, and stop the schedule. Schedule configuration (e.g. max number of attempts) takes
* precedence.
* @param sleepMode
* The mode that specifies how to interpret the duration provided by the schedule. See [[SleepMode]] for more details.
* @tparam E
Expand All @@ -43,9 +45,7 @@ end SleepMode
*/
case class ScheduledConfig[E, T](
schedule: Schedule,
onOperationResult: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => (),
shouldContinueOnError: E => Boolean = (_: E) => false,
shouldContinueOnResult: T => Boolean = (_: T) => true,
afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, _: Either[E, T]) => ScheduleStop.No,
sleepMode: SleepMode = SleepMode.Interval
)

Expand Down Expand Up @@ -107,17 +107,17 @@ def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledCon
operation match
case v if em.isError(v) =>
val error = em.getError(v)
config.onOperationResult(invocation, Left(error))
val shouldStop = config.afterAttempt(invocation, Left(error))

if config.shouldContinueOnError(error) && remainingInvocations.forall(_ > 0) then
if remainingInvocations.forall(_ > 0) && !shouldStop.stop then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
config.onOperationResult(invocation, Right(result))
val shouldStop = config.afterAttempt(invocation, Right(result))

if config.shouldContinueOnResult(result) && remainingInvocations.forall(_ > 0) then
if remainingInvocations.forall(_ > 0) && !shouldStop.stop then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/ox/resilience/BackoffRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f))
val (result, elapsedTime) =
measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f))

// then
result should have message "boom"
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/scala/ox/resilience/DelayedRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,29 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with
elapsedTime.toMillis should be >= maxRetries * sleep.toMillis
counter shouldBe 4
}

behavior of "adaptive retry with delayed config"

it should "retry a failing function forever or until adaptive retry blocks it" in {
// given
var counter = 0
val sleep = 2.millis
val retriesUntilSuccess = 1_000
val successfulResult = 42
val bucketSize = 500
val errorMessage = "boom"

def f =
counter += 1
if counter <= retriesUntilSuccess then throw RuntimeException(errorMessage) else successfulResult

// when
val adaptive = AdaptiveRetry(TokenBucket(bucketSize), 1, 1)
val result = the[RuntimeException] thrownBy adaptive.retry(RetryConfig.delayForever(sleep))(f)

// then
result should have message errorMessage
counter shouldBe bucketSize + 1
}

end DelayedRetryTest
Loading

0 comments on commit daefc1e

Please sign in to comment.