Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Run Remote Improvements #17

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions wwpdb/utils/dp/RcsbDpUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@
from wwpdb.utils.config.ConfigInfo import ConfigInfo
from wwpdb.utils.config.ConfigInfoApp import ConfigInfoAppEm, ConfigInfoAppCommon
from wwpdb.utils.dp.PdbxStripCategory import PdbxStripCategory
from wwpdb.utils.dp.RunRemote import RunRemote
from wwpdb.utils.dp.RunRemote import RunRemoteFlow
import prefect
from prefect import Flow,Parameter
from prefect.executors import LocalExecutor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -3990,9 +3993,9 @@ def __run(self, command, lPathFull, op):
if self.__run_remote:
random_suffix = random.randrange(9999999)
job_name = '{}_{}'.format(op, random_suffix)
return RunRemote(command=command, job_name=job_name, log_dir=os.path.dirname(lPathFull),
timeout=self.__timeout, number_of_processors=self.__numThreads,
memory_limit=self.__startingMemory).run()
lPathFull = os.path.dirname(lPathFull)
rrf = RunRemoteFlow(job_name, command, lPathFull, self.__timeout, self.__numThreads, self.__startingMemory)
return rrf.run()

if self.__timeout > 0:
return self.__runTimeout(command, self.__timeout, lPathFull)
Expand Down
211 changes: 186 additions & 25 deletions wwpdb/utils/dp/RunRemote.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import subprocess
import argparse
from wwpdb.utils.config.ConfigInfo import ConfigInfo, getSiteId
import prefect
from prefect import task, Flow,Parameter
from prefect.tasks.mysql import mysql

logger = logging.getLogger()

Expand All @@ -22,7 +25,7 @@ def __init__(self, command, job_name, log_dir, timeout=5600, memory_limit=0, num
self.number_of_processors = number_of_processors
self.job_name = job_name
self.log_dir = log_dir
self.memory_used = 0
self.max_memory_used = 0
self.memory_unit = 'MB'
self.bsub_exit_status = 0
self.siteId = getSiteId()
Expand All @@ -38,8 +41,17 @@ def __init__(self, command, job_name, log_dir, timeout=5600, memory_limit=0, num
self.bsub_out_file = os.path.join(self.log_dir, self.job_name + '.out')
self.add_site_config = add_site_config
self.add_site_config_database = add_site_config_database
self.executed_command = ''
self.out = None
self.err = None
self.run_duration = None
self.ret_code = 1
self.bsub_try = 1
self.avg_memory_used = 0
self.max_threads = 0
self.max_processes = 0
self.cpu_time = 0
self.job_time = 0

def escape_substitution(self, command):
"""
Expand All @@ -49,21 +61,21 @@ def escape_substitution(self, command):
return command

def run(self):
rc = 1
self.ret_code = 1

if self.add_site_config_database:
self.pre_pend_sourcing_site_config(database=True)
if self.add_site_config:
self.pre_pend_sourcing_site_config()

if self.bsub_run_command:
bsub_try = 1
rc, self.out, self.err = self.run_bsub()
self.bsub_try = 1
self.ret_code, self.out, self.err = self.run_bsub()
while self.bsub_exit_status != 0:
if self.memory_used:
if self.max_memory_used:
try:
if self.memory_used > self.memory_limit:
self.memory_limit = int(self.memory_used)
if self.max_memory_used > self.memory_limit:
self.memory_limit = int(self.max_memory_used)
except:
pass

Expand All @@ -73,18 +85,18 @@ def run(self):
self.memory_limit = self.memory_limit + 30000
else:
self.memory_limit = self.memory_limit + 10000
bsub_try += 1
logging.info('try {}, memory {}'.format(bsub_try, self.memory_limit))
rc, self.out, self.err = self.run_bsub()
self.bsub_try += 1
logging.info('try {}, memory {}'.format(self.bsub_try, self.memory_limit))
self.ret_code, self.out, self.err = self.run_bsub()

if rc != 0:
logging.error('return code: {}'.format(rc))
if self.ret_code != 0:
logging.error('return code: {}'.format(self.ret_code))
logging.error('out: {}'.format(self.out))
logging.error('error: {}'.format(self.err))
else:
logging.info('worked')

return rc
return self.ret_code

@staticmethod
def check_timing(t1, t2):
Expand Down Expand Up @@ -207,6 +219,7 @@ def launch_bsub(self):
bsub_command.append("'")

command_string = ' '.join(bsub_command)
self.executed_command = command_string
rc, out, err = self.run_command(command=command_string)

return rc, out, err
Expand All @@ -226,34 +239,77 @@ def launch_bsub_wait_process(self):

return rc, out, err

def normalise_mem_units(self, memory, memory_unit):
    """Convert a memory figure to megabytes.

    :param memory: numeric amount expressed in ``memory_unit``.
    :param memory_unit: unit label such as ``'KB'``, ``'MB'``, ``'GB'`` or
        ``'TB'``; matched case-insensitively, surrounding whitespace ignored.
    :return: the amount in MB. KB input is truncated to a whole number of
        MB; any unrecognised unit is assumed to already be MB and the
        amount is returned unchanged.
    """
    unit = str(memory_unit).strip().upper()
    if unit == 'TB':
        return memory * 1024 * 1024
    if unit == 'GB':
        return memory * 1024
    if unit == 'KB':
        # Truncate rather than round, matching the integer MB values used elsewhere.
        return int(memory / 1024)
    return memory

def parse_bsub_log(self):
    """Extract resource-usage figures and the memory-limit flag from the bsub log.

    Resets ``bsub_exit_status`` and ``max_memory_used``, then scans
    ``self.bsub_log_file`` (if it exists) line by line and stores:

    * ``max_memory_used`` / ``avg_memory_used`` -- whole MB, converted with
      ``self.normalise_mem_units``
    * ``cpu_time`` / ``job_time`` -- floats taken from the ``CPU time :`` and
      ``Run time :`` lines
    * ``max_threads`` / ``max_processes`` -- ints
    * ``bsub_exit_status`` -- set to 1 when a line contains ``TERM_MEMLIMIT``

    A line whose value cannot be parsed is logged as an error and skipped;
    the corresponding attribute keeps its previous value.
    """
    self.bsub_exit_status = 0
    self.memory_unit = 'MB'
    self.max_memory_used = 0

    def parse_memory(text):
        # Memory figures may be fractional (NOTE(review): LSF appears to
        # print e.g. "Average Memory : 42.50 MB" -- confirm against real
        # logs), so parse as float first; int() alone would raise and the
        # value would be lost. Convert to MB before truncating so GB
        # fractions are kept.
        amount, unit = text.split()[:2]
        return int(self.normalise_mem_units(float(amount), unit))

    def parse_float(text):
        return float(text.split()[0])

    def parse_int(text):
        return int(text)

    # (marker searched for in the line, attribute to set, value parser)
    field_parsers = (
        ('Max Memory :', 'max_memory_used', parse_memory),
        ('Average Memory :', 'avg_memory_used', parse_memory),
        ('CPU time :', 'cpu_time', parse_float),
        ('Run time :', 'job_time', parse_float),
        ('Max Threads :', 'max_threads', parse_int),
        ('Max Processes :', 'max_processes', parse_int),
    )

    if os.path.exists(self.bsub_log_file):
        with open(self.bsub_log_file, 'r') as log_file:
            for line in log_file:
                for marker, attribute, parse in field_parsers:
                    if marker in line:
                        try:
                            setattr(self, attribute, parse(line.split(':')[-1].strip()))
                        except (ValueError, IndexError) as e:
                            logging.error(e)
                        break
                else:
                    # Only lines that matched no resource marker are checked
                    # for the memory-limit termination reason.
                    if 'TERM_MEMLIMIT' in line:
                        self.bsub_exit_status = 1

    logging.info('Max memory used: {} {}'.format(self.max_memory_used, self.memory_unit))
    logging.info('bsub exit status: {}'.format(self.bsub_exit_status))

def run_bsub(self):

if not os.path.exists(self.log_dir):
os.makedirs(self.log_dir)

Expand Down Expand Up @@ -287,6 +343,111 @@ def run_bsub(self):

return rc, out, err

class RunRemoteFlow:
    """Register and execute a Prefect flow that runs one command remotely.

    The flow consists of ``run_remote_task`` followed by
    ``save_remote_run_detail``. ``run()`` registers the flow, creates a flow
    run with this instance's parameters and blocks until it has finished.
    """

    # Group Flow ID for RunRemotely; replaced by whatever register_flow() returns.
    DEFAULT_FLOW_ID = 'efcd4cd5-dadc-451f-9df6-32c8114e431e'

    def __init__(self, job_name, command, lPathFull, timeout, num_threads, starting_memory):
        """
        :param job_name: name of the job; also used as the flow-run name.
        :param command: command string handed to the remote task.
        :param lPathFull: passed to the remote task as its ``log_dir``.
        :param timeout: timeout forwarded to the remote task.
        :param num_threads: forwarded as ``number_of_processors``.
        :param starting_memory: forwarded as ``memory_limit``.
        """
        self.job_name = job_name
        self.command = command
        self.lPathFull = lPathFull
        self.timeout = timeout
        self.num_threads = num_threads
        self.starting_memory = starting_memory
        self.flow_id = self.DEFAULT_FLOW_ID

    def register_flow(self):
        """Build the 'Run Remotely' flow and register it in the "Remote Running" project.

        The serialized hash of the flow is passed as the idempotency key.
        The id returned by the registration is stored in ``self.flow_id``.
        """
        job_name = Parameter('job_name', default="Default Job")
        command = Parameter('command', default="ls")
        lPathFull = Parameter('lPathFull', default="/tmp")
        timeout = Parameter('timeout', default=5600)
        numThreads = Parameter('number_of_processors', default=1)
        startingMemory = Parameter('memory_limit', default=0)

        with Flow('Run Remotely') as flow:
            rr = run_remote_task(command=command, job_name=job_name, log_dir=lPathFull,
                                 timeout=timeout, number_of_processors=numThreads,
                                 memory_limit=startingMemory)
            save_remote_run_detail(rr)

        self.flow_id = flow.register("Remote Running", idempotency_key=flow.serialized_hash())

    def run_flow(self):
        """Create a flow run for ``self.flow_id`` and wait for it to finish.

        :return: 0 if the final flow-run state is successful, otherwise 1.
        """
        client = prefect.Client()
        params = {
            'job_name': self.job_name,
            'command': self.command,
            'lPathFull': self.lPathFull,
            'timeout': self.timeout,
            'number_of_processors': self.num_threads,
            'memory_limit': self.starting_memory,
        }
        flow_run_id = client.create_flow_run(flow_id=self.flow_id, parameters=params,
                                             run_name=self.job_name)

        # We are making this a blocking call: poll once per second, sleeping
        # *before* each re-query so no time is wasted once the run is finished.
        state = client.get_flow_run_state(flow_run_id)
        while not state.is_finished():
            time.sleep(1)
            state = client.get_flow_run_state(flow_run_id)

        return 0 if state.is_successful() else 1

    def run(self):
        """Register the flow, run it to completion and return its exit code (0 or 1)."""
        self.register_flow()
        return self.run_flow()

@task
def run_remote_task(command, job_name, log_dir, timeout, number_of_processors, memory_limit):
    """Prefect task: run ``command`` through ``RunRemote`` and record how long it took.

    :param command: command string to execute.
    :param job_name: name given to the remote job.
    :param log_dir: directory passed to ``RunRemote`` for its log files.
    :param timeout: timeout passed to ``RunRemote``.
    :param number_of_processors: processor count passed to ``RunRemote``.
    :param memory_limit: memory limit passed to ``RunRemote``.
    :return: the ``RunRemote`` instance, with ``ret_code`` and
        ``run_duration`` (wall-clock seconds spent in ``run()``) set.
    """
    logger = prefect.context.get("logger")
    logger.info(f"Job: {job_name}")
    logger.info(f"Command to execute: {command}")
    rr = RunRemote(command=command, job_name=job_name, log_dir=log_dir,
                   timeout=timeout, number_of_processors=number_of_processors,
                   memory_limit=memory_limit)

    start_time = time.time()
    rr.ret_code = rr.run()
    # Take the end timestamp immediately so the duration covers only run()
    # and is always set, even if the summary logging below fails.
    rr.run_duration = time.time() - start_time

    try:
        logger.info(f"Executed: {rr.executed_command}")
        logger.info(f"Remote Task Ended | Exit Code: {rr.ret_code}")
        logger.info(f'BSUB Log path: {rr.bsub_log_file}')
        logger.info(f"Duration: {rr.run_duration} seconds")
        logger.info(f"Memory Used: {rr.max_memory_used} MB")
        logger.info(f"Number of Processors: {rr.number_of_processors}")
    except Exception as e:
        # Summary logging is best-effort; never fail the task because of it.
        logger.warning(e)
    return rr


def _sql_literal(value):
    """Render ``value`` as a SQL literal: ``NULL`` for None, else a quoted, escaped string."""
    if value is None:
        return "NULL"
    # NOTE(review): hand-rolled escaping (backslash and single quote), assuming
    # MySQL's default backslash-escape mode. A parameterised query would be
    # preferable if the MySQLExecute task supports one -- confirm.
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{text}'"


@task
def save_remote_run_detail(rr):
    """Prefect task: insert the statistics of a finished remote run into ``run_statistics``.

    Database connection settings are read from ``rr.cI``. A ``memory_limit``
    of 0 is stored as NULL, as is any statistic that is None. ``rr`` itself
    is not modified.

    :param rr: the object returned by ``run_remote_task``.
    :return: ``rr.ret_code``.
    """
    db_Host = rr.cI.get("SITE_DB_HOST_NAME")
    db_Name = rr.cI.get("SITE_DB_DATABASE_NAME")
    db_User = rr.cI.get("SITE_DB_USER_NAME")
    db_Pw = rr.cI.get("SITE_DB_PASSWORD")
    db_Port = int(rr.cI.get("SITE_DB_PORT_NUMBER"))

    # Build the SQL fragment locally instead of overwriting rr.memory_limit,
    # so the run object keeps its numeric value for any later consumer.
    memory_requested = _sql_literal(None if rr.memory_limit == 0 else rr.memory_limit)

    # TODO: add deposition ID and correct number of processes, threads, CPU time.
    insertion_query = f'''INSERT into run_statistics
(job_name, memory_requested, max_memory_used, avg_memory_used, exit_status, attempts, number_of_processor,
max_threads, max_processes, job_time, cpu_time, total_time)
VALUES ({_sql_literal(rr.job_name)}, {memory_requested}, {_sql_literal(rr.max_memory_used)}, {_sql_literal(rr.avg_memory_used)},
{_sql_literal(rr.ret_code)}, {_sql_literal(rr.bsub_try)},{_sql_literal(rr.number_of_processors)},{_sql_literal(rr.max_threads)}, {_sql_literal(rr.max_processes)},
{_sql_literal(rr.job_time)}, {_sql_literal(rr.cpu_time)}, {_sql_literal(rr.run_duration)})
'''
    logger.info(insertion_query)
    db_task = mysql.MySQLExecute(db_Name, db_User, db_Pw, db_Host, port=db_Port, commit=True)
    db_task.run(query=insertion_query)
    return rr.ret_code


if __name__ == '__main__':
parser = argparse.ArgumentParser()
Expand Down