Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizing WorkflowExecutor.enqueueReadyTasks #1752

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ public void run()
int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks;
// Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately.
int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10);
int actualyAcquired = 0;
if (maxAcquire > 0) {
metrics.summary(Category.AGENT,"mtag_NumMaxAcquire", maxAcquire);
transactionManager.begin(() -> {
actualyAcquired = transactionManager.begin(() -> {
List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000);
for (TaskRequest req : reqs) {
executor.submit(() -> {
Expand All @@ -144,10 +145,10 @@ public void run()
});
activeTaskCount.incrementAndGet();
}
return null;
return reqs.size();
});
}
else {
if (actualyAcquired == 0) {
metrics.increment(Category.AGENT, "mtag_RunWaitCounter");
// no executor thread is available. sleep for a while until a task execution finishes
addActiveTaskLock.wait(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,61 @@ public boolean isAnyNotDoneAttempts()
);
}

/**
 * Tells whether the configured database type is PostgreSQL.
 *
 * @return true only when databaseType is exactly "postgresql" (case-sensitive)
 */
private boolean isPostgres()
{
    // Invoked on databaseType itself (not on the literal) so that a null
    // databaseType still fails the same way the string switch did.
    return databaseType.equals("postgresql");
}

@DigdagTimed(value = "dssm_", category = "db", appendMethodName = true)
@Override
public List<Long> findAllReadyTaskIds(int maxEntries, boolean randomFetch)
public <T> Optional<T> tryLockReadyTask(TaskLockActionWithDetails<T> func)
{
if (randomFetch) {
return autoCommit((handle, dao) -> dao.findAllTaskIdsByStateAtRandom(TaskStateCode.READY.get(), maxEntries));
}
else {
return autoCommit((handle, dao) -> dao.findAllTaskIdsByState(TaskStateCode.READY.get(), maxEntries));
}
return transaction((handle, dao) -> {
if (isPostgres()) {
// FOR UPDATE with SKIP LOCKED is available only with PostgreSQL.
// Optimize two SELECTs into one SELECT with SKIP LOCKED.
StoredTask locked = handle.createQuery(
selectTaskDetailsQuery() +
" where state = " + TaskStateCode.READY.get() +
" limit 1" +
" for update of t skip locked"
)
.map(stm)
.first();

if (locked != null) {
T result = func.call(new DatabaseTaskControlStore(handle), locked);
return Optional.of(result);
}
return Optional.<T>absent();

} else {
// H2 doesn't support FOR UPDATE with JOIN. Use two SELECTs
Long lockedTaskId = handle.createQuery(
"select id from tasks" +
" where state = " + TaskStateCode.READY.get() +
" limit 1 for update")
.mapTo(Long.class)
.first();
if (lockedTaskId != null) {
try {
StoredTask locked = getTaskById(handle, lockedTaskId); // this doesn't need
// FOR UPDATE clause because the row is already locked above
T result = func.call(new DatabaseTaskControlStore(handle), locked);
return Optional.of(result);
}
catch (ResourceNotFoundException ex) {
// Never reached, because the locking query above blocks DELETE
}
}
return Optional.<T>absent();
}
});
}

@DigdagTimed(value = "dssm_", category = "db", appendMethodName = true)
Expand Down Expand Up @@ -1695,10 +1740,6 @@ void upsertAndLockSession(@Bind("projectId") int projectId,
" where id = :id" +
" for update")
Long lockTaskIfNotLocked(@Bind("id") long taskId);

@SqlQuery("select id from tasks where state = :state order by random() limit :limit")
List<Long> findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit);

}

@UseStringTemplate3StatementLocator
Expand Down Expand Up @@ -1744,9 +1785,6 @@ StoredSession upsertAndLockSession(@Bind("projectId") int projectId,
" where id = :id" +
" for update skip locked")
Long lockTaskIfNotLocked(@Bind("id") long taskId);

@SqlQuery("select id from tasks where state = :state order by random() limit :limit")
List<Long> findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit);
}

public interface Dao
Expand Down Expand Up @@ -2043,11 +2081,6 @@ StoredSession getSessionByConflictedNamesInternal(@Bind("projectId") int project
@GetGeneratedKeys
long insertSessionMonitor(@Bind("attemptId") long attemptId, @Bind("nextRunTime") long nextRunTime, @Bind("type") String type, @Bind("config") Config config);

@SqlQuery("select id from tasks where state = :state limit :limit")
List<Long> findAllTaskIdsByState(@Bind("state") short state, @Bind("limit") int limit);

List<Long> findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit);

@SqlQuery("select id, session_id, state_flags, index from session_attempts where id = :attemptId for update")
SessionAttemptSummary lockAttempt(@Bind("attemptId") long attemptId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ AttemptStateFlags getAttemptStateFlags(long attemptId)
// for WorkflowExecutor.runUntilAny
boolean isAnyNotDoneAttempts();

// for WorkflowExecutor.enqueueReadyTasks (Keep for compatibility)
default List<Long> findAllReadyTaskIds(int maxEntries) { return findAllReadyTaskIds(maxEntries, false); }

/**
* for WorkflowExecutor.enqueueReadyTasks
* @param maxEntries max number to fetch
* @param randomFetch fetch randomly or not(original behavior)
* @return
*/
List<Long> findAllReadyTaskIds(int maxEntries, boolean randomFetch);
<T> Optional<T> tryLockReadyTask(TaskLockActionWithDetails<T> func);


// for AttemptTimeoutEnforcer.enforceAttemptTTLs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.digdag.core.session.StoredTask;
import io.digdag.core.session.Task;
import io.digdag.core.session.TaskAttemptSummary;
import io.digdag.core.session.TaskControlStore;
import io.digdag.core.session.TaskStateCode;
import io.digdag.core.session.TaskStateFlags;
import io.digdag.metrics.DigdagTimed;
Expand Down Expand Up @@ -62,6 +63,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -178,8 +180,7 @@ public class WorkflowExecutor
private final Lock propagatorLock = new ReentrantLock();
private final Condition propagatorCondition = propagatorLock.newCondition();
private volatile boolean propagatorNotice = false;
private final boolean enqueueRandomFetch;
private final Integer enqueueFetchSize;
private final int bulkEnqueueSize;

@Inject
public WorkflowExecutor(
Expand All @@ -204,8 +205,7 @@ public WorkflowExecutor(
this.systemConfig = systemConfig;
this.limits = limits;
this.metrics = metrics;
this.enqueueRandomFetch = systemConfig.get("executor.enqueue_random_fetch", Boolean.class, false);
this.enqueueFetchSize = systemConfig.get("executor.enqueue_fetch_size", Integer.class, 100);
this.bulkEnqueueSize = systemConfig.get("executor.bulk_enqueue_size", Integer.class, 100);
}

public StoredSessionAttemptWithSession submitWorkflow(int siteId,
Expand Down Expand Up @@ -500,7 +500,7 @@ public void runWhile(BooleanSupplier cond)
try (TaskQueuer queuer = new TaskQueuer()) {
propagateBlockedChildrenToReady();
retryRetryWaitingTasks();
enqueueReadyTasks(queuer); // TODO enqueue all (not only first 100)
enqueueReadyTasks(queuer);
propagateAllPlannedToDone();
propagateSessionArchive();

Expand All @@ -519,7 +519,10 @@ public void runWhile(BooleanSupplier cond)

propagateBlockedChildrenToReady();
retryRetryWaitingTasks();
enqueueReadyTasks(queuer);

// Enqueue at most bulkEnqueueSize tasks. Some ready tasks may remain
// in the database without having been enqueued.
boolean hasMoreReadyTasks = enqueueReadyTasks(queuer);

/**
* propagateSessionArchive() should be always called.
Expand All @@ -531,7 +534,7 @@ public void runWhile(BooleanSupplier cond)
*/
boolean hasModification = propagateAllPlannedToDone();
propagateSessionArchive();
if (hasModification) {
if (hasMoreReadyTasks || hasModification) {
//propagateSessionArchive();
}
else {
Expand Down Expand Up @@ -928,101 +931,98 @@ public void close()
//}
}

/**
 * Returns a function that enqueues a single task by its id.
 *
 * The returned function calls enqueueTask(dispatcher, tId) inside
 * tm.begin(...) and yields true once that call returns.
 * NOTE(review): presumably tm.begin opens a transaction per task and this
 * method is protected so tests can override the enqueue step -- confirm.
 *
 * @return a function from task id to true
 */
@VisibleForTesting
protected Function<Long, Boolean> funcEnqueueTask()
{
return (tId) ->
tm.begin(() -> {
enqueueTask(dispatcher, tId);
// the result carries no information; it is always true
return true;
});
}

@DigdagTimed(category = "executor", appendMethodName = true)
protected void enqueueReadyTasks(TaskQueuer queuer)
protected boolean enqueueReadyTasks(TaskQueuer queuer)
{
List<Long> readyTaskIds = tm.begin(() -> sm.findAllReadyTaskIds(enqueueFetchSize, enqueueRandomFetch));
logger.trace("readyTaskIds:{}", readyTaskIds);
for (long taskId : readyTaskIds) { // TODO randomize this result to achieve concurrency
catching(()->funcEnqueueTask().apply(taskId), true, "Failed to call enqueueTask. taskId:" + taskId);
//queuer.asyncEnqueueTask(taskId); // TODO async queuing is probably unnecessary but not sure
for (int i = 0; i < bulkEnqueueSize; i++) {
boolean changed = tm.begin(() -> sm.tryLockReadyTask((store, task) -> {
catching(()->enqueueLockedTask(dispatcher, store, task), true, "Failed to call enqueueTask. taskId:" + task.getId());
return true;
})).or(false);
if (!changed) {
// Break and return false if no tasks are ready
return false;
}
}
return true; // Run next enqueueReadyTasks immediately without sleep at runWhile
}

@DigdagTimed(category="executor", appendMethodName = true)
protected void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId)
@VisibleForTesting
protected boolean enqueueLockedTask(final TaskQueueDispatcher dispatcher,
final TaskControlStore store, final StoredTask task)
{
sm.lockTaskIfNotLocked(taskId, (store, task) -> {
TaskControl lockedTask = new TaskControl(store, task, limits);
if (lockedTask.getState() != TaskStateCode.READY) {
return false;
}

if (task.getTaskType().isGroupingOnly()) {
return retryGroupingTask(lockedTask);
}
final long taskId = task.getId();

// NOTE: Nothing to do here because CANCEL_REQUESTED task will be handled by an agent.
// See also state transitions.

int siteId;
try {
siteId = sm.getSiteIdOfTask(taskId);
}
catch (ResourceNotFoundException ex) {
tm.reset();
Exception error = new IllegalStateException("Task id="+taskId+" is ready to run but associated session attempt does not exist.", ex);
logger.error("Database state error enqueuing task.", error);
return false;
}
TaskControl lockedTask = new TaskControl(store, task, limits);
if (lockedTask.getState() != TaskStateCode.READY) {
return false;
}

try {
// TODO make queue name configurable. note that it also needs a new REST API and/or
// CLI commands to create/delete/manage queues.
Optional<String> queueName = Optional.absent();
if (task.getTaskType().isGroupingOnly()) {
return retryGroupingTask(lockedTask);
}

String encodedUnique = encodeUniqueQueuedTaskName(lockedTask.get());
// NOTE: Nothing to do here because CANCEL_REQUESTED task will be handled by an agent.
// See also state transitions.

TaskQueueRequest request = TaskQueueRequest.builder()
.priority(0) // TODO make this configurable
.uniqueName(encodedUnique)
.data(Optional.absent())
.build();
int siteId;
try {
siteId = sm.getSiteIdOfTask(taskId);
}
catch (ResourceNotFoundException ex) {
tm.reset();
Exception error = new IllegalStateException("Task id="+taskId+" is ready to run but associated session attempt does not exist.", ex);
logger.error("Database state error enqueuing task.", error);
return false;
}

logger.debug("Queuing task of attempt_id={}: id={} {}", task.getAttemptId(), task.getId(), task.getFullName());
try {
dispatcher.dispatch(siteId, queueName, request);
}
catch (TaskConflictException ex) {
tm.reset();
logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing",
encodedUnique, queueName.or("<shared>"), siteId);
}
try {
// TODO make queue name configurable. note that it also needs a new REST API and/or
// CLI commands to create/delete/manage queues.
Optional<String> queueName = Optional.absent();

////
// don't throw exceptions after here. task is already dispatched to a queue
//
String encodedUnique = encodeUniqueQueuedTaskName(lockedTask.get());

boolean updated = lockedTask.setReadyToRunning();
if (!updated) {
// return value of setReadyToRunning must be true because this task is locked
// (won't be updated by other machines concurrently) and confirmed that
// current state is READY.
logger.warn("Unexpected state change failure from READY to RUNNING: {}", task);
}
TaskQueueRequest request = TaskQueueRequest.builder()
.priority(0) // TODO make this configurable
.uniqueName(encodedUnique)
.data(Optional.absent())
.build();

return updated;
logger.debug("Queuing task of attempt_id={}: id={} {}", task.getAttemptId(), task.getId(), task.getFullName());
try {
dispatcher.dispatch(siteId, queueName, request);
}
catch (Exception ex) {
catch (TaskConflictException ex) {
tm.reset();
logger.error(
LogMarkers.UNEXPECTED_SERVER_ERROR,
"Enqueue error, making this task failed: {}", task, ex);
// TODO retry here?
return taskFailed(lockedTask,
buildExceptionErrorConfig(ex).toConfig(cf));
logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing",
encodedUnique, queueName.or("<shared>"), siteId);
}

////
// don't throw exceptions after here. task is already dispatched to a queue
//

boolean updated = lockedTask.setReadyToRunning();
if (!updated) {
// return value of setReadyToRunning must be true because this task is locked
// (won't be updated by other machines concurrently) and confirmed that
// current state is READY.
logger.warn("Unexpected state change failure from READY to RUNNING: {}", task);
}
}).or(false);

return updated;
}
catch (Exception ex) {
tm.reset();
logger.error(
LogMarkers.UNEXPECTED_SERVER_ERROR,
"Enqueue error, making this task failed: {}", task, ex);
// TODO retry here?
return taskFailed(lockedTask,
buildExceptionErrorConfig(ex).toConfig(cf));
}
}

private static String encodeUniqueQueuedTaskName(StoredTask task)
Expand Down
Loading