failsafe) {
+ this.initialCall = call;
+ this.failsafe = failsafe;
+ }
+
+ /**
+ * Returns a FailsafeCall for the {@code call}, {@code outerPolicy} and {@code policies}. See {@link
+ * Failsafe#with(Policy, Policy[])} for docs on how policy composition works.
+ *
+ * @param policy type
+ * @throws NullPointerException if {@code call} or {@code outerPolicy} are null
+ */
+ @SafeVarargs
+ public static
> FailsafeCall of(okhttp3.Call call, P outerPolicy, P... policies) {
+ return of(call, Failsafe.with(outerPolicy, policies));
+ }
+
+ /**
+ * Returns a FailsafeCall for the {@code call} and {@code failsafeExecutor}.
+ *
+ * @throws NullPointerException if {@code call} or {@code failsafeExecutor} are null
+ */
+ public static FailsafeCall of(okhttp3.Call call, FailsafeExecutor failsafeExecutor) {
+ return new FailsafeCall(Assert.notNull(call, "call"), Assert.notNull(failsafeExecutor, "failsafeExecutor"));
+ }
+
+ /**
+ * Cancels the call.
+ */
+ public void cancel() {
+ if (!cancelled.compareAndSet(false, true))
+ return;
+ if (failsafeCall != null)
+ failsafeCall.cancel();
+ if (failsafeFuture != null)
+ failsafeFuture.cancel(false);
+ }
+
+ /**
+ * Returns a clone of the FailsafeCall.
+ */
+ public FailsafeCall clone() {
+ return FailsafeCall.of(initialCall.clone(), failsafe);
+ }
+
+ /**
+ * Executes the call until a successful response is returned or the configured policies are exceeded. To avoid leaking
+ * resources callers should {@link Response#close() close} the Response which in turn will close the underlying
+ * ResponseBody.
+ *
+ * @throws IllegalStateException if the call has already been executed
+ * @throws IOException if the request could not be executed due to cancellation, a connectivity problem, or timeout
+ * @throws FailsafeException if the execution fails with a checked Exception. {@link FailsafeException#getCause()} can
+ * be used to learn the underlying checked exception.
+ */
+ public Response execute() throws IOException {
+ Assert.isTrue(executed.compareAndSet(false, true), "already executed");
+
+ failsafeCall = failsafe.getCall(ctx -> {
+ return prepareCall(ctx).execute();
+ });
+
+ try {
+ return failsafeCall.execute();
+ } catch (FailsafeException e) {
+ if (e.getCause() instanceof IOException)
+ throw (IOException) e.getCause();
+ throw e;
+ }
+ }
+
+ /**
+ * Executes the call asynchronously until a successful result is returned or the configured policies are exceeded. To
+ * avoid leaking resources callers should {@link Response#close() close} the Response which in turn will close the
+ * underlying ResponseBody.
+ */
+ public CompletableFuture executeAsync() {
+ if (!executed.compareAndSet(false, true)) {
+ CompletableFuture result = new CompletableFuture<>();
+ result.completeExceptionally(new IllegalStateException("already executed"));
+ return result;
+ }
+
+ failsafeFuture = failsafe.getAsyncExecution(exec -> {
+ prepareCall(exec).enqueue(new Callback() {
+ @Override
+ public void onResponse(okhttp3.Call call, Response response) {
+ exec.recordResult(response);
+ }
+
+ @Override
+ public void onFailure(okhttp3.Call call, IOException e) {
+ exec.recordException(e);
+ }
+ });
+ });
+
+ return failsafeFuture;
+ }
+
+ /**
+ * Returns whether the call has been cancelled.
+ */
+ public boolean isCanceled() {
+ return cancelled.get();
+ }
+
+ /**
+ * Returns whether the call has been executed.
+ */
+ public boolean isExecuted() {
+ return executed.get();
+ }
+
+ private okhttp3.Call prepareCall(ExecutionContext ctx) {
+ okhttp3.Call call;
+ if (ctx.isFirstAttempt()) {
+ call = initialCall;
+ } else {
+ Response response = ctx.getLastResult();
+ if (response != null)
+ response.close();
+ call = initialCall.clone();
+ }
+
+ // Propagate cancellation to the call
+ ctx.onCancel(call::cancel);
+ return call;
+ }
+}
diff --git a/modules/okhttp/src/test/java/dev/failsafe/okhttp/FailsafeCallTest.java b/modules/okhttp/src/test/java/dev/failsafe/okhttp/FailsafeCallTest.java
new file mode 100644
index 00000000..c9c15c37
--- /dev/null
+++ b/modules/okhttp/src/test/java/dev/failsafe/okhttp/FailsafeCallTest.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package dev.failsafe.okhttp;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import dev.failsafe.*;
+import dev.failsafe.okhttp.testing.OkHttpTesting;
+import okhttp3.Call;
+import okhttp3.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Test
+public class FailsafeCallTest extends OkHttpTesting {
+ public static final String URL = "http://localhost:8080";
+
+ WireMockServer server;
+ OkHttpClient client = new OkHttpClient.Builder().build();
+
+ @BeforeMethod
+ protected void beforeMethod() {
+ server = new WireMockServer();
+ server.start();
+ }
+
+ @AfterMethod
+ protected void afterMethod() {
+ server.stop();
+ }
+
+ public void testSuccess() {
+ // Given
+ mockResponse(200, "foo");
+ FailsafeExecutor failsafe = Failsafe.with(RetryPolicy.ofDefaults());
+ Call call = callFor("/test");
+
+ // When / Then
+ testRequest(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 1);
+ assertEquals(e.getExecutionCount(), 1);
+ }, 200, "foo");
+ assertCalled("/test", 2);
+ }
+
+ public void testRetryPolicyOn400() {
+ // Given
+ mockResponse(400, "foo");
+ RetryPolicy retryPolicy = RetryPolicy.builder().handleResultIf(r -> r.code() == 400).build();
+ FailsafeExecutor failsafe = Failsafe.with(retryPolicy);
+ Call call = callFor("/test");
+
+ // When / Then
+ testRequest(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 3);
+ assertEquals(e.getExecutionCount(), 3);
+ }, 400, "foo");
+ assertCalled("/test", 6);
+ }
+
+ public void testRetryPolicyOnResult() {
+ // Given
+ mockResponse(200, "bad");
+ RetryPolicy retryPolicy = RetryPolicy.builder()
+ .handleResultIf(r -> "bad".equals(r.peekBody(Long.MAX_VALUE).string()))
+ .build();
+ FailsafeExecutor failsafe = Failsafe.with(retryPolicy);
+ Call call = callFor("/test");
+
+ // When / Then
+ testRequest(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 3);
+ assertEquals(e.getExecutionCount(), 3);
+ }, 200, "bad");
+ assertCalled("/test", 6);
+ }
+
+ public void testRetryPolicyFallback() {
+ // Given
+ mockResponse(400, "foo");
+ Fallback fallback = Fallback.builder(r -> {
+ Response response = r.getLastResult();
+ ResponseBody body = ResponseBody.create("fallback", response.body().contentType());
+ return response.newBuilder().code(200).body(body).build();
+ }).handleResultIf(r -> r.code() == 400).build();
+ RetryPolicy retryPolicy = RetryPolicy.builder().handleResultIf(r -> r.code() == 400).build();
+ FailsafeExecutor failsafe = Failsafe.with(fallback, retryPolicy);
+ Call call = callFor("/test");
+
+ // When / Then
+ testRequest(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 3);
+ assertEquals(e.getExecutionCount(), 3);
+ }, 200, "fallback");
+ assertCalled("/test", 6);
+ }
+
+ /**
+ * Asserts that an open circuit breaker prevents executions from occurring, even with outer retries.
+ */
+ public void testCircuitBreaker() {
+ // Given
+ mockResponse(200, "foo");
+ CircuitBreaker breaker = CircuitBreaker.ofDefaults();
+ FailsafeExecutor failsafe = Failsafe.with(RetryPolicy.ofDefaults(), breaker);
+ Call call = callFor("/test");
+ breaker.open();
+
+ // When / Then
+ testFailure(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 3);
+ assertEquals(e.getExecutionCount(), 0);
+ }, CircuitBreakerOpenException.class);
+ assertCalled("/test", 0);
+ }
+
+ public void testTimeout() {
+ // Given
+ mockDelayedResponse(200, "foo", 1000);
+ FailsafeExecutor failsafe = Failsafe.with(Timeout.of(Duration.ofMillis(100)));
+ Call call = callFor("/test");
+
+ // When / Then
+ testFailure(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 1);
+ assertEquals(e.getExecutionCount(), 1);
+ }, TimeoutExceededException.class);
+ assertCalled("/test", 2);
+ }
+
+ public void testIOException() {
+ server.stop();
+ FailsafeExecutor failsafe = Failsafe.none();
+ Call call = callFor("/test");
+
+ testFailure(failsafe, call, (f, e) -> {
+ assertEquals(e.getAttemptCount(), 1);
+ assertEquals(e.getExecutionCount(), 1);
+ }, java.net.ConnectException.class);
+ }
+
+ public void testCancel() {
+ // Given
+ mockDelayedResponse(200, "foo", 1000);
+ FailsafeExecutor failsafe = Failsafe.none();
+ Call call = callFor("/test");
+
+ // When / Then Sync
+ FailsafeCall failsafeCall = FailsafeCall.of(call, failsafe);
+ runInThread(() -> {
+ sleep(150);
+ failsafeCall.cancel();
+ });
+ assertThrows(failsafeCall::execute, IOException.class);
+ assertTrue(call.isCanceled());
+ assertTrue(failsafeCall.isCanceled());
+
+ // When / Then Async
+ Call call2 = call.clone();
+ FailsafeCall failsafeCall2 = FailsafeCall.of(call2, failsafe);
+ runInThread(() -> {
+ sleep(150);
+ failsafeCall2.cancel();
+ });
+ assertThrows(() -> failsafeCall2.executeAsync().get(), CancellationException.class);
+ assertTrue(call2.isCanceled());
+ assertTrue(failsafeCall2.isCanceled());
+ assertCalled("/test", 2);
+ }
+
+ private Call callFor(String path) {
+ return client.newCall(new Request.Builder().url(URL + path).build());
+ }
+
+ private void mockResponse(int responseCode, String body) {
+ stubFor(get(urlPathEqualTo("/test")).willReturn(
+ aResponse().withStatus(responseCode).withHeader("Content-Type", "text/plain").withBody(body)));
+ }
+
+ private void mockDelayedResponse(int responseCode, String body, int delayMillis) {
+ stubFor(get(urlEqualTo("/test")).willReturn(
+ aResponse().withStatus(responseCode).withFixedDelay(delayMillis).withBody(body)));
+ }
+
+ private void assertCalled(String url, int times) {
+ verify(times, getRequestedFor(urlPathEqualTo(url)));
+ }
+}
diff --git a/modules/okhttp/src/test/java/dev/failsafe/okhttp/testing/OkHttpTesting.java b/modules/okhttp/src/test/java/dev/failsafe/okhttp/testing/OkHttpTesting.java
new file mode 100644
index 00000000..c1975759
--- /dev/null
+++ b/modules/okhttp/src/test/java/dev/failsafe/okhttp/testing/OkHttpTesting.java
@@ -0,0 +1,98 @@
+package dev.failsafe.okhttp.testing;
+
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.event.EventListener;
+import dev.failsafe.event.ExecutionCompletedEvent;
+
+import dev.failsafe.okhttp.FailsafeCall;
+import dev.failsafe.testing.Testing;
+import net.jodah.concurrentunit.Waiter;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+public class OkHttpTesting extends Testing {
+ public void testRequest(FailsafeExecutor failsafe, Call when, Then then, int expectedStatus,
+ T expectedResult) {
+ test(failsafe, when, then, expectedStatus, expectedResult, null);
+ }
+
+ @SafeVarargs
+ public final void testFailure(FailsafeExecutor failsafe, Call when, Then then,
+ Class extends Throwable>... expectedExceptions) {
+ test(failsafe, when, then, 0, null, expectedExceptions);
+ }
+
+ private void test(FailsafeExecutor failsafe, Call when, Then then, int expectedStatus,
+ T expectedResult, Class extends Throwable>[] expectedExceptions) {
+ AtomicReference> futureRef = new AtomicReference<>();
+ AtomicReference> completedEventRef = new AtomicReference<>();
+ Waiter completionListenerWaiter = new Waiter();
+ EventListener> setCompletedEventFn = e -> {
+ completedEventRef.set(e);
+ completionListenerWaiter.resume();
+ };
+ List> expected = new LinkedList<>();
+ Class extends Throwable>[] expectedExInner = expectedExceptions == null ? new Class[] {} : expectedExceptions;
+ Collections.addAll(expected, expectedExInner);
+ failsafe.onComplete(setCompletedEventFn);
+
+ Runnable postTestFn = () -> {
+ ignoreExceptions(() -> completionListenerWaiter.await(5000));
+ ExecutionCompletedEvent completedEvent = completedEventRef.get();
+ if (expectedExceptions == null) {
+ assertEquals(completedEvent.getResult().code(), expectedStatus);
+ assertNull(completedEvent.getException());
+ } else {
+ assertNull(completedEvent.getResult());
+ assertMatches(completedEvent.getException(), expectedExceptions);
+ }
+ if (then != null)
+ then.accept(futureRef.get(), completedEvent);
+ };
+
+ Consumer assertResult = response -> {
+ String result = unwrapExceptions(() -> response.body().string());
+ assertEquals(result, expectedResult);
+ assertEquals(response.code(), expectedStatus);
+ };
+
+ // Run sync test and assert result
+ System.out.println("\nRunning sync test");
+ FailsafeCall failsafeCall = FailsafeCall.of(when, failsafe);
+ if (expectedExceptions == null) {
+ assertResult.accept(unwrapExceptions(failsafeCall::execute));
+ } else {
+ assertThrows(failsafeCall::execute, expectedExceptions);
+ }
+ postTestFn.run();
+
+ if (expectedExInner.length > 0)
+ expected.add(0, ExecutionException.class);
+
+ // Run async test and assert result
+ System.out.println("\nRunning async test");
+ failsafeCall = failsafeCall.clone();
+ CompletableFuture future = failsafeCall.executeAsync();
+ futureRef.set(future);
+ if (expectedExInner.length == 0) {
+ assertResult.accept(unwrapExceptions(future::get));
+ } else {
+ assertThrowsSup(future::get, expected);
+ }
+ postTestFn.run();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 46d38a43..6e9c7bf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,16 +2,11 @@
4.0.0
-
- org.sonatype.oss
- oss-parent
- 7
-
-
+ pom
dev.failsafe
- failsafe
+ failsafe-parent
3.2.2-SNAPSHOT
- Failsafe
+ Failsafe Parent
https://failsafe.dev
@@ -35,6 +30,12 @@
https://github.com/failsafe-lib/failsafe
+
+ core
+ examples
+ modules/okhttp
+
+
@@ -55,26 +56,6 @@
0.4.4
test
-
-
-
- io.reactivex
- rxjava
- 1.0.12
- test
-
-
- io.netty
- netty-all
- 4.1.51.Final
- test
-
-
- io.vertx
- vertx-core
- 3.9.8
- test
-