Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/monitoring fixes #27

Merged
merged 10 commits
Jan 15, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ It allows users to:
This enables the creation of automatic workflows, timely triggered as events are notified.


The documentation can be found at https://pyaviso.readthedocs.io/
The documentation can be found at https://pyaviso.readthedocs.io/ \
Examples demonstrating the usage of aviso for ECMWF notifications can be found at https://github.com/ecmwf/aviso-examples
2 changes: 2 additions & 0 deletions aviso-server/monitoring/aviso_monitoring/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def _create_default_config() -> Dict:
"frequency": 4, # min
"member_urls": ["http://localhost:2379"],
"req_timeout": 60, # s
"req_mem_count": 1,
"tlms": {"etcd_store_size": {}, "etcd_cluster_status": {}, "etcd_error_log": {}},
}

Expand Down Expand Up @@ -227,6 +228,7 @@ def etcd_reporter(self, etcd_reporter):
if isinstance(e["enabled"], str):
e["enabled"] = e["enabled"].casefold() == "true".casefold()
assert e.get("frequency") is not None, "etcd_reporter frequency has not been configured"
assert e.get("req_mem_count") is not None, "etcd_reporter req_mem_count has not been configured"
assert e.get("member_urls") is not None, "etcd_reporter member_urls has not been configured"
assert e.get("req_timeout") is not None, "etcd_reporter req_timeout has not been configured"
self._etcd_reporter = e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, config, *args, **kwargs):
self.req_timeout = self.etcd_config["req_timeout"]
self.member_urls = self.etcd_config["member_urls"]
self.tlms = self.etcd_config["tlms"]
self.req_mem_count = self.etcd_config["req_mem_count"]
super().__init__(config, *args, **kwargs)

def process_messages(self):
Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(self, tlm_type, req_timeout=60, *args, **kwargs):
self.member_urls = kwargs["member_urls"]
self.raw_tlms = kwargs["raw_tlms"]
self.msg_receiver = kwargs["msg_receiver"]
self.req_mem_count = kwargs["req_mem_count"]

def metric(self):
    # Intentionally a no-op: this looks like a base-class hook that concrete
    # checkers override to collect their specific telemetry (other metric()
    # implementations appear further down in this file) — confirm against the
    # full module, as only diff fragments are visible here.
    pass
Expand Down Expand Up @@ -151,8 +153,9 @@ def metric(self):
message = "Cluster status is nominal"

# first retrieve the member size
cluster_size = self.cluster_size(self.member_urls[0]) # any of the member should give the same info
if cluster_size != len(self.member_urls):
cluster_size = self.cluster_size(self.member_urls[0])
if cluster_size != self.req_mem_count:
logger.debug(f"cluster size: {cluster_size}, required: {self.req_mem_count}, url: {self.member_urls[0]}")
status = 2
if cluster_size:
message = f"Cluster size is {cluster_size}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,36 +147,37 @@ def aggregate_time_tlms(tlms):
if len(tlms) == 0:
return None

# read only the telemetry field of the tlm
r_tlms = list(map(lambda t: t.get("telemetry"), tlms))

# determine tlm_type
first_key = list(r_tlms[0].keys())[0]
tlm_type = first_key[: first_key.rfind("_")]

# setup the aggregated tlm to return
agg_tlm = {
tlm_type + "_counter": 0,
tlm_type + "_avg": 0,
tlm_type + "_max": -math.inf,
tlm_type + "_min": math.inf,
}
summation = 0
for tlm in r_tlms:
agg_tlm[tlm_type + "_counter"] += tlm[tlm_type + "_counter"]
agg_tlm[tlm_type + "_max"] = (
tlm[tlm_type + "_max"]
if tlm[tlm_type + "_max"] > agg_tlm[tlm_type + "_max"]
else agg_tlm[tlm_type + "_max"]
)
agg_tlm[tlm_type + "_min"] = (
tlm[tlm_type + "_min"]
if tlm[tlm_type + "_min"] < agg_tlm[tlm_type + "_min"]
else agg_tlm[tlm_type + "_min"]
)
summation += tlm[tlm_type + "_counter"] * tlm[tlm_type + "_avg"]
agg_tlm[tlm_type + "_avg"] = summation / agg_tlm[tlm_type + "_counter"]
logger.debug(f"tlms: {tlms}")

# Initialize the aggregated telemetry
agg_tlm = {}
summation = {}

for tlm in tlms:
r_tlm = tlm.get("telemetry")
for key in r_tlm.keys():
tlm_type = key[: key.rfind("_")]

# Initialize if tlm_type is new
if tlm_type not in agg_tlm:
agg_tlm[tlm_type + "_counter"] = 0
agg_tlm[tlm_type + "_avg"] = 0
agg_tlm[tlm_type + "_max"] = -math.inf
agg_tlm[tlm_type + "_min"] = math.inf
summation[tlm_type] = 0

agg_tlm[tlm_type + "_counter"] += r_tlm[tlm_type + "_counter"]
agg_tlm[tlm_type + "_max"] = max(agg_tlm[tlm_type + "_max"], r_tlm[tlm_type + "_max"])
agg_tlm[tlm_type + "_min"] = min(agg_tlm[tlm_type + "_min"], r_tlm[tlm_type + "_min"])
summation[tlm_type] += r_tlm[tlm_type + "_counter"] * r_tlm[tlm_type + "_avg"]

for tlm_type in summation:
if agg_tlm[tlm_type + "_counter"] > 0:
agg_tlm[tlm_type + "_avg"] = summation[tlm_type] / agg_tlm[tlm_type + "_counter"]
else:
agg_tlm[tlm_type + "_avg"] = 0

logger.debug(f"agg_tlm: {agg_tlm}")
return agg_tlm

def aggregate_unique_counter_tlms(tlms):
Expand All @@ -192,25 +193,28 @@ def aggregate_unique_counter_tlms(tlms):
if len(tlms) == 0:
return None

# read only the telemetry field of the tlm
r_tlms = list(map(lambda t: t.get("telemetry"), tlms))
agg_tlms = {}

# determine tlm_type
first_key = list(r_tlms[0].keys())[0]
tlm_type = first_key[: first_key.rfind("_")]
for tlm in tlms:
telemetry_data = tlm.get("telemetry", {})
for key, values in telemetry_data.items():
if key.endswith("_values"):
tlm_type = key[: key.rfind("_")]

# create a unique list of values
aggr_values = []
for tlm in r_tlms:
for v in tlm[tlm_type + "_values"]:
if v not in aggr_values:
aggr_values.append(v)
if tlm_type not in agg_tlms:
agg_tlms[tlm_type] = {
tlm_type + "_counter": 0,
tlm_type + "_values": set(),
}

agg_tlm = {
tlm_type + "_counter": len(aggr_values),
tlm_type + "_values": aggr_values,
}
return agg_tlm
agg_tlms[tlm_type][tlm_type + "_values"].update(values)

# Convert sets to lists and update counters
for tlm_type, data in agg_tlms.items():
data[tlm_type + "_values"] = list(data[tlm_type + "_values"])
data[tlm_type + "_counter"] = len(data[tlm_type + "_values"])

return agg_tlms

@classmethod
def retrieve_metrics(cls, metric_servers, req_timeout):
Expand Down
Loading