Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job archive interface: clean up a couple helper functions #460

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 35 additions & 58 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ def add_job_records(rows):
job_records = []

for row in rows:
rset = ResourceSet(row[6]) # fetch R
try:
# attempt to create a ResourceSet from R
rset = ResourceSet(row[6])
nnodes = rset.nnodes
except (ValueError, TypeError):
# can't convert R to a ResourceSet object; skip it
continue

job_record = JobRecord(
row[0], # userid
Expand All @@ -132,7 +138,7 @@ def add_job_records(rows):
row[2], # t_submit
row[3], # t_run
row[4], # t_inactive
rset.nnodes, # nnodes
nnodes, # nnodes
row[6], # resources
)
job_records.append(job_record)
Expand All @@ -150,99 +156,70 @@ def check_jobspec(jobspec, bank):
)


# we are looking for jobs that were submitted under a secondary bank, so we'll
# only add jobs that have the same bank name attribute in the jobspec
def sec_bank_jobs(job_records, bank):
# Filter job records based on the specified bank. For a default bank,
# it includes jobs that either specify the default bank or do not
# specify any bank at all.
def filter_jobs_by_bank(job_records, bank, is_default_bank=False):
jobs = []
for job in job_records:
jobspec = json.loads(job[7])

if check_jobspec(jobspec, bank):
jobs.append(job)

return jobs


# we are looking for jobs that were submitted under a default bank, which has
# two cases: 1) the user submitted a job while specifying their default bank,
# or 2) the user submitted a job without specifying any bank at all
def def_bank_jobs(job_records, default_bank):
jobs = []
for job in job_records:
jobspec = json.loads(job[7])

if check_jobspec(jobspec, default_bank):
jobs.append(job)
elif "bank" not in jobspec["attributes"]["system"]:
elif is_default_bank and "bank" not in jobspec["attributes"]["system"]:
jobs.append(job)

return jobs


def get_job_records(conn, bank, default_bank, **kwargs):
job_records = []

# find out which args were passed and place them in a dict
valid_params = ("user", "after_start_time", "before_end_time", "jobid")
params = {}
params_list = []

valid_params = {"user", "after_start_time", "before_end_time", "jobid"}
params = {
key: val
for (key, val) in kwargs.items()
for key, val in kwargs.items()
if val is not None and key in valid_params
}

select_stmt = (
"SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs "
)
where_stmt = ""

def append_to_where(where_stmt, conditional):
if where_stmt != "":
return "{} AND {} ".format(where_stmt, conditional)

return "WHERE {}".format(conditional)
select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs"
where_clauses = []
params_list = []

# generate the SELECT statement based on the parameters passed in
if "user" in params:
params["user"] = get_uid(params["user"])
where_clauses.append("userid = ?")
params_list.append(params["user"])
where_stmt = append_to_where(where_stmt, "userid=? ")
if "after_start_time" in params:
where_clauses.append("t_run > ?")
params_list.append(params["after_start_time"])
where_stmt = append_to_where(where_stmt, "t_run > ? ")
if "before_end_time" in params:
where_clauses.append("t_inactive < ?")
params_list.append(params["before_end_time"])
where_stmt = append_to_where(where_stmt, "t_inactive < ? ")
if "jobid" in params:
where_clauses.append("id = ?")
params_list.append(params["jobid"])
where_stmt = append_to_where(where_stmt, "id=? ")

select_stmt += where_stmt
if where_clauses:
select_stmt += " WHERE " + " AND ".join(where_clauses)

cur = conn.cursor()
cur.execute(select_stmt, (*tuple(params_list),))
cur.execute(select_stmt, tuple(params_list))
result = cur.fetchall()
# if the length of dataframe is 0, that means no job records were found
# in the jobs table, so just return an empty list
if len(result) == 0:
return job_records

if not result:
return []

if bank is None and default_bank is None:
# special case for unit tests in test_job_archive_interface.py
job_records = add_job_records(result)

return job_records
return add_job_records(result)

if bank != default_bank:
jobs = sec_bank_jobs(result, bank)
else:
jobs = def_bank_jobs(result, default_bank)
# find out if we are fetching jobs from a user's default bank or under
# one of their secondary banks; this will determine how we filter the
# job records we've found
is_default_bank = bank == default_bank
jobs = filter_jobs_by_bank(result, bank, is_default_bank)

job_records = add_job_records(jobs)

return job_records
return add_job_records(jobs)


def output_job_records(conn, output_file, **kwargs):
Expand Down
Loading