From 972900eea96d0377acd4154e730bacaab28aa243 Mon Sep 17 00:00:00 2001 From: yoyama Date: Wed, 30 Nov 2022 10:17:11 +0900 Subject: [PATCH 1/3] WM-1654 enhance require> to avoid unexpected failure with max running attempt (#51) * WM-1654 Fix require> with max running attempts * Add test for require> with max attempt --- .../core/workflow/WorkflowExecutor.java | 9 ++- .../src/test/java/acceptance/RequireIT.java | 71 +++++++++++++++++++ .../acceptance/require/child_wait_long.dig | 3 + .../acceptance/require/parent_wait_long.dig | 3 + 4 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig create mode 100644 digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 4bc1387fc6..931a3de51d 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -297,9 +297,6 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar SessionStore ss = sm.getSessionStore(siteId); long activeAttempts = ss.getActiveAttemptCount(); - if (activeAttempts + 1 > limits.maxAttempts()) { - throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts); - } stored = ss // putAndLockSession + insertAttempt might be able to be faster by combining them into one method and optimize using a single SQL with CTE @@ -327,6 +324,12 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar return storedAttemptWithSession; }); + + if (activeAttempts + 1 > limits.maxAttempts()) { + tm.reset(); + throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts); + } + } catch (WorkflowTaskLimitExceededException ex) { throw ex.getCause(); diff --git a/digdag-tests/src/test/java/acceptance/RequireIT.java b/digdag-tests/src/test/java/acceptance/RequireIT.java index 638fb1bda4..49df8b0913 100644 --- a/digdag-tests/src/test/java/acceptance/RequireIT.java +++ b/digdag-tests/src/test/java/acceptance/RequireIT.java @@ -12,6 +12,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import utils.CommandStatus; import utils.TemporaryDigdagServer; @@ -43,6 +45,8 @@ public class RequireIT { + private static Logger logger = LoggerFactory.getLogger(RequireIT.class); + @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -560,4 +564,71 @@ public void testTargetSessionIDWhenNoKick(boolean waitChildFinish) assertThat(requireOperatorStateParams.get("target_attempt_id", Id.class), is(targetAttemptId)); assertThat(requireOperatorStateParams.get("target_session_id", Id.class), is(targetSessionId)); } + + @Test + public void testRunningHitMaxAttempt() + throws Exception { + // Only two attempts are able to run. + // In this situation, both parent_wait_long and child_wait_long must run successfully + try { + server.close(); + server = TemporaryDigdagServer.builder() + .configuration("executor.attempt_max_run = 2") + .build(); + server.start(); + + // Create a new project + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig")); + copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_wait_long", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + // Wait for the attempt to complete + boolean success = false; + for (int i = 0; i < 120; i++) { + CommandStatus attemptsStatus = main("attempts", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + String statusStr = attemptsStatus.outUtf8(); + if (statusStr.contains("status: success")) { + success = true; + break; + } else if (statusStr.contains("status: error")) { + success = false; + logger.error("attempt failed: {}", statusStr); + break; + } + Thread.sleep(1000); + } + assertThat(success, is(true)); + } + finally { + if (server != null) { + server.close(); + } + } + } } diff --git a/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig b/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig new file mode 100644 index 0000000000..8dac70a465 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig @@ -0,0 +1,3 @@ ++wait: + wait>: 30s + diff --git a/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig b/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig new file mode 100644 index 0000000000..d23c31bd58 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig @@ -0,0 +1,3 @@ ++require: + require>: child_wait_long + From 5c4496e0b54a7a851c95123dcb89583ce3767d2d Mon Sep 17 00:00:00 2001 From: yoyama Date: Fri, 6 Jan 2023 16:52:29 +0900 Subject: [PATCH 2/3] WM-1655 support delay kick of `require>` with resource limit. (#57) WM-1655 support delay kick of `require>` with resource limit. --- .../core/agent/RequireOperatorFactory.java | 6 +- .../src/test/java/acceptance/RequireIT.java | 84 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java index bff1809f92..86308d149d 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java @@ -23,12 +23,15 @@ import java.time.Instant; import java.util.UUID; +import static java.lang.Math.abs; import static java.util.Locale.ENGLISH; public class RequireOperatorFactory implements OperatorFactory { private static final int MAX_TASK_RETRY_INTERVAL = 10; + // ToDo configurable in server config to run test solidly + private static final int DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT = 60 * 10; // 10 min. private static Logger logger = LoggerFactory.getLogger(RequireOperatorFactory.class); @@ -142,7 +145,8 @@ public TaskResult runTask() projectIdentifier.transform(ProjectIdentifier::toString).or(""), workflowName)); } catch (ResourceLimitExceededException ex) { - throw new TaskExecutionException(ex); + logger.warn("Number of attempts or tasks exceed limit. Retry {} seconds later", DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT); + throw TaskExecutionException.ofNextPolling(DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT, ConfigElement.copyOf(lastStateParams)); } } diff --git a/digdag-tests/src/test/java/acceptance/RequireIT.java b/digdag-tests/src/test/java/acceptance/RequireIT.java index 49df8b0913..c00256213f 100644 --- a/digdag-tests/src/test/java/acceptance/RequireIT.java +++ b/digdag-tests/src/test/java/acceptance/RequireIT.java @@ -631,4 +631,88 @@ public void testRunningHitMaxAttempt() } } } + + @Test + public void testDelayWithMaxAttempt() + throws Exception { + // Scenario: + // Only two attempts are able to run. + // One attempt running firstly and run for some duration (reuse 'child_wait_long') + // Then create new attempt which has `require>` (kick 'parent_wait_long') + // Expected: + // The new attempt keep running. The task of `require>` will be retried after 10 min + // The way to confirm: + // To reduce the time of test, only check attempt log shows "Number of attempts or tasks exceed limit" + // + try { + server.close(); + server = TemporaryDigdagServer.builder() + .configuration("executor.attempt_max_run = 2") + .build(); + server.start(); + + // Create a new project + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig")); + copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + { // create an attempt + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "child_wait_long", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + } + + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_wait_long", + "--session", "2022-12-01 12:34:56"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + { + CommandStatus status = main("attempts", + "-c", config.toString(), + "-e", server.endpoint()); + logger.info("{}", status.outUtf8()); + } + + String logStr = ""; + for (int i = 0; i < 60; i++) { + CommandStatus attemptLog = main("log", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + logStr = attemptLog.outUtf8(); + if (logStr.contains("Number of attempts or tasks exceed limit")) { + return; // OK + } + Thread.sleep(1000); + } + fail("Cannot confirm retry happen. log:\n" + logStr); + } + finally { + if (server != null) { + server.close(); + } + } + } } From 2ab7b7b1ea330958580da47b6edec3d2d536f0c1 Mon Sep 17 00:00:00 2001 From: yoyama Date: Wed, 18 Jan 2023 12:15:32 +0900 Subject: [PATCH 3/3] WM-1260 Fix require> with multiple attempts (#65) Fix WM-1260 require> issue with multiple attempts --- .../database/DatabaseSessionStoreManager.java | 34 +++++--- .../src/test/java/acceptance/RequireIT.java | 80 +++++++++++++++++-- .../acceptance/require/parent_multiple.dig | 15 ++++ 3 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig diff --git a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java index 35c8f8309d..08639f8b68 100644 --- a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java +++ b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java @@ -1562,12 +1562,23 @@ public StoredSessionAttempt insertAttempt(long sessionId, int projId, SessionAtt throws ResourceConflictException, ResourceNotFoundException { long attemptId = catchForeignKeyNotFound(() -> - catchConflict(() -> - dao.insertAttempt(siteId, projId, sessionId, - attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(), - AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams()), - "session attempt name=%s in session id=%d", attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId), - "workflow definition id=%d", attempt.getWorkflowDefinitionId().orNull()); + catchConflict( + () -> { + // select id from sessions where id = for update + return dao.insertAttempt( + siteId, + projId, + sessionId, + attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(), + AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams() + ); + }, + "session attempt name=%s in session id=%d", + attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId + ), + "workflow definition id=%d", + attempt.getWorkflowDefinitionId().orNull() + ); dao.updateLastAttemptId(sessionId, attemptId); try { return requiredResource( @@ -1660,7 +1671,8 @@ public T putAndLockSession(Session session, SessionLockAction func) // select first so that conflicting insert (postgresql) or foreign key constraint violation (h2) // doesn't increment sequence of primary key unnecessarily - storedSession = dao.getSessionByConflictedNamesInternal( + // the session must be locked same as `dao.upsertAndLockSession()` + storedSession = dao.getAndLockSessionByConflictedNamesInternal( session.getProjectId(), session.getWorkflowName(), session.getSessionTime().getEpochSecond()); @@ -1676,7 +1688,7 @@ public T putAndLockSession(Session session, SessionLockAction func) return 0; }, "project id=%d", session.getProjectId()); - storedSession = dao.getSessionByConflictedNamesInternal( + storedSession = dao.getAndLockSessionByConflictedNamesInternal( session.getProjectId(), session.getWorkflowName(), session.getSessionTime().getEpochSecond()); @@ -2082,9 +2094,9 @@ List getAttemptsOfWorkflowWithRetries( " where project_id = :projectId" + " and workflow_name = :workflowName" + " and session_time = :sessionTime" + - " limit 1") // here allows last_attempt_id == NULL - StoredSession getSessionByConflictedNamesInternal(@Bind("projectId") int projectId, - @Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime); + " limit 1 for update") // here allows last_attempt_id == NULL + StoredSession getAndLockSessionByConflictedNamesInternal(@Bind("projectId") int projectId, + @Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime); @SqlQuery("select session_time from sessions" + " where project_id = :projectId" + diff --git a/digdag-tests/src/test/java/acceptance/RequireIT.java b/digdag-tests/src/test/java/acceptance/RequireIT.java index c00256213f..be54e28b7a 100644 --- a/digdag-tests/src/test/java/acceptance/RequireIT.java +++ b/digdag-tests/src/test/java/acceptance/RequireIT.java @@ -235,13 +235,15 @@ public void testIgnoreProjectIdParam() @Test public void testRequireToAnotherProjectById() - throws Exception { + throws Exception + { testRequireToAnotherProject(true, "parent_by_id", "2020-06-05 00:00:01"); } @Test public void testRequireToAnotherProjectByName() - throws Exception { + throws Exception + { testRequireToAnotherProject(false, "parent_by_name", "2020-06-05 00:00:02"); } @@ -254,7 +256,8 @@ public void testRequireToAnotherProjectByName() * @throws Exception */ private void testRequireToAnotherProject(boolean useProjectId, String parentProjectName, String sessionTime) - throws Exception { + throws Exception + { final String childProjectName = "child_another"; // Push child project @@ -412,7 +415,10 @@ private RestSessionAttemptCollection testRerunOnParam(String sessionTime, String return attempts; } - private static boolean isAttemptSuccess(CommandStatus status) { return status.outUtf8().contains("status: success"); } + private static boolean isAttemptSuccess(CommandStatus status) + { + return status.outUtf8().contains("status: success"); + } private CommandStatus startAndWait(String... args) throws InterruptedException { @@ -567,7 +573,8 @@ public void testTargetSessionIDWhenNoKick(boolean waitChildFinish) @Test public void testRunningHitMaxAttempt() - throws Exception { + throws Exception + { // Only two attempts are able to run. // In this situation, both parent_wait_long and child_wait_long must run successfully try { @@ -634,7 +641,8 @@ public void testRunningHitMaxAttempt() @Test public void testDelayWithMaxAttempt() - throws Exception { + throws Exception + { // Scenario: // Only two attempts are able to run. // One attempt running firstly and run for some duration (reuse 'child_wait_long') @@ -715,4 +723,64 @@ public void testDelayWithMaxAttempt() } } } + + @Test + public void testMultipleAttemptsInSession() + throws Exception + { + // Check the issue that 'require>' will fail when run multiple attempts in a session in very short term + // Scenario: + // run "parent_multiple" for a specific session + // the workflow run tasks as follows: + // - firstly run `child_wait` for a session + // - run multiple attempts of `child_wait` for the session in parallel. + // Way to check: + // all attempts finish successfully + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_multiple.dig", projectDir.resolve("parent_multiple.dig")); + copyResource("acceptance/require/child_wait.dig", projectDir.resolve("child_wait.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_multiple", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + // Wait for the attempt to complete + boolean success = false; + for (int i = 0; i < 180; i++) { + CommandStatus attemptsStatus = main("attempts", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + String statusStr = attemptsStatus.outUtf8(); + if (statusStr.contains("status: success")) { + success = true; + break; + } else if (statusStr.contains("status: error")) { + success = false; + logger.error("attempt failed: {}", statusStr); + break; + } + Thread.sleep(1000); + } + assertThat(success, is(true)); + } } diff --git a/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig b/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig new file mode 100644 index 0000000000..c96793d2c2 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig @@ -0,0 +1,15 @@ ++t1: + require>: child_wait + session_time: ${session_time} + ++t2: + loop>: 20 + _parallel: true + _do: + +t2_1: + require>: child_wait + rerun_on: all + session_time: ${session_time} + ++t3: + echo>: "Finished successfully: ${session_time}"