Skip to content

Commit

Permalink
add pipeline status monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
nicHoch committed Nov 14, 2023
1 parent f628d3e commit 32b12ad
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ stixcore/data/test/ephemeris/spice/kernels/mk/*.abs
stixcore/data/soop/
stixcore/data/publish/
stixcore/data/test/products/end2end/
monitor_status.json
103 changes: 103 additions & 0 deletions stixcore/processing/pipeline_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import sys
import json
import smtplib
import argparse
import datetime
from pprint import pprint, pformat
from pathlib import Path

from dateutil import parser as dateparser

from stixcore.config.config import CONFIG
from stixcore.processing.pipeline_status import get_status
from stixcore.util.logging import get_logger

__all__ = ['pipeline_monitor']

logger = get_logger(__name__)


def pipeline_monitor(args):
"""Status logger and notification script for the pipeline.
SetUp via cron.
Query the number of open files still to process. Logs that number into a status file
and checks if the the number is constantly equal or increasing.
Sends an notification via mail if a possible pipeline stuck is detected.
"""
parser = argparse.ArgumentParser(description='stix pipeline monitor')
parser.add_argument("-p", "--port",
help="connection port for the status info server",
default=CONFIG.getint('Pipeline', 'status_server_port', fallback=12345),
type=int)

parser.add_argument("-s", "--save_file",
help="file to persist last status",
default="monitor_status.json", type=str)

args = parser.parse_args(args)

ret = get_status("next".encode(), args.port)
open_files = int(ret.replace("open files: ", ""))
save_file = Path(args.save_file)

status = {}
status['last'] = []

if save_file.exists():
with open(save_file, "+r") as f:
try:
status = json.load(f)
except Exception:
pass

status['last'].append({"date": datetime.datetime.now().isoformat(timespec='milliseconds'),
"open": open_files})

status['last'] = status['last'][-9:]

if len(status['last']) == 9 and open_files > 0:
stuck = True
last_open = status['last'][0]
for la in status['last'][1:]:
if la['open'] <= 0 or la['open'] < last_open['open']:
stuck = False
break
last_open = la
if stuck:
fd = dateparser.parse(status['last'][0]['date'])
ld = dateparser.parse(status['last'][-1]['date'])
if (ld - fd).days >= 1:
if CONFIG.getboolean('Publish', 'report_mail_send', fallback=False):
try:
sender = CONFIG.get('Pipeline', 'error_mail_sender', fallback='')
receivers = CONFIG.get('Publish', 'report_mail_receivers').split(",")
host = CONFIG.get('Pipeline', 'error_mail_smpt_host', fallback='localhost')
port = CONFIG.getint('Pipeline', 'error_mail_smpt_port', fallback=25)
smtp_server = smtplib.SMTP(host=host, port=port)
message = f"""Subject: StixCore Pipeline Monitor
Pipeline stuck?
{pformat(status)}
Login to server and check
"""

smtp_server.sendmail(sender, receivers, message)
except Exception as e:
logger.error(f"Error: unable to send monitor email: {e}")

with open(save_file, "w") as f:
json.dump(status, f, indent=4)

pprint(status)


def main():
pipeline_monitor(sys.argv[1:])


if __name__ == '__main__':
main()
6 changes: 3 additions & 3 deletions stixcore/processing/pipeline_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from stixcore.config.config import CONFIG
from stixcore.util.logging import get_logger

__all__ = ['pipeline_status']
__all__ = ['pipeline_status', 'get_status']

logger = get_logger(__name__)

Expand All @@ -28,7 +28,7 @@ def get_status(msg, port=12346):
line = server.readline()
if not line:
break
print(f"{line.decode().rstrip()}")
return f"{line.decode().rstrip()}"

finally:
sock.close()
Expand Down Expand Up @@ -69,7 +69,7 @@ def pipeline_status(args):

cmd = args.cmd.encode() if args.cmd else b'last'

get_status(cmd, args.port)
print(get_status(cmd, args.port))


def main():
Expand Down

0 comments on commit 32b12ad

Please sign in to comment.