Skip to content

Commit

Permalink
Merge pull request #129 from TheRoddyWMS/FixesForJobManagerIssues
Browse files Browse the repository at this point in the history
Some fixes for JobManager issues
  • Loading branch information
dankwart-de authored Dec 3, 2018
2 parents d2e28e6 + 54bff63 commit 4273322
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import de.dkfz.roddy.config.ResourceSet
import de.dkfz.roddy.execution.BEExecutionService
import de.dkfz.roddy.execution.io.ExecutionResult
import groovy.transform.CompileStatic
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.time.Duration
import java.time.LocalDateTime

import java.util.concurrent.TimeoutException

/**
Expand All @@ -26,6 +27,8 @@ import java.util.concurrent.TimeoutException
@CompileStatic
abstract class BatchEuphoriaJobManager<C extends Command> {

final static Logger log = LoggerFactory.getLogger(BatchEuphoriaJobManager)

protected final BEExecutionService executionService

protected boolean isTrackingOfUserJobsEnabled
Expand All @@ -43,13 +46,28 @@ abstract class BatchEuphoriaJobManager<C extends Command> {
private String userAccount

private final Map<BEJobID, BEJob> activeJobs = [:]

private Thread updateDaemonThread

/**
 * Set this to true to tell the job manager that an existing update daemon shall be closed, e.g. because
 * the application is about to exit.
 *
 * Declared volatile because the flag is set by the thread calling stopUpdateDaemon() and polled by the
 * update daemon thread; without it the daemon is not guaranteed to ever see the change.
 */
protected volatile boolean updateDaemonShallBeClosed

/**
 * Set this to true if you do not want to allow any further job submission.
 *
 * Declared volatile because the flag is set by waitForJobsToFinish() and read by submitJob(), which may
 * run on different threads.
 */
protected volatile boolean forbidFurtherJobSubmission

private Map<BEJobID, JobState> cachedStates = [:]
private final Object cacheStatesLock = new Object()
private LocalDateTime lastCacheUpdate
private Duration cacheUpdateInterval

public boolean surveilledJobsHadErrors = false

private final List<UpdateDaemonListener> updateDaemonListeners = []

boolean requestMemoryIsEnabled
boolean requestWalltimeIsEnabled
Expand Down Expand Up @@ -86,8 +104,17 @@ abstract class BatchEuphoriaJobManager<C extends Command> {
}
}


/**
* If you override this method, make sure to include the check for further job submission! Submitting jobs is
* not allowed after waitForJobsToFinish() was called.
* @param job
* @return
* @throws TimeoutException
*/
BEJobResult submitJob(BEJob job) throws TimeoutException {
if (forbidFurtherJobSubmission) {
throw new BEException("You are not allowed to submit further jobs. This happens, when you call waitForJobs().")
}
Command command = createCommand(job)
ExecutionResult executionResult = executionService.execute(command)
extractAndSetJobResultFromExecutionResult(command, executionResult)
Expand Down Expand Up @@ -207,20 +234,6 @@ abstract class BatchEuphoriaJobManager<C extends Command> {
}
}

/**
 * Blocks until the map of active jobs is empty, polling once per cache update interval.
 *
 * @return always 0
 * @throws BEException if no update daemon thread exists
 */
int waitForJobsToFinish() {
    if (updateDaemonThread == null) {
        throw new BEException("createDaemon needs to be enabled for waitForJobsToFinish")
    }
    boolean jobsRemaining = true
    while (jobsRemaining) {
        // Only the emptiness check needs the lock; sleeping happens outside of it.
        synchronized (activeJobs) {
            jobsRemaining = !activeJobs.isEmpty()
        }
        if (jobsRemaining) {
            Thread.sleep(cacheUpdateInterval.toMillis())
        }
    }
    return 0
}

abstract String getJobIdVariable()

abstract String getJobNameVariable()
Expand Down Expand Up @@ -328,29 +341,58 @@ abstract class BatchEuphoriaJobManager<C extends Command> {

protected void createUpdateDaemonThread() {
updateDaemonThread = Thread.startDaemon("Job state update daemon.", {
updateActiveJobList()
try {
Thread.sleep(Math.max(cacheUpdateInterval.toMillis(), 10 * 1000))
} catch (InterruptedException e) {
e.printStackTrace()
while (!updateDaemonShallBeClosed) {
updateActiveJobList()

waitForUpdateIntervalDuration()
}
})
}

/**
 * Registers a listener that the update daemon informs about ended jobs.
 *
 * @param listener the listener to append to the registered listeners
 */
final void addUpdateDaemonListener(UpdateDaemonListener listener) {
    // The listener list is also locked while listeners are notified, so a registration
    // cannot interleave with a running notification round.
    synchronized (updateDaemonListeners) {
        updateDaemonListeners.add(listener)
    }
}

/**
 * Tells whether an update daemon thread exists and is still running.
 *
 * @return true if the daemon thread was created and is alive, false otherwise
 */
boolean isDaemonAlive() {
    if (updateDaemonThread == null) {
        return false
    }
    return updateDaemonThread.isAlive()
}

/**
 * Blocks the calling thread for one update interval, returning early once the update daemon is asked to stop.
 *
 * The interval is cacheUpdateInterval, but never less than ten seconds. The wait is cut into slices of at
 * most one second and updateDaemonShallBeClosed is re-checked between slices, so a stop request is honoured
 * within roughly one second instead of after the whole interval.
 */
void waitForUpdateIntervalDuration() {
    final long sliceMillis = 1000L
    long remainingMillis = Math.max(cacheUpdateInterval.toMillis(), 10 * sliceMillis)
    while (remainingMillis > 0 && !updateDaemonShallBeClosed) {
        // Never sleep longer than what is left, so an interval that is not a multiple of one second
        // does not overshoot.
        long napMillis = Math.min(remainingMillis, sliceMillis)
        Thread.sleep(napMillis)
        remainingMillis -= napMillis
    }
}

/**
 * Asks the update daemon to stop and waits until its thread has ended.
 *
 * If no daemon thread was created, only the stop flag is set. If the call is made from the daemon thread
 * itself (for example from a listener notified by the daemon), the flag is set and the method returns
 * without waiting.
 */
void stopUpdateDaemon() {
    updateDaemonShallBeClosed = true
    Thread daemon = updateDaemonThread
    // A thread that joins itself blocks forever, so skip the join when running on the daemon thread.
    if (daemon != null && !daemon.is(Thread.currentThread())) {
        daemon.join()
    }
}

private void updateActiveJobList() {
List<BEJobID> listOfRemovableJobs = []
synchronized (activeJobs) {
Map<BEJobID, JobState> states = queryJobStatesUsingCache(activeJobs.keySet() as List<BEJobID>, true)

for (BEJobID id : activeJobs.keySet()) {
JobState js = states.get(id)
JobState jobState = states.get(id)
BEJob job = activeJobs.get(id)

job.setJobState(js)

if (!js.isPlannedOrRunning()) {
activeJobs.remove(id)
job.setJobState(jobState)
if (!jobState.isPlannedOrRunning()) {
synchronized (updateDaemonListeners) {
updateDaemonListeners.each { it.jobEnded(job, jobState) }
}
if (!jobState.successful) {
surveilledJobsHadErrors = true
}
listOfRemovableJobs << id
}
}
listOfRemovableJobs.each { activeJobs.remove(it) }
}
}

Expand All @@ -364,4 +406,27 @@ abstract class BatchEuphoriaJobManager<C extends Command> {
}
return new HashMap(cachedStates)
}

/**
 * Waits until no active jobs are left (whether they ended with or without errors) or until the update
 * daemon is asked to close.
 *
 * Note that calling this method forbids any further job submission: as soon as it has been called, no more
 * jobs can be submitted.
 *
 * @return true if there were NO errors, false if there were any
 * @throws BEException if no update daemon thread exists
 */
boolean waitForJobsToFinish() {
    if (updateDaemonThread == null) {
        throw new BEException("The job manager must be created with JobManagerOption.createDaemon set to true to make waitForJobsToFinish() work.")
    }
    forbidFurtherJobSubmission = true
    boolean jobsPending = true
    while (jobsPending && !updateDaemonShallBeClosed) {
        // Only the emptiness check needs the lock; waiting happens outside of it.
        synchronized (activeJobs) {
            jobsPending = !activeJobs.isEmpty()
        }
        if (jobsPending) {
            waitForUpdateIntervalDuration()
        }
    }
    return !surveilledJobsHadErrors
}
}
2 changes: 1 addition & 1 deletion src/main/groovy/de/dkfz/roddy/execution/jobs/JobState.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean isPlannedOrRunning() {
}

public static boolean _isPlannedOrRunning(JobState jobState) {
return Arrays.asList(JobState.RUNNING, JobState.QUEUED, JobState.HOLD, JobState.STARTED, JobState.UNSTARTED).contains(jobState);
return Arrays.asList(JobState.RUNNING, JobState.QUEUED, JobState.HOLD, JobState.SUSPENDED, JobState.STARTED, JobState.UNSTARTED).contains(jobState);
}

public boolean isDummy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2018 German Cancer Research Center (Deutsches Krebsforschungszentrum, DKFZ).
*
* Distributed under the MIT License (license terms are at https://github.com/DKFZ-ODCF/COWorkflowsBasePlugin/LICENSE.txt).
*/
package de.dkfz.roddy.execution.jobs

import groovy.transform.CompileStatic

/**
* Listener interface for ended jobs, used when the update daemon of a job manager is active.
* Use BatchEuphoriaJobManager.addUpdateDaemonListener to register a listener.
*/
@CompileStatic
interface UpdateDaemonListener {
/**
* Called by the job manager for a job whose refreshed state is no longer planned or running.
* The job manager invokes this while holding its internal locks on the active jobs and on the
* listener list, so other listeners and the job list update wait until this method returns.
*
* @param job the job that ended
* @param state the state that was determined for the job
*/
void jobEnded(BEJob job, JobState state)
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ abstract class ClusterJobManager<C extends Command> extends BatchEuphoriaJobMana
return Duration.parse(String.format("PT%sH%sM%sS", hhmmss))
}

/**
* Always reports true for this job manager.
* NOTE(review): the hunk header places this override in ClusterJobManager; returning true from a
* cluster-based manager looks surprising - confirm that this is intended.
*/
@Override
boolean executesWithoutJobSystem() {
return true
}

@Override
ProcessingParameters convertResourceSet(BEJob job, ResourceSet resourceSet) {
assert resourceSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class DirectSynchronousExecutionJobManager extends BatchEuphoriaJobManager<Direc

@Override
BEJobResult submitJob(BEJob job) {
if (forbidFurtherJobSubmission) {
throw new BEException("You are not allowed to submit further jobs. This happens, when you call waitForJobs().")
}
// Some of the parent jobs are in a bad state!
Command command = createCommand(job)
BEJobResult jobResult
Expand Down

0 comments on commit 4273322

Please sign in to comment.