Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements of require> #1815

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.time.Instant;
import java.util.UUID;

import static java.lang.Math.abs;
import static java.util.Locale.ENGLISH;

public class RequireOperatorFactory
implements OperatorFactory
{
private static final int MAX_TASK_RETRY_INTERVAL = 10;
// ToDo configurable in server config to run test solidly
private static final int DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT = 60 * 10; // 10 min.

private static Logger logger = LoggerFactory.getLogger(RequireOperatorFactory.class);

Expand Down Expand Up @@ -142,7 +145,8 @@ public TaskResult runTask()
projectIdentifier.transform(ProjectIdentifier::toString).or(""), workflowName));
}
catch (ResourceLimitExceededException ex) {
throw new TaskExecutionException(ex);
logger.warn("Number of attempts or tasks exceed limit. Retry {} seconds later", DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT);
throw TaskExecutionException.ofNextPolling(DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT, ConfigElement.copyOf(lastStateParams));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1562,12 +1562,23 @@ public StoredSessionAttempt insertAttempt(long sessionId, int projId, SessionAtt
throws ResourceConflictException, ResourceNotFoundException
{
long attemptId = catchForeignKeyNotFound(() ->
catchConflict(() ->
dao.insertAttempt(siteId, projId, sessionId,
attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(),
AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams()),
"session attempt name=%s in session id=%d", attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId),
"workflow definition id=%d", attempt.getWorkflowDefinitionId().orNull());
catchConflict(
() -> {
// select id from sessions where id = <sessionId> for update
return dao.insertAttempt(
siteId,
projId,
sessionId,
attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(),
AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams()
);
},
"session attempt name=%s in session id=%d",
attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId
),
"workflow definition id=%d",
attempt.getWorkflowDefinitionId().orNull()
);
dao.updateLastAttemptId(sessionId, attemptId);
try {
return requiredResource(
Expand Down Expand Up @@ -1660,7 +1671,8 @@ public <T> T putAndLockSession(Session session, SessionLockAction<T> func)

// select first so that conflicting insert (postgresql) or foreign key constraint violation (h2)
// doesn't increment sequence of primary key unnecessarily
storedSession = dao.getSessionByConflictedNamesInternal(
// the session must be locked same as `dao.upsertAndLockSession()`
storedSession = dao.getAndLockSessionByConflictedNamesInternal(
session.getProjectId(),
session.getWorkflowName(),
session.getSessionTime().getEpochSecond());
Expand All @@ -1676,7 +1688,7 @@ public <T> T putAndLockSession(Session session, SessionLockAction<T> func)
return 0;
},
"project id=%d", session.getProjectId());
storedSession = dao.getSessionByConflictedNamesInternal(
storedSession = dao.getAndLockSessionByConflictedNamesInternal(
session.getProjectId(),
session.getWorkflowName(),
session.getSessionTime().getEpochSecond());
Expand Down Expand Up @@ -2082,9 +2094,9 @@ List<StoredSessionAttemptWithSession> getAttemptsOfWorkflowWithRetries(
" where project_id = :projectId" +
" and workflow_name = :workflowName" +
" and session_time = :sessionTime" +
" limit 1") // here allows last_attempt_id == NULL
StoredSession getSessionByConflictedNamesInternal(@Bind("projectId") int projectId,
@Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime);
" limit 1 for update") // here allows last_attempt_id == NULL
StoredSession getAndLockSessionByConflictedNamesInternal(@Bind("projectId") int projectId,
@Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime);

@SqlQuery("select session_time from sessions" +
" where project_id = :projectId" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar
SessionStore ss = sm.getSessionStore(siteId);

long activeAttempts = ss.getActiveAttemptCount();
if (activeAttempts + 1 > limits.maxAttempts()) {
throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts);
}

stored = ss
// putAndLockSession + insertAttempt might be able to be faster by combining them into one method and optimize using a single SQL with CTE
Expand Down Expand Up @@ -327,6 +324,12 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar

return storedAttemptWithSession;
});

if (activeAttempts + 1 > limits.maxAttempts()) {
tm.reset();
throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts);
}

}
catch (WorkflowTaskLimitExceededException ex) {
throw ex.getCause();
Expand Down
231 changes: 227 additions & 4 deletions digdag-tests/src/test/java/acceptance/RequireIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.CommandStatus;
import utils.TemporaryDigdagServer;

Expand Down Expand Up @@ -43,6 +45,8 @@

public class RequireIT
{
private static Logger logger = LoggerFactory.getLogger(RequireIT.class);

@Rule
public TemporaryFolder folder = new TemporaryFolder();

Expand Down Expand Up @@ -231,13 +235,15 @@ public void testIgnoreProjectIdParam()

@Test
public void testRequireToAnotherProjectById()
throws Exception {
throws Exception
{
testRequireToAnotherProject(true, "parent_by_id", "2020-06-05 00:00:01");
}

@Test
public void testRequireToAnotherProjectByName()
throws Exception {
throws Exception
{
testRequireToAnotherProject(false, "parent_by_name", "2020-06-05 00:00:02");
}

Expand All @@ -250,7 +256,8 @@ public void testRequireToAnotherProjectByName()
* @throws Exception
*/
private void testRequireToAnotherProject(boolean useProjectId, String parentProjectName, String sessionTime)
throws Exception {
throws Exception
{
final String childProjectName = "child_another";

// Push child project
Expand Down Expand Up @@ -408,7 +415,10 @@ private RestSessionAttemptCollection testRerunOnParam(String sessionTime, String
return attempts;
}

private static boolean isAttemptSuccess(CommandStatus status) { return status.outUtf8().contains("status: success"); }
private static boolean isAttemptSuccess(CommandStatus status)
{
return status.outUtf8().contains("status: success");
}

private CommandStatus startAndWait(String... args) throws InterruptedException
{
Expand Down Expand Up @@ -560,4 +570,217 @@ public void testTargetSessionIDWhenNoKick(boolean waitChildFinish)
assertThat(requireOperatorStateParams.get("target_attempt_id", Id.class), is(targetAttemptId));
assertThat(requireOperatorStateParams.get("target_session_id", Id.class), is(targetSessionId));
}

@Test
public void testRunningHitMaxAttempt()
throws Exception
{
// Only two attempts are able to run.
// In this situation, both parent_wait_long and child_wait_long must run successfully
try {
server.close();
server = TemporaryDigdagServer.builder()
.configuration("executor.attempt_max_run = 2")
.build();
server.start();

// Create a new project
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.errUtf8(), initStatus.code(), is(0));

copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig"));
copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig"));

// Push the project
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"require",
"-c", config.toString(),
"-e", server.endpoint(),
"-r", "4711");
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
Id attemptId;
{
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"require", "parent_wait_long",
"--session", "now");
assertThat(startStatus.code(), is(0));
attemptId = getAttemptId(startStatus);
}

// Wait for the attempt to complete
boolean success = false;
for (int i = 0; i < 120; i++) {
CommandStatus attemptsStatus = main("attempts",
"-c", config.toString(),
"-e", server.endpoint(),
attemptId.toString());
String statusStr = attemptsStatus.outUtf8();
if (statusStr.contains("status: success")) {
success = true;
break;
} else if (statusStr.contains("status: error")) {
success = false;
logger.error("attempt failed: {}", statusStr);
break;
}
Thread.sleep(1000);
}
assertThat(success, is(true));
}
finally {
if (server != null) {
server.close();
}
}
}

@Test
public void testDelayWithMaxAttempt()
throws Exception
{
// Scenario:
// Only two attempts are able to run.
// One attempt running firstly and run for some duration (reuse 'child_wait_long')
// Then create new attempt which has `require>` (kick 'parent_wait_long')
// Expected:
// The new attempt keep running. The task of `require>` will be retried after 10 min
// The way to confirm:
// To reduce the time of test, only check attempt log shows "Number of attempts or tasks exceed limit"
//
try {
server.close();
server = TemporaryDigdagServer.builder()
.configuration("executor.attempt_max_run = 2")
.build();
server.start();

// Create a new project
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.errUtf8(), initStatus.code(), is(0));

copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig"));
copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig"));

// Push the project
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"require",
"-c", config.toString(),
"-e", server.endpoint(),
"-r", "4711");
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
{ // create an attempt
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"require", "child_wait_long",
"--session", "now");
assertThat(startStatus.code(), is(0));
}

Id attemptId;
{
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"require", "parent_wait_long",
"--session", "2022-12-01 12:34:56");
assertThat(startStatus.code(), is(0));
attemptId = getAttemptId(startStatus);
}

{
CommandStatus status = main("attempts",
"-c", config.toString(),
"-e", server.endpoint());
logger.info("{}", status.outUtf8());
}

String logStr = "";
for (int i = 0; i < 60; i++) {
CommandStatus attemptLog = main("log",
"-c", config.toString(),
"-e", server.endpoint(),
attemptId.toString());
logStr = attemptLog.outUtf8();
if (logStr.contains("Number of attempts or tasks exceed limit")) {
return; // OK
}
Thread.sleep(1000);
}
fail("Cannot confirm retry happen. log:\n" + logStr);
}
finally {
if (server != null) {
server.close();
}
}
}

@Test
public void testMultipleAttemptsInSession()
throws Exception
{
// Check the issue that 'require>' will fail when run multiple attempts in a session in very short term
// Scenario:
// run "parent_multiple" for a specific session
// the workflow run tasks as follows:
// - firstly run `child_wait` for a session
// - run multiple attempts of `child_wait` for the session in parallel.
// Way to check:
// all attempts finish successfully
CommandStatus initStatus = main("init",
"-c", config.toString(),
projectDir.toString());
assertThat(initStatus.errUtf8(), initStatus.code(), is(0));

copyResource("acceptance/require/parent_multiple.dig", projectDir.resolve("parent_multiple.dig"));
copyResource("acceptance/require/child_wait.dig", projectDir.resolve("child_wait.dig"));

// Push the project
CommandStatus pushStatus = main("push",
"--project", projectDir.toString(),
"require",
"-c", config.toString(),
"-e", server.endpoint(),
"-r", "4711");
assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0));
Id attemptId;
{
CommandStatus startStatus = main("start",
"-c", config.toString(),
"-e", server.endpoint(),
"require", "parent_multiple",
"--session", "now");
assertThat(startStatus.code(), is(0));
attemptId = getAttemptId(startStatus);
}

// Wait for the attempt to complete
boolean success = false;
for (int i = 0; i < 180; i++) {
CommandStatus attemptsStatus = main("attempts",
"-c", config.toString(),
"-e", server.endpoint(),
attemptId.toString());
String statusStr = attemptsStatus.outUtf8();
if (statusStr.contains("status: success")) {
success = true;
break;
} else if (statusStr.contains("status: error")) {
success = false;
logger.error("attempt failed: {}", statusStr);
break;
}
Thread.sleep(1000);
}
assertThat(success, is(true));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 30s

Loading