From daefc1e939625f6e309ce49fbea86bc2991071dd Mon Sep 17 00:00:00 2001 From: Kamil Lontkowski <80274179+Kamil-Lontkowski@users.noreply.github.com> Date: Tue, 31 Dec 2024 12:25:26 +0100 Subject: [PATCH] Adaptive retries (#257) Co-authored-by: adamw <adam@warski.org> --- .../scala/ox/resilience/AdaptiveRetry.scala | 148 ++++++++++++++++++ .../scala/ox/resilience/ResultPolicy.scala | 2 +- .../scala/ox/resilience/RetryConfig.scala | 18 ++- .../StartTimeRateLimiterAlgorithm.scala | 8 +- .../scala/ox/resilience/TokenBucket.scala | 20 +++ core/src/main/scala/ox/resilience/retry.scala | 2 +- .../scala/ox/scheduling/RepeatConfig.scala | 17 +- .../main/scala/ox/scheduling/scheduled.scala | 36 ++--- .../ox/resilience/BackoffRetryTest.scala | 3 +- .../ox/resilience/DelayedRetryTest.scala | 25 +++ .../ox/resilience/ImmediateRetryTest.scala | 60 +++++++ doc/utils/repeat.md | 2 +- doc/utils/retries.md | 81 +++++++++- doc/utils/scheduled.md | 8 +- 14 files changed, 381 insertions(+), 49 deletions(-) create mode 100644 core/src/main/scala/ox/resilience/AdaptiveRetry.scala create mode 100644 core/src/main/scala/ox/resilience/TokenBucket.scala diff --git a/core/src/main/scala/ox/resilience/AdaptiveRetry.scala b/core/src/main/scala/ox/resilience/AdaptiveRetry.scala new file mode 100644 index 00000000..88075d1a --- /dev/null +++ b/core/src/main/scala/ox/resilience/AdaptiveRetry.scala @@ -0,0 +1,148 @@ +package ox.resilience + +import ox.scheduling.{ScheduleStop, ScheduledConfig, SleepMode, scheduledWithErrorMode} +import ox.* + +import scala.util.Try + +/** Implements "adaptive" retries: every retry costs [[failureCost]] tokens from the bucket, and every success causes [[successReward]] + * tokens to be added to the bucket. If there are not enough tokens, retry is not attempted. 
+ * + * This way retries don't overload a system that is down due to a systemic failure (such as a bug in the code, excessive load etc.): + * retries will be attempted only as long as there are enough tokens in the bucket, then the load on the downstream system will be reduced + * so that it can recover. For transient failures (component failure, infrastructure issues etc.), retries still work as expected, as the + * bucket has enough tokens to cover the cost of multiple retries. + * + * Instances of this class are thread-safe and are designed to be shared. Typically, a single instance should be used to proxy access to a + * single constrained resource. + * + * An instance with default parameters can be created using [[AdaptiveRetry.default]]. + * + * Inspired by: + * - [`AdaptiveRetryStrategy`](https://github.com/aws/aws-sdk-java-v2/blob/master/core/retries/src/main/java/software/amazon/awssdk/retries/AdaptiveRetryStrategy.java) + * from `aws-sdk-java-v2` + * - ["Try again: The tools and techniques behind resilient systems" from re:Invent 2024](https://www.youtube.com/watch?v=rvHd4Y76-fs) + * + * @param tokenBucket + * Instance of [[TokenBucket]]. As a token bucket is thread-safe, it can be shared between different instances of [[AdaptiveRetry]], e.g. + * with a different [[failureCost]]. + * @param failureCost + * Number of tokens to take from [[tokenBucket]] when retrying. + * @param successReward + * Number of tokens to add back to [[tokenBucket]] after a successful operation. + */ +case class AdaptiveRetry( + tokenBucket: TokenBucket, + failureCost: Int, + successReward: Int +): + /** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by + * the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen. + * + * @param config + * The retry config - See [[RetryConfig]]. 
+ * @param shouldPayFailureCost + * Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid + * only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`. + * @param errorMode + * The error mode to use, which specifies when a result value is considered success, and when a failure. + * @param operation + * The operation to retry. + * @tparam E + * type of error. + * @tparam T + * type of result of an operation. + * @tparam F + * the context inside which [[E]] or [[T]] are returned. + * @return + * Either: + * - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode. + * - the error `E` in context `F` as returned by the last attempt if the config decides to stop. + * @see + * [[scheduledWithErrorMode]] + */ + def retryWithErrorMode[E, T, F[_]](errorMode: ErrorMode[E, F])( + config: RetryConfig[E, T], + shouldPayFailureCost: Either[E, T] => Boolean = (_: Either[E, T]) => true + )(operation: => F[T]): F[T] = + + val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) => + config.onRetry(attemptNum, attempt) + attempt match + case Left(value) => + // If the error is worth retrying: pay the failure cost from the bucket when it is due (stop if there are not enough tokens), otherwise retry for free + if config.resultPolicy.isWorthRetrying(value) then + if shouldPayFailureCost(Left(value)) then ScheduleStop(!tokenBucket.tryAcquire(failureCost)) + else ScheduleStop.No + else ScheduleStop.Yes + case Right(value) => + // If we are successful, we release tokens to bucket and end schedule + if config.resultPolicy.isSuccess(value) then + tokenBucket.release(successReward) + ScheduleStop.Yes + // If it is not success we check if we need to acquire tokens, then we check bucket, otherwise we continue + else if shouldPayFailureCost(Right(value)) then ScheduleStop(!tokenBucket.tryAcquire(failureCost)) + else ScheduleStop.No + end match + end afterAttempt + + val 
scheduledConfig = ScheduledConfig( + config.schedule, + afterAttempt, + sleepMode = SleepMode.Delay + ) + + scheduledWithErrorMode(errorMode)(scheduledConfig)(operation) + end retryWithErrorMode + + /** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions + * thrown by the operation aren't caught and don't cause a retry to happen. + * + * @param config + * The retry config - see [[RetryConfig]]. + * @param shouldPayFailureCost + * Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid + * only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`. + * @param operation + * The operation to retry. + * @tparam E + * type of error. + * @tparam T + * type of result of an operation. + * @return + * A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last + * attempt. + * @see + * [[scheduledEither]] + */ + def retryEither[E, T](config: RetryConfig[E, T], shouldPayFailureCost: Either[E, T] => Boolean = (_: Either[E, T]) => true)( + operation: => Either[E, T] + ): Either[E, T] = + retryWithErrorMode(EitherMode[E])(config, shouldPayFailureCost)(operation) + + /** Retries an operation returning a direct result until it succeeds or the config decides to stop. + * + * @param config + * The retry config - see [[RetryConfig]]. + * @param shouldPayFailureCost + * Function to decide if returned result Either[E, T] should be considered failure in terms of paying cost for retry. Penalty is paid + * only if it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`. + * @param operation + * The operation to retry. + * @return + * The result of the function if it eventually succeeds. 
+ * @throws anything + * The exception thrown by the last attempt if the config decides to stop. + * @see + * [[scheduled]] + */ + def retry[T]( + config: RetryConfig[Throwable, T], + shouldPayFailureCost: Either[Throwable, T] => Boolean = (_: Either[Throwable, T]) => true + )(operation: => T): T = + retryWithErrorMode(EitherMode[Throwable])(config, shouldPayFailureCost)(Try(operation).toEither).fold(throw _, identity) + +end AdaptiveRetry + +object AdaptiveRetry: + def default: AdaptiveRetry = AdaptiveRetry(TokenBucket(500), 5, 1) diff --git a/core/src/main/scala/ox/resilience/ResultPolicy.scala b/core/src/main/scala/ox/resilience/ResultPolicy.scala index 6e44f11c..d230b735 100644 --- a/core/src/main/scala/ox/resilience/ResultPolicy.scala +++ b/core/src/main/scala/ox/resilience/ResultPolicy.scala @@ -29,7 +29,7 @@ object ResultPolicy: /** A policy that customizes which errors are retried, and considers every non-erroneous result successful * @param isWorthRetrying - * A predicate that indicates whether an erroneous result should be retried.. + * A predicate that indicates whether an erroneous result should be retried. 
*/ def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying) diff --git a/core/src/main/scala/ox/resilience/RetryConfig.scala b/core/src/main/scala/ox/resilience/RetryConfig.scala index e07cd1b7..c0c0af2c 100644 --- a/core/src/main/scala/ox/resilience/RetryConfig.scala +++ b/core/src/main/scala/ox/resilience/RetryConfig.scala @@ -1,6 +1,6 @@ package ox.resilience -import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig} +import ox.scheduling.{Jitter, Schedule, ScheduleStop, ScheduledConfig, SleepMode} import scala.concurrent.duration.* @@ -29,13 +29,15 @@ case class RetryConfig[E, T]( resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T], onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => () ): - def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig( - schedule, - onRetry, - shouldContinueOnError = resultPolicy.isWorthRetrying, - shouldContinueOnResult = t => !resultPolicy.isSuccess(t), - sleepMode = SleepMode.Delay - ) + def toScheduledConfig: ScheduledConfig[E, T] = + val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) => + onRetry(attemptNum, attempt) + attempt match + case Left(value) => ScheduleStop(!resultPolicy.isWorthRetrying(value)) + case Right(value) => ScheduleStop(resultPolicy.isSuccess(value)) + + ScheduledConfig(schedule, afterAttempt, SleepMode.Delay) + end toScheduledConfig end RetryConfig object RetryConfig: diff --git a/core/src/main/scala/ox/resilience/StartTimeRateLimiterAlgorithm.scala b/core/src/main/scala/ox/resilience/StartTimeRateLimiterAlgorithm.scala index c979c740..ed9cec17 100644 --- a/core/src/main/scala/ox/resilience/StartTimeRateLimiterAlgorithm.scala +++ b/core/src/main/scala/ox/resilience/StartTimeRateLimiterAlgorithm.scala @@ -102,13 +102,13 @@ object StartTimeRateLimiterAlgorithm: case class LeakyBucket(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm: private val refillInterval = 
per.toNanos private val lastRefillTime = new AtomicLong(System.nanoTime()) - private val semaphore = new Semaphore(1) + private val bucket = TokenBucket(rate, Some(1)) def acquire(permits: Int): Unit = - semaphore.acquire(permits) + bucket.acquire(permits) def tryAcquire(permits: Int): Boolean = - semaphore.tryAcquire(permits) + bucket.tryAcquire(permits) def getNextUpdate: Long = val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime() @@ -117,7 +117,7 @@ object StartTimeRateLimiterAlgorithm: def update(): Unit = val now = System.nanoTime() lastRefillTime.set(now) - if semaphore.availablePermits() < rate then semaphore.release() + bucket.release(1) def runOperation[T](operation: => T, permits: Int): T = operation diff --git a/core/src/main/scala/ox/resilience/TokenBucket.scala b/core/src/main/scala/ox/resilience/TokenBucket.scala new file mode 100644 index 00000000..f29bc40a --- /dev/null +++ b/core/src/main/scala/ox/resilience/TokenBucket.scala @@ -0,0 +1,20 @@ +package ox.resilience + +import java.util.concurrent.Semaphore + +/** Used by the leaky bucket rate limiter & [[AdaptiveRetry]], to limit the rate of operations. 
*/ +case class TokenBucket(bucketSize: Int, initSize: Option[Int] = None): + private val semaphore = Semaphore(initSize.getOrElse(bucketSize)) + + def tryAcquire(permits: Int): Boolean = + semaphore.tryAcquire(permits) + + def acquire(permits: Int): Unit = + semaphore.acquire(permits) + + def release(permits: Int): Unit = + val availablePermits = semaphore.availablePermits() // the bucket may hold more than bucketSize (initSize > bucketSize, or concurrent releases), so the capped amount is clamped at 0 + val toRelease = if availablePermits + permits >= bucketSize then math.max(0, bucketSize - availablePermits) else permits + semaphore.release(toRelease) + +end TokenBucket diff --git a/core/src/main/scala/ox/resilience/retry.scala b/core/src/main/scala/ox/resilience/retry.scala index 8e35d6f7..3cb582d6 100644 --- a/core/src/main/scala/ox/resilience/retry.scala +++ b/core/src/main/scala/ox/resilience/retry.scala @@ -7,7 +7,7 @@ import scala.util.Try /** Retries an operation returning a direct result until it succeeds or the config decides to stop. * - * [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details. + * [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]]. * * @param config * The retry config - see [[RetryConfig]]. diff --git a/core/src/main/scala/ox/scheduling/RepeatConfig.scala b/core/src/main/scala/ox/scheduling/RepeatConfig.scala index 7959abbf..b15cb458 100644 --- a/core/src/main/scala/ox/scheduling/RepeatConfig.scala +++ b/core/src/main/scala/ox/scheduling/RepeatConfig.scala @@ -9,8 +9,8 @@ import scala.concurrent.duration.DurationInt * the specified duration after the previous operations has finished. If the previous operation takes longer than the interval, the next * operation will start immediately after the previous one has finished. * - * It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Interval]] and - * [[ScheduledConfig.shouldContinueOnError]] always returning `false`. 
+ * It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Interval]] and a + * [[ScheduledConfig.afterAttempt]] callback which checks if the result was successful. * * @param schedule * The repeat schedule which determines the maximum number of invocations and the interval between subsequent invocations. See @@ -28,11 +28,14 @@ case class RepeatConfig[E, T]( schedule: Schedule, shouldContinueOnResult: T => Boolean = (_: T) => true ): - def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig( - schedule, - shouldContinueOnResult = shouldContinueOnResult, - sleepMode = SleepMode.Interval - ) + def toScheduledConfig: ScheduledConfig[E, T] = + val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, attempt) => + attempt match + case Left(_) => ScheduleStop.Yes + case Right(value) => ScheduleStop(!shouldContinueOnResult(value)) + + ScheduledConfig(schedule, afterAttempt, SleepMode.Interval) + end toScheduledConfig end RepeatConfig object RepeatConfig: diff --git a/core/src/main/scala/ox/scheduling/scheduled.scala b/core/src/main/scala/ox/scheduling/scheduled.scala index 37c853ec..8736bb58 100644 --- a/core/src/main/scala/ox/scheduling/scheduled.scala +++ b/core/src/main/scala/ox/scheduling/scheduled.scala @@ -14,25 +14,27 @@ enum SleepMode: */ case Interval - /** Delay (since the end of the last operation), i.e. sleeps the duration provided by the schedule before the next operation starts. - */ + /** Delay (since the end of the last operation), i.e. sleeps the duration provided by the schedule before the next operation starts. */ case Delay end SleepMode +/** Result of an after-attempt callback: whether the schedule should stop. @see [[ScheduledConfig.afterAttempt]] */ +enum ScheduleStop(val stop: Boolean): + case Yes extends ScheduleStop(true) + case No extends ScheduleStop(false) + +object ScheduleStop: + def apply(stop: Boolean): ScheduleStop = if stop then Yes else No + /** A config that defines how to schedule an operation. 
* * @param schedule * The schedule which determines the maximum number of invocations and the duration between subsequent invocations. See [[Schedule]] for * more details. - * @param onOperationResult - * A function that is invoked after each invocation. The callback receives the number of the current invocations number (starting from 1) - * and the result of the operation. The result is either a successful value or an error. - * @param shouldContinueOnError - * A function that determines whether to continue the loop after an error. The function receives the error that was emitted by the last - * invocation. Defaults to [[_ => false]]. - * @param shouldContinueOnResult - * A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last - * invocation. Defaults to [[_ => true]]. + * @param afterAttempt + * A callback invoked after every attempt, with the current invocation number (starting from 1) and the result of the operation. Might + * decide to short-circuit further attempts, and stop the schedule. Schedule configuration (e.g. max number of attempts) takes + * precedence. * @param sleepMode * The mode that specifies how to interpret the duration provided by the schedule. See [[SleepMode]] for more details. 
* @tparam E @@ -43,9 +45,7 @@ end SleepMode */ case class ScheduledConfig[E, T]( schedule: Schedule, - onOperationResult: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => (), - shouldContinueOnError: E => Boolean = (_: E) => false, - shouldContinueOnResult: T => Boolean = (_: T) => true, + afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, _: Either[E, T]) => ScheduleStop.No, sleepMode: SleepMode = SleepMode.Interval ) @@ -107,17 +107,17 @@ def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledCon operation match case v if em.isError(v) => val error = em.getError(v) - config.onOperationResult(invocation, Left(error)) + val shouldStop = config.afterAttempt(invocation, Left(error)) - if config.shouldContinueOnError(error) && remainingInvocations.forall(_ > 0) then + if remainingInvocations.forall(_ > 0) && !shouldStop.stop then val delay = sleepIfNeeded(startTimestamp) loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay)) else v case v => val result = em.getT(v) - config.onOperationResult(invocation, Right(result)) + val shouldStop = config.afterAttempt(invocation, Right(result)) - if config.shouldContinueOnResult(result) && remainingInvocations.forall(_ > 0) then + if remainingInvocations.forall(_ > 0) && !shouldStop.stop then val delay = sleepIfNeeded(startTimestamp) loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay)) else v diff --git a/core/src/test/scala/ox/resilience/BackoffRetryTest.scala b/core/src/test/scala/ox/resilience/BackoffRetryTest.scala index 15ecf345..9c379311 100644 --- a/core/src/test/scala/ox/resilience/BackoffRetryTest.scala +++ b/core/src/test/scala/ox/resilience/BackoffRetryTest.scala @@ -61,7 +61,8 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with if true then throw new RuntimeException("boom") // when - val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f)) + val 
(result, elapsedTime) = + measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f)) // then result should have message "boom" diff --git a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala index 1b41257a..e7681f67 100644 --- a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala +++ b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala @@ -68,4 +68,29 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with elapsedTime.toMillis should be >= maxRetries * sleep.toMillis counter shouldBe 4 } + + behavior of "adaptive retry with delayed config" + + it should "retry a failing function forever or until adaptive retry blocks it" in { + // given + var counter = 0 + val sleep = 2.millis + val retriesUntilSuccess = 1_000 + val successfulResult = 42 + val bucketSize = 500 + val errorMessage = "boom" + + def f = + counter += 1 + if counter <= retriesUntilSuccess then throw RuntimeException(errorMessage) else successfulResult + + // when + val adaptive = AdaptiveRetry(TokenBucket(bucketSize), 1, 1) + val result = the[RuntimeException] thrownBy adaptive.retry(RetryConfig.delayForever(sleep))(f) + + // then + result should have message errorMessage + counter shouldBe bucketSize + 1 + } + end DelayedRetryTest diff --git a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala index 7e775a06..91e9aa5e 100644 --- a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala +++ b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala @@ -161,4 +161,64 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi result.left.value shouldBe errorMessage counter shouldBe 4 } + + behavior of "Adaptive retry with immediate config" + + it should "retry a failing adaptive" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter 
+= 1 + if counter <= 2 then Left(errorMessage) + else Right("Success") + + val adaptive = AdaptiveRetry(TokenBucket(5), 1, 1) + // when + val result = adaptive.retryEither(RetryConfig.immediate(5))(f) + + // then + result.value shouldBe "Success" + counter shouldBe 3 + } + + it should "stop retrying after emptying bucket" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Left(errorMessage) + + val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1) + // when + val result = adaptive.retryEither(RetryConfig.immediate[String, String](5))(f) + + // then + result.left.value shouldBe errorMessage + // One for first try, two for retries with bucket size 2 + counter shouldBe 3 + } + + it should "not pay exceptionCost if result T is going to be retried and shouldPayPenaltyCost returns false" in { + // given + var counter = 0 + val message = "success" + + def f = + counter += 1 + Right(message) + + val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1) + val retryConfig = RetryConfig.immediate(5).copy(resultPolicy = ResultPolicy.successfulWhen[String, String](_ => false)) + // when + val result = adaptive.retryEither(retryConfig, _ => false)(f) + + // then + result.value shouldBe message + counter shouldBe 6 + } + end ImmediateRetryTest diff --git a/doc/utils/repeat.md b/doc/utils/repeat.md index 528e930d..0e6cd4d3 100644 --- a/doc/utils/repeat.md +++ b/doc/utils/repeat.md @@ -27,7 +27,7 @@ Similarly to the `retry` API, the `operation` can be defined: The `repeat` config requires a `Schedule`, which indicates how many times and with what interval should the `operation` be repeated. -In addition, it is possible to define a custom `shouldContinueOnSuccess` strategy for deciding if the operation +In addition, it is possible to define a custom `shouldContinueOnResult` strategy for deciding if the operation should continue to be repeated after a successful result returned by the previous operation (defaults to `_: T => true`). 
If an operation returns an error, the repeat loop will always be stopped. If an error handling within the operation diff --git a/doc/utils/retries.md b/doc/utils/retries.md index 6706f096..622d99b1 100644 --- a/doc/utils/retries.md +++ b/doc/utils/retries.md @@ -121,7 +121,86 @@ retry(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != retryEither(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation) // custom error mode -retryWithErrorMode(UnionMode[String])(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation) +retryWithErrorMode(UnionMode[String])( + RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation) ``` See the tests in `ox.resilience.*` for more. + +## Adaptive retries + +A retry strategy, backed by a token bucket. Every retry costs a certain amount of tokens from the bucket, and every success causes some tokens to be added back to the bucket. If there are not enough tokens, retry is not attempted. + +This way retries don't overload a system that is down due to a systemic failure (such as a bug in the code, excessive load etc.): retries will be attempted only as long as there are enough tokens in the bucket, then the load on the downstream system will be reduced so that it can recover. In contrast, using a "normal" retry strategy, where every operation is retried up to 3 times, a failure causes the load on the system to increase 4 times. + +For transient failures (component failure, infrastructure issues etc.), retries still work "normally", as the bucket has enough tokens to cover the cost of multiple retries. 
+ +### Inspiration + +* [`AdaptiveRetryStrategy`](https://github.com/aws/aws-sdk-java-v2/blob/master/core/retries/src/main/java/software/amazon/awssdk/retries/AdaptiveRetryStrategy.java) from `aws-sdk-java-v2` +* ["Try again: The tools and techniques behind resilient systems" from re:Invent 2024](https://www.youtube.com/watch?v=rvHd4Y76-fs) + +### Configuration + +To use adaptive retries, create an instance of `AdaptiveRetry`. These instances are thread-safe and are designed to be shared. Typically, a single instance should be used to proxy access to a single constrained resource. + +`AdaptiveRetry` is parametrized with: + +* `tokenBucket: TokenBucket`: instances of `TokenBucket` can be shared across multiple instances of `AdaptiveRetry` +* `failureCost: Int`: number of tokens that are needed for retry in case of failure +* `successReward: Int`: number of tokens that are added back to token bucket after success + +`RetryConfig` and `ResultPolicy` are defined the same as with "normal" retry mechanism, all the configuration from above also applies here. + +Instance with default configuration can be obtained with `AdaptiveRetry.default` (bucket size = 500, cost for failure = 5 and reward for success = 1). + +### API + +`AdaptiveRetry` exposes three variants of retrying, which correspond to the three variants discussed above: `retry`, `retryEither` and `retryWithErrorMode`. + +`retry` will attempt to retry an operation if it throws an exception; `retryEither` will retry if the result is a `Left` (exceptions thrown by the operation are not caught). Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types). + +The methods have an additional parameter, `shouldPayFailureCost`, which determines if the result (an `Either[E, T]`) should be considered a failure in terms of paying the cost for a retry. The penalty is paid only if it is decided to retry the operation; it is not paid for a successful operation. 
+ +### Examples + +If you want to use this mechanism you need to run operation through instance of `AdaptiveRetry`: + +```scala mdoc:compile-only +import ox.UnionMode +import ox.resilience.AdaptiveRetry +import ox.resilience.{ResultPolicy, RetryConfig} +import ox.scheduling.{Jitter, Schedule} +import scala.concurrent.duration.* + +def directOperation: Int = ??? +def eitherOperation: Either[String, Int] = ??? +def unionOperation: String | Int = ??? + +val adaptive = AdaptiveRetry.default + +// various configs with custom schedules and default ResultPolicy +adaptive.retry(RetryConfig.immediate(3))(directOperation) +adaptive.retry(RetryConfig.delay(3, 100.millis))(directOperation) +adaptive.retry(RetryConfig.backoff(3, 100.millis))(directOperation) // defaults: maxDelay = 1.minute, jitter = Jitter.None +adaptive.retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(directOperation) + +// result policies +// custom success +adaptive.retry[Int]( + RetryConfig(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation) +// fail fast on certain errors +adaptive.retry( + RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))(directOperation) +adaptive.retryEither( + RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation) + +// custom error mode +adaptive.retryWithErrorMode(UnionMode[String])( + RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation) + +// consider "throttling error" not as a failure that should incur the retry penalty +adaptive.retryWithErrorMode(UnionMode[String])( + RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), + shouldPayFailureCost = _.fold(_ != "throttling error", _ => true))(unionOperation) +``` diff --git a/doc/utils/scheduled.md b/doc/utils/scheduled.md index c7f26be6..7086fa6e 100644 --- a/doc/utils/scheduled.md +++ b/doc/utils/scheduled.md @@ -20,13 +20,7 @@ The 
`scheduled` config consists of: - `Interval` - default for `repeat` operations, where the sleep is calculated as the duration provided by schedule minus the duration of the last operation (can be negative, in which case the next operation occurs immediately). - `Delay` - default for `retry` operations, where the sleep is just the duration provided by schedule. -- `onOperationResult` - a callback function that is invoked after each operation. Used primarily for `onRetry` in `retry` API. - -In addition, it is possible to define strategies for handling the results and errors returned by the `operation`: -- `shouldContinueOnError` - defaults to `_: E => false`, which allows to decide if the scheduler loop should continue - after an error returned by the previous operation. -- `shouldContinueOnSuccess` - defaults to `_: T => true`, which allows to decide if the scheduler loop should continue - after a successful result returned by the previous operation. +- `afterAttempt` - a callback function that is invoked after each operation and determines if the scheduler loop should continue. Used for `onRetry`, `shouldContinueOnError`, `shouldContinueOnResult` and adaptive retries in `retry` API. Defaults to always continuing. ## Schedule