-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimultaneousjobrunner.py
executable file
·134 lines (120 loc) · 3.59 KB
/
simultaneousjobrunner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python
import os
import sys
import string
import subprocess
from logger import Logger
class SimultaneousJobRunner(Logger):
"""
SimultaneousJobRunner runs a list of tasks using a fixed number
of subprocesses. The number of simultaneous subprocesses is determined
by the RunJobs method. New jobs are added to the list using the AddJob
method. This class is derived from Logger, which handles logging
any messages. The main() function also works as a command-line
utility to run a list of jobs read from a file or stdin.
"""
def __init__(self, verbose=False,save_output=False):
#self.jobs = []
#self.pids = {}
#self.pipes = {}
#self.output = {}
self.verbose = verbose
self.save_output = save_output
#self.nextjob=0
self.ClearJobs()
if self.verbose:
self.Log( "In SimultaneousJobRunner.__init__")
def ClearJobs( self ):
"""
Initializes and clears out any previous batch of jobs.
"""
self.jobs = []
self.pids = {}
self.pipes = {}
self.output = {}
self.nextjob=0
def AddJob( self, statement, output_key='' ):
if self.verbose:
self.Log(["Adding job", statement])
self.jobs.append( statement )
def StartJob( self, verbose=False ):
"""Starts the next job on the list."""
if self.nextjob < len(self.jobs):
job = self.jobs[ self.nextjob ]
if self.verbose or verbose:
self.Log( "Starting job %d of %d, %s" % (self.nextjob,len(self.jobs),job))
self.nextjob+=1
# Start the subprocess.
pipe=subprocess.Popen(job,shell=True)
self.pipes[pipe.pid] = pipe
# Save its process id.
self.pids[pipe.pid] = job
def RunJobs( self, numjobs, verbose=False ):
"""Run up to numjobs at a time. As soon as one finishes, start another."""
batchexitstatus=0
# Start initial jobs.
for i in range(0,numjobs):
self.StartJob()
# Wait for a job to finish, and start the next job.
while self.pids:
if self.verbose or verbose:
pids = self.pids.keys()
pids.sort()
self.Log( "Waiting on %d jobs: %s" % (len(self.pids), `pids`))
(pid,jobexitstatus) = os.waitpid(-1,os.P_WAIT)
jobname = self.pids.pop(pid)
if self.verbose or verbose:
self.Log( "Pid %d (%s) finished, exitstatus=%d." % ( pid, jobname, jobexitstatus))
if jobexitstatus != 0:
batchexitstatus = jobexitstatus
try:
self.StartJob(verbose)
except IndexError:
pass
# Confirm that the running pids are really running,
# and start additional jobs as needed.
for pid in self.pids.keys():
try:
os.kill(pid,0)
except OSError:
# That pid is no longer running.
self.pids.pop(pid)
self.Log( "Pid %d (%s) finished, exitstatus unknown." % ( pid, jobname ))
try:
self.output[pid] = self.pipes[pid].read()
except AttributeError:
pass
self.StartJob()
self.jobs = []
self.nextjob=0
self.Log("All jobs complete. Job queue is empty.")
return batchexitstatus == 0
def main():
# Defaults: run 5 simultaneous jobs, reads jobs from stdin.
numjobs=5
ifs=sys.stdin
verbose=False
# Command-line argument processing.
for i in range(1,len(sys.argv)):
if sys.argv[i] == "-v":
verbose=True
if sys.argv[i] == "-n":
try:
numjobs = int(sys.argv[i+1])
except IndexError:
pass
if os.path.exists(sys.argv[i]):
ifs =open(sys.argv[i])
jobs = map(string.strip,ifs.readlines())
if ifs != sys.stdin:
ifs.close()
Logger().Log(["Verbose",verbose,"Numjobs",numjobs])
s = SimultaneousJobRunner(verbose)
for job in jobs:
s.AddJob(job)
if s.RunJobs( numjobs ):
s.Log("All jobs completed successfully.")
else:
s.Log("One or more jobs finished with non-zero exit status.")
if __name__ == "__main__":
main()