Skip to content

Commit

Permalink
Merge pull request #98 from seung-lab/heartbeat
Browse files Browse the repository at this point in the history
Heartbeat based on network traffic

Seggraph tasks in the synaptor workflow can generate 10MB/s uploading traffic with minimal cpu usage. Abuse memory monitor to detect high net IO and heartbeat for the worker to keep it alive.
  • Loading branch information
ranlu authored Jul 2, 2024
2 parents 9bb6072 + a4c352d commit d30f9ed
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cloud/google/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def GenerateWorkers(context, hostname_manager, hostname_nfs_server, worker):

docker_image = worker.get('workerImage', context.properties['seuronImage'])

oom_canary_cmd = GenerateDockerCommand(docker_image, docker_env) + ' ' + "python utils/memory_monitor.py ${AIRFLOW__CELERY__BROKER_URL} bot-message-queue >& /dev/null"
oom_canary_cmd = GenerateDockerCommand(docker_image, docker_env + ['--network host']) + ' ' + "python utils/memory_monitor.py ${AIRFLOW__CELERY__BROKER_URL} bot-message-queue >& /dev/null"

if worker['type'] == 'gpu':
cmd = GenerateCeleryWorkerCommand(docker_image, docker_env+['-p 8793:8793'], queue=worker['type'], concurrency=worker['concurrency'])
Expand Down
13 changes: 12 additions & 1 deletion utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,18 @@ def run_oom_canary():
if cpu_usage > 20:
logging.info(f"{cpu_usage}% cpu used, heartbeat")
redis_conn.set(hostname, datetime.now().timestamp())
sleep(1)
continue
counters_start = psutil.net_io_counters()
sleep(1)
counters_end = psutil.net_io_counters()
download_speed = counters_end.bytes_recv - counters_start.bytes_recv
upload_speed = counters_end.bytes_sent - counters_start.bytes_sent
if download_speed > 1e6 or upload_speed > 1e6:
logging.info(f"Significant network IO: {download_speed/1e6}MB/s, {upload_speed/1e6}MB/s, heartbeat")
redis_conn.set(hostname, datetime.now().timestamp())
continue
else:
sleep(1)
else:
sleep(t)

Expand Down

0 comments on commit d30f9ed

Please sign in to comment.