Skip to content

Commit

Permalink
support retries on aws clients
Browse files Browse the repository at this point in the history
exporting cloud watch logs has a quota, 1 task per account. requiring a retry if the limit is exceeded.
  • Loading branch information
cwensel committed Dec 6, 2023
1 parent 5e2d96c commit 4240b51
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ dependencies {

implementationAndTestFixture("javax.annotation:javax.annotation-api:1.3.2")

implementationAndTestFixture("io.github.resilience4j:resilience4j-retry:2.0.2")
implementationAndTestFixture("io.github.resilience4j:resilience4j-retry:2.1.0")

testImplementationAndTestFixture("org.mockito:mockito-core:5.7.0")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ protected int executeProcess(Map<String, String> environment, List<String> args)
return process(environment, args);
}

if (retry()) {
LOG.info("enabled retrying command: {} {} times", args, retries);
}
LOG.info("enabled retrying command: {} {} times", args, retries);

RetryConfig config = RetryConfig.<Integer>custom()
.maxAttempts(retries)
Expand Down
2 changes: 2 additions & 0 deletions clusterless-substrate-aws-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-joda")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")

implementation("io.github.resilience4j:resilience4j-retry")

api("com.jayway.jsonpath:json-path")

implementation("software.amazon.awssdk:s3")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package clusterless.cls.substrate.aws.sdk;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsClient;

import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ClientRetry<C extends AwsClient> {
private static final Logger LOG = LoggerFactory.getLogger(ClientRetry.class);
private final RetryConfig config;
private final String client;

public ClientRetry(String client, int maxAttempts, Predicate<ClientBase<C>.Response> predicate) {
this.client = client;
this.config = RetryConfig.<ClientBase<C>.Response>custom()
.maxAttempts(maxAttempts)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(5)))
.consumeResultBeforeRetryAttempt((attempt, response) -> LOG.warn("got: {}, for retry attempt: {} of {}", response.errorMessage(), attempt, maxAttempts))
.retryOnResult(predicate)
.failAfterMaxAttempts(true)
.build();
}

public ClientBase<C>.Response invoke(Supplier<ClientBase<C>.Response> checkedSupplier) {
return Retry.of(client, config)
.executeSupplier(checkedSupplier);
}
}
2 changes: 0 additions & 2 deletions clusterless-substrate-aws-lambda-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,4 @@ dependencies {
testFixturesImplementation("uk.org.webcompere:system-stubs-core")
testFixturesImplementation("uk.org.webcompere:system-stubs-jupiter")
testFixturesImplementation("org.mockito:mockito-inline")

// implementation("io.github.resilience4j:resilience4j-retry")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

import clusterless.aws.lambda.EventHandler;
import clusterless.aws.lambda.transform.json.event.AWSEvent;
import clusterless.cls.substrate.aws.sdk.ClientRetry;
import clusterless.cls.substrate.aws.sdk.CloudWatchLogs;
import clusterless.cls.util.Env;
import clusterless.commons.temporal.IntervalBuilder;
import com.amazonaws.services.lambda.runtime.Context;
import com.google.common.base.Stopwatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.LimitExceededException;

import java.net.URI;
import java.time.Duration;
Expand All @@ -31,7 +34,9 @@ public class CloudWatchExportActivityHandler extends EventHandler<AWSEvent, Clou
.build()
);

CloudWatchLogs cloudWatchLogs = new CloudWatchLogs();
protected final CloudWatchLogs cloudWatchLogs = new CloudWatchLogs();

protected final ClientRetry<CloudWatchLogsClient> retryClient = new ClientRetry<>("cloudwatchlogs", 5, r -> r.exception() instanceof LimitExceededException);

protected final IntervalBuilder intervalBuilder = new IntervalBuilder(activityProps.interval);

Expand Down Expand Up @@ -90,8 +95,11 @@ public void handleEvent(AWSEvent event, Context context, CloudWatchExportActivit
return;
}

// there is a service quota here: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
// Export task: One active (running or pending) export task at a time, per account. This quota can't be changed.
// Caused by: software.amazon.awssdk.services.cloudwatchlogs.model.LimitExceededException: Resource limit exceeded. (Service: CloudWatchLogs, Status Code: 400, Request ID: 79069256-8403-4435-a85c-17013c7d7c4c)
getStopwatch.start();
CloudWatchLogs.Response response = cloudWatchLogs.createExportLogGroupTask(taskName, logGroupName, logStreamPrefix, destination, startTimeInclusive, endTimeInclusive);
CloudWatchLogs.Response response = retryClient.invoke(() -> cloudWatchLogs.createExportLogGroupTask(taskName, logGroupName, logStreamPrefix, destination, startTimeInclusive, endTimeInclusive));
getStopwatch.stop();

response.isSuccessOrThrowRuntime(
Expand Down

0 comments on commit 4240b51

Please sign in to comment.