From 32dfb1277305d8e56cc29beec0592d2126e615b3 Mon Sep 17 00:00:00 2001 From: Tullio Sebastiani Date: Wed, 14 Aug 2024 16:41:38 +0200 Subject: [PATCH] Push alerts to elastic search (#108) (#118) * add elasticsearch stack to github actions * added kind config to map nodeports and removed portforwarding remove node settings added authentication added auth * push alerts to elastic search push alerts to elastic search elastic env in action nit echo nit test test lint fix * push metrics to elastic search metric structure change added upload_metric * push telemetry to elastic + refactoring remove warning fixed model error added module export from elastic models namespace minor checks minor checks changed method name fix tests changed check on metric type reduced elastic log level float casting fixed telemetry model minor nits test fix poetry.lock * export modules lint * taint constructor update * metrics model refactoring lint * elastic library log level * downgraded elastic search to 7.x fix --------- Signed-off-by: Tullio Sebastiani --- .github/workflows/build.yaml | 42 ++- kind-config.yml | 9 + poetry.lock | 40 +-- pyproject.toml | 3 +- src/krkn_lib/elastic/__init__.py | 1 + src/krkn_lib/elastic/krkn_elastic.py | 250 ++++++++++++++++++ src/krkn_lib/k8s/krkn_kubernetes.py | 13 +- src/krkn_lib/models/elastic/__init__.py | 1 + src/krkn_lib/models/elastic/models.py | 195 ++++++++++++++ src/krkn_lib/models/telemetry/models.py | 51 +++- src/krkn_lib/telemetry/elastic.py | 57 ---- src/krkn_lib/tests/base_test.py | 68 +++++ src/krkn_lib/tests/test_krkn_elastic.py | 167 ++++++++++-- .../tests/test_krkn_elastic_models.py | 171 ++++++++++++ 14 files changed, 953 insertions(+), 115 deletions(-) create mode 100644 kind-config.yml create mode 100644 src/krkn_lib/elastic/__init__.py create mode 100644 src/krkn_lib/elastic/krkn_elastic.py create mode 100644 src/krkn_lib/models/elastic/__init__.py create mode 100644 src/krkn_lib/models/elastic/models.py delete mode 100644 
src/krkn_lib/telemetry/elastic.py create mode 100644 src/krkn_lib/tests/test_krkn_elastic_models.py diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index b2271c49..241a0d0e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -31,8 +31,9 @@ jobs: helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo add stable https://charts.helm.sh/stable helm repo update - - name: Deploy prometheus & Port Forwarding + - name: Deploy prometheus run: | + # nodePort mapping 30000 -> http://localhost:9090 kubectl create namespace monitoring helm install \ --wait --timeout 360s \ @@ -47,11 +48,36 @@ jobs: --set alertmanager.service.type=NodePort \ --set prometheus-node-exporter.service.nodePort=32001 \ --set prometheus-node-exporter.service.type=NodePort + - name: Deploy Elasticsearch - SELECTOR=`kubectl -n monitoring get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'` - POD_NAME=`kubectl -n monitoring get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'` - kubectl -n monitoring port-forward $POD_NAME 9090:9090 & - sleep 5 + env: + ELASTIC_URL: ${{ vars.ELASTIC_URL }} + ELASTIC_PORT: ${{ vars.ELASTIC_PORT }} + ELASTIC_USER: ${{ vars.ELASTIC_USER }} + ELASTIC_PASSWORD: ${{ vars.ELASTIC_PASSWORD }} + run: | + echo "ELASTIC_URL: ${ELASTIC_URL}" + echo "ELASTIC_PORT: ${ELASTIC_PORT}" + echo "ELASTIC_USER: ${ELASTIC_USER}" + echo "ELASTIC_PASSWORD:${ELASTIC_PASSWORD}" + + + # nodePort mapping 32766 -> http://localhost:9091 + helm install \ + --wait --timeout 360s \ + elasticsearch \ + oci://registry-1.docker.io/bitnamicharts/elasticsearch \ + --set master.masterOnly=false \ + --set master.replicaCount=1 \ + --set data.replicaCount=0 \ + --set coordinating.replicaCount=0 \ + --set ingest.replicaCount=0 \ + --set service.type=NodePort \ + --set service.nodePorts.restAPI=32766 \ + --set security.elasticPassword=test \ + --set 
security.enabled=true \ + --set image.tag=7.17.23-debian-12-r0 \ + --set security.tls.autoGenerated=true - name: Check out code uses: actions/checkout@v3 - name: Update version number @@ -78,7 +104,11 @@ jobs: BUCKET_NAME: ${{ secrets.BUCKET_NAME }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - ES_SERVER: ${{ secrets.ES_SERVER }} + ELASTIC_URL: ${{ vars.ELASTIC_URL }} + ELASTIC_PORT: ${{ vars.ELASTIC_PORT }} + ELASTIC_USER: ${{ vars.ELASTIC_USER }} + ELASTIC_PASSWORD: ${{ vars.ELASTIC_PASSWORD }} + run: | export TEST_WORKDIR=`pwd`/`date +%s` mkdir $TEST_WORKDIR diff --git a/kind-config.yml b/kind-config.yml new file mode 100644 index 00000000..95f101fa --- /dev/null +++ b/kind-config.yml @@ -0,0 +1,9 @@ +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: + - role: control-plane + extraPortMappings: + - containerPort: 30000 + hostPort: 9090 + - containerPort: 32766 + hostPort: 9091 diff --git a/poetry.lock b/poetry.lock index d6d0bd98..cdc01c95 100644 --- a/poetry.lock +++ b/poetry.lock @@ -569,42 +569,44 @@ files = [ ] [[package]] -name = "elastic-transport" -version = "8.13.1" -description = "Transport classes and utilities shared among Python Elastic client libraries" +name = "elasticsearch" +version = "7.13.4" +description = "Python client for Elasticsearch" optional = false -python-versions = ">=3.7" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4" files = [ - {file = "elastic_transport-8.13.1-py3-none-any.whl", hash = "sha256:5d4bb6b8e9d74a9c16de274e91a5caf65a3a8d12876f1e99152975e15b2746fe"}, - {file = "elastic_transport-8.13.1.tar.gz", hash = "sha256:16339d392b4bbe86ad00b4bdeecff10edf516d32bc6c16053846625f2c6ea250"}, + {file = "elasticsearch-7.13.4-py2.py3-none-any.whl", hash = "sha256:5920df0ab2630778680376d86bea349dc99860977eec9b6d2bd0860f337313f2"}, + {file = "elasticsearch-7.13.4.tar.gz", hash = 
"sha256:52dda85f76eeb85ec873bf9ffe0ba6849e544e591f66d4048a5e48016de268e0"}, ] [package.dependencies] certifi = "*" -urllib3 = ">=1.26.2,<3" +urllib3 = ">=1.21.1,<2" [package.extras] -develop = ["aiohttp", "furo", "httpx", "mock", "opentelemetry-api", "opentelemetry-sdk", "orjson", "pytest", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "pytest-mock", "requests", "respx", "sphinx (>2)", "sphinx-autodoc-typehints", "trustme"] +async = ["aiohttp (>=3,<4)"] +develop = ["black", "coverage", "jinja2", "mock", "pytest", "pytest-cov", "pyyaml", "requests (>=2.0.0,<3.0.0)", "sphinx (<1.7)", "sphinx-rtd-theme"] +docs = ["sphinx (<1.7)", "sphinx-rtd-theme"] +requests = ["requests (>=2.4.0,<3.0.0)"] [[package]] -name = "elasticsearch" -version = "8.14.0" +name = "elasticsearch-dsl" +version = "7.4.1" description = "Python client for Elasticsearch" optional = false -python-versions = ">=3.7" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ - {file = "elasticsearch-8.14.0-py3-none-any.whl", hash = "sha256:cef8ef70a81af027f3da74a4f7d9296b390c636903088439087b8262a468c130"}, - {file = "elasticsearch-8.14.0.tar.gz", hash = "sha256:aa2490029dd96f4015b333c1827aa21fd6c0a4d223b00dfb0fe933b8d09a511b"}, + {file = "elasticsearch-dsl-7.4.1.tar.gz", hash = "sha256:07ee9c87dc28cc3cae2daa19401e1e18a172174ad9e5ca67938f752e3902a1d5"}, + {file = "elasticsearch_dsl-7.4.1-py2.py3-none-any.whl", hash = "sha256:97f79239a252be7c4cce554c29e64695d7ef6a4828372316a5e5ff815e7a7498"}, ] [package.dependencies] -elastic-transport = ">=8.13,<9" +elasticsearch = ">=7.0.0,<8.0.0" +python-dateutil = "*" +six = "*" [package.extras] -async = ["aiohttp (>=3,<4)"] -orjson = ["orjson (>=3)"] -requests = ["requests (>=2.4.0,!=2.32.2,<3.0.0)"] -vectorstore-mmr = ["numpy (>=1)", "simsimd (>=3)"] +develop = ["coverage (<5.0.0)", "mock", "pytest (>=3.0.0)", "pytest-cov", "pytest-mock (<3.0.0)", "pytz", "sphinx", "sphinx-rtd-theme"] [[package]] name = "fonttools" @@ -2139,4 +2141,4 @@ testing 
= ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "caa8730dae9716e7cebe9e798ff9a1bb74f8e4efa879d27304f5b63e1a237d95" +content-hash = "768268ae6a1df7838ea202a089274cd6f4020ff772eb5d317542ebc7c7882d45" diff --git a/pyproject.toml b/pyproject.toml index db22bce3..c067ed93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,8 @@ tzlocal = "5.1" pytz = "^2023.3" PyYAML = "6.0.1" prometheus-api-client = "^0.5.4" -elasticsearch = "8.14.0" +elasticsearch = "7.13.4" +elasticsearch-dsl = "7.4.1" wheel = "^0.42.0" cython = "3.0" diff --git a/src/krkn_lib/elastic/__init__.py b/src/krkn_lib/elastic/__init__.py new file mode 100644 index 00000000..7eec9504 --- /dev/null +++ b/src/krkn_lib/elastic/__init__.py @@ -0,0 +1 @@ +from .krkn_elastic import * # NOQA diff --git a/src/krkn_lib/elastic/krkn_elastic.py b/src/krkn_lib/elastic/krkn_elastic.py new file mode 100644 index 00000000..83e5d8d9 --- /dev/null +++ b/src/krkn_lib/elastic/krkn_elastic.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import datetime +import logging +import time + +import math +import urllib3 +from elasticsearch import Elasticsearch, NotFoundError +from elasticsearch_dsl import Search + +from krkn_lib.models.elastic.models import ( + ElasticAlert, + ElasticMetric, + ElasticChaosRunTelemetry, +) +from krkn_lib.models.telemetry import ChaosRunTelemetry +from krkn_lib.utils.safe_logger import SafeLogger + + +class KrknElastic: + es = None + + def __init__( + self, + safe_logger: SafeLogger, + elastic_url: str, + elastic_port: int = 443, + verify_certs: bool = False, + username: str = None, + password: str = None, + ): + es_logger = logging.getLogger("elasticsearch") + es_logger.setLevel(logging.WARNING) + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + urllib3.disable_warnings(DeprecationWarning) + es_transport_logger = logging.getLogger("elastic_transport.transport") + 
es_transport_logger.setLevel(logging.CRITICAL) + self.safe_logger = safe_logger + try: + if not elastic_url: + raise Exception("elastic search url is not valid") + if not elastic_port: + raise Exception("elastic port is not valid") + # create Elasticsearch object + + credentials = ( + (username, password) if username and password else None + ) + self.es = Elasticsearch( + f"{elastic_url}:{elastic_port}", + http_auth=credentials, + verify_certs=verify_certs, + ssl_show_warn=False, + ) + except Exception as e: + self.safe_logger.error("Failed to initalize elasticsearch: %s" % e) + raise e + + def upload_data_to_elasticsearch(self, item: dict, index: str = "") -> int: + """uploads captured data in item dictionary to Elasticsearch + + + :param item: the data to post to elastic search + :param index: the elastic search index pattern to post to + + :return: the time taken to post the result, + will be 0 if index and es are blank + """ + + if self.es and index != "": + # Attach to elastic search and attempt index creation + start = time.time() + self.safe_logger.info( + f"Uploading item {item} to index {index} in Elasticsearch" + ) + try: + response = self.es.index(index=index, body=item) + self.safe_logger.info(f"Response back was {response}") + if response["result"] != "created": + self.safe_logger.error( + f"Error trying to create new item in {index}" + ) + return -1 + except Exception as e: + self.safe_logger.error(f"Error trying to create new item: {e}") + return -1 + end = time.time() + elapsed_time = end - start + + # return elapsed time for upload if no issues + return math.ceil(elapsed_time) + return 0 + + def upload_metrics_to_elasticsearch( + self, + run_uuid: str, + raw_data: list[dict[str, str | int | float]], + index: str, + ) -> int: + """ + Saves raw data returned from the Krkn prometheus + client to elastic search as a ElasticMetric object + raw data will be a mixed types dictionary in the format + {"name":"str", "values":[(10, '3.14'), (11,'3.15').....] 
+ + :param run_uuid: the krkn run id + :param raw_data: the mixed type dictionary (will be validated + checking the attributes types ) + :param index: the elasticsearch index where + the object will be stored + :return: the time needed to save if succeeded -1 if failed + """ + if not index: + raise Exception("index cannot be None or empty") + if not run_uuid: + raise Exception("run uuid cannot be None or empty") + time_start = time.time() + try: + for metric in raw_data: + if ( + isinstance(metric["timestamp"], int) + and isinstance(metric["value"], float) + and isinstance(metric["name"], str) + ): + result = self.push_metric( + ElasticMetric( + run_uuid=run_uuid, + name=metric["name"], + created_at=datetime.datetime.now(), + timestamp=int(metric["timestamp"]), + value=float(metric["value"]), + ), + index, + ) + if result == -1: + self.safe_logger.error( + f"failed to save metric " + f"to elasticsearch : {metric}" + ) + + return int(time.time() - time_start) + except Exception: + return -1 + + def push_alert(self, alert: ElasticAlert, index: str) -> int: + """ + Pushes an ElasticAlert object to elastic + + :param alert: the populated ElasticAlert object + :param index: the index where the ElasticAlert Object + is pushed + :return: the time needed to save if succeeded -1 if failed + """ + if not index: + raise Exception("index cannot be None or empty") + try: + time_start = time.time() + alert.save(using=self.es, index=index) + return int(time.time() - time_start) + except Exception: + return -1 + + def push_metric(self, metric: ElasticMetric, index: str) -> int: + """ + Pushes an ElasticMetric object to elastic + + :param metric: the populated ElasticMetric object + :param index: the index where the ElasticMetric Object + is pushed + :return: the time needed to save if succeeded -1 if failed + """ + if not index: + raise Exception("index cannot be None or empty") + try: + time_start = time.time() + metric.save(using=self.es, index=index) + return int(time.time() - 
time_start) + except Exception: + return -1 + + def push_telemetry(self, telemetry: ChaosRunTelemetry, index: str): + if not index: + raise Exception("index cannot be None or empty") + try: + elastic_chaos = ElasticChaosRunTelemetry(telemetry) + time_start = time.time() + elastic_chaos.save(using=self.es, index=index) + return int(time.time() - time_start) + except Exception: + return -1 + + def search_telemetry(self, run_uuid: str, index: str): + """ + Searches ElasticChaosRunTelemetry by run_uuid + :param run_uuid: the Krkn run id to search + :param index: the index where the ElasticChaosRunTelemetry + should have been saved + :return: the list of objects retrieved (Empty if nothing + has been found) + """ + try: + search = Search(using=self.es, index=index).filter( + "match", run_uuid=run_uuid + ) + result = search.execute() + documents = [ + ElasticChaosRunTelemetry(**hit.to_dict()) for hit in result + ] + except NotFoundError: + return [] + return documents + + def search_alert(self, run_uuid: str, index: str) -> list[ElasticAlert]: + """ + Searches ElasticAlerts by run_uuid + :param run_uuid: the Krkn run id to search + :param index: the index where the ElasticAlert + should have been saved + :return: the list of objects retrieved (Empty if nothing + has been found) + """ + try: + search = Search(using=self.es, index=index).filter( + "match", run_uuid=run_uuid + ) + result = search.execute() + documents = [ElasticAlert(**hit.to_dict()) for hit in result] + except NotFoundError: + return [] + return documents + + def search_metric(self, run_uuid: str, index: str) -> list[ElasticMetric]: + """ + Searches ElasticMetric by run_uuid + :param run_uuid: the Krkn run id to search + :param index: the index where the ElasticAlert + should have been saved + :return: the list of objects retrieved (Empty if nothing + has been found) + """ + try: + search = Search(using=self.es, index=index).filter( + "match", run_uuid=run_uuid + ) + result = search.execute() + documents = 
[ElasticMetric(**hit.to_dict()) for hit in result] + except NotFoundError: + return [] + return documents diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 6af74065..16d90a9a 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -2065,13 +2065,12 @@ def get_nodes_infos(self) -> (list[NodeInfo], list[Taint]): for node in node_resp.items: node_info = NodeInfo() if node.spec.taints is not None: - for taint in node.spec.taints: - taint = Taint( - node_name=node.metadata.name, - effect=taint.effect, - key=taint.key, - value=taint.value, - ) + for node_taint in node.spec.taints: + taint = Taint() + taint.node_name = node.metadata.name + taint.effect = node_taint.effect + taint.key = node_taint.key + taint.value = node_taint.value taints.append(taint) if instance_type_label in node.metadata.labels.keys(): node_info.instance_type = node.metadata.labels[ diff --git a/src/krkn_lib/models/elastic/__init__.py b/src/krkn_lib/models/elastic/__init__.py new file mode 100644 index 00000000..0d6e2db0 --- /dev/null +++ b/src/krkn_lib/models/elastic/__init__.py @@ -0,0 +1 @@ +from .models import * # NOQA diff --git a/src/krkn_lib/models/elastic/models.py b/src/krkn_lib/models/elastic/models.py new file mode 100644 index 00000000..86c13ac2 --- /dev/null +++ b/src/krkn_lib/models/elastic/models.py @@ -0,0 +1,195 @@ +from elasticsearch_dsl import ( + Keyword, + Text, + Date, + Document, + Float, + Long, + Nested, + InnerDoc, + Integer, +) +import datetime + +from krkn_lib.models.telemetry import ChaosRunTelemetry + + +class ElasticAlert(Document): + run_uuid = Keyword() + severity = Text() + alert = Text() + created_at = Date() + + def __init__( + self, + run_uuid: str = None, + severity: str = None, + alert: str = None, + created_at: datetime = None, + **kwargs, + ): + super().__init__(**kwargs) + self.run_uuid = run_uuid + self.severity = severity + self.alert = alert + self.created_at = created_at + + 
+class ElasticMetricValue(InnerDoc): + timestamp = Long() + value = Float() + + def __init__(self, timestamp: int, value: float, **kwargs): + super().__init__(**kwargs) + self.timestamp = timestamp + self.value = value + + +class ElasticMetric(Document): + run_uuid = Keyword() + name = Text() + created_at = Date() + timestamp = Long() + value = Float() + + def __init__( + self, + run_uuid: str, + name: str, + created_at: datetime, + timestamp: int, + value: float, + **kwargs, + ): + super().__init__(**kwargs) + self.run_uuid = run_uuid + self.name = name + self.created_at = created_at + self.timestamp = timestamp + self.value = value + + +# Telemetry models + + +class ElasticAffectedPod(InnerDoc): + pod_name = Text(fields={"keyword": Keyword()}) + namespace = Text() + total_recovery_time = Float() + pod_readiness_time = Float() + pod_rescheduling_time = Float() + + +class ElasticPodsStatus(InnerDoc): + recovered = Nested(ElasticAffectedPod, multi=True) + unrecovered = Nested(ElasticAffectedPod, multi=True) + error = Text() + + +class ElasticScenarioParameters(InnerDoc): + pass + + +class ElasticScenarioTelemetry(InnerDoc): + start_timestamp = Float() + end_timestamp = Float() + scenario = Text(fields={"keyword": Keyword()}) + exit_status = Integer() + parameters_base64 = Text() + parameters = Nested(ElasticScenarioParameters) + affected_pods = Nested(ElasticPodsStatus) + + +class ElasticNodeInfo(InnerDoc): + count = Integer() + architecture = Text() + instance_type = Text() + node_type = Text() + kernel_version = Text() + kubelet_version = Text() + os_version = Text() + + +class ElasticTaint(InnerDoc): + key = Text() + value = Text() + effect = Text() + + +class ElasticChaosRunTelemetry(Document): + scenarios = Nested(ElasticScenarioTelemetry, multi=True) + node_summary_infos = Nested(ElasticNodeInfo, multi=True) + node_taints = Nested(ElasticTaint, multi=True) + kubernetes_objects_count = Nested(InnerDoc) + network_plugins = Text(multi=True) + timestamp = Text() + 
total_node_count = Integer() + cloud_infrastructure = Text() + cloud_type = Text() + run_uuid = Text(fields={"keyword": Keyword()}) + + class Index: + name = "chaos_run_telemetry" + + def __init__( + self, chaos_run_telemetry: ChaosRunTelemetry = None, **kwargs + ): + super().__init__(**kwargs) + # cheap trick to avoid reinventing the wheel :-) + if chaos_run_telemetry is None and kwargs: + chaos_run_telemetry = ChaosRunTelemetry(json_dict=kwargs) + self.scenarios = [ + ElasticScenarioTelemetry( + start_timestamp=sc.start_timestamp, + end_timestamp=sc.end_timestamp, + scenario=sc.scenario, + exit_status=sc.exit_status, + parameters_base64=sc.parameters_base64, + parameters=sc.parameters, + affected_pods=ElasticPodsStatus( + recovered=[ + ElasticAffectedPod( + pod_name=pod.pod_name, + namespace=pod.namespace, + total_recovery_time=pod.total_recovery_time, + pod_readiness_time=pod.pod_readiness_time, + pod_rescheduling_time=pod.pod_rescheduling_time, + ) + for pod in sc.affected_pods.recovered + ], + unrecovered=[ + ElasticAffectedPod( + pod_name=pod.pod_name, namespace=pod.namespace + ) + for pod in sc.affected_pods.unrecovered + ], + error=sc.affected_pods.error, + ), + ) + for sc in chaos_run_telemetry.scenarios + ] + + self.node_summary_infos = [ + ElasticNodeInfo( + count=info.count, + architecture=info.architecture, + instance_type=info.instance_type, + kernel_version=info.kernel_version, + kubelet_version=info.kubelet_version, + os_version=info.os_version, + ) + for info in chaos_run_telemetry.node_summary_infos + ] + self.node_taints = [ + ElasticTaint(key=taint.key, value=taint.value, effect=taint.effect) + for taint in chaos_run_telemetry.node_taints + ] + self.kubernetes_objects_count = ( + chaos_run_telemetry.kubernetes_objects_count + ) + self.network_plugins = chaos_run_telemetry.network_plugins + self.timestamp = chaos_run_telemetry.timestamp + self.total_node_count = chaos_run_telemetry.total_node_count + self.cloud_infrastructure = 
chaos_run_telemetry.cloud_infrastructure + self.cloud_type = chaos_run_telemetry.cloud_type + self.run_uuid = chaos_run_telemetry.run_uuid diff --git a/src/krkn_lib/models/telemetry/models.py b/src/krkn_lib/models/telemetry/models.py index f7cbfeaa..15ca7e65 100644 --- a/src/krkn_lib/models/telemetry/models.py +++ b/src/krkn_lib/models/telemetry/models.py @@ -50,7 +50,9 @@ def __init__(self, json_object: any = None): self.exit_status = json_object.get("exit_status") self.parameters_base64 = json_object.get("parameters_base64") self.parameters = json_object.get("parameters") - self.affected_pods = PodsStatus(json_object.get("affected_pods")) + self.affected_pods = PodsStatus( + json_object=json_object.get("affected_pods") + ) if ( self.parameters_base64 is not None @@ -105,11 +107,22 @@ class Taint: Taint Value """ + def __init__(self, json_dict: dict = None): + if json_dict is not None: + self.node_name = ( + json_dict["node_name"] if "node_name" in json_dict else None + ) + self.effect = ( + json_dict["effect"] if "effect" in json_dict else None + ) + self.key = json_dict["key"] if "key" in json_dict else None + self.value = json_dict["value"] if "value" in json_dict else None + @dataclass(order=False) class NodeInfo: """ - Cluster node telemetry informations + Cluster node telemetry infos """ count: int = 1 @@ -136,6 +149,36 @@ class NodeInfo: os_version: str = "" "Operating system version" + def __init__(self, json_dict: dict = None): + if json_dict is not None: + self.count = json_dict["count"] if "count" in json_dict else None + self.architecture = ( + json_dict["architecture"] + if "architecture" in json_dict + else None + ) + self.instance_type = ( + json_dict["instance_type"] + if "instance_type" in json_dict + else None + ) + self.node_type = ( + json_dict["node_type"] if "node_type" in json_dict else None + ) + self.kernel_version = ( + json_dict["kernel_version"] + if "kernel_version" in json_dict + else None + ) + self.kubelet_version = ( + 
json_dict["kubelet_version"] + if "kubelet_version" in json_dict + else None + ) + self.os_version = ( + json_dict["os_version"] if "os_version" in json_dict else None + ) + def __eq__(self, other): if isinstance(other, NodeInfo): return ( @@ -231,9 +274,6 @@ def __init__(self, json_dict: any = None): scenarios = json_dict.get("scenarios") if scenarios is None or isinstance(scenarios, list) is False: raise Exception("scenarios param must be a list of object") - for scenario in scenarios: - scenario_telemetry = ScenarioTelemetry(scenario) - self.scenarios.append(scenario_telemetry) self.scenarios = [ScenarioTelemetry(s) for s in scenarios] @@ -250,6 +290,7 @@ def __init__(self, json_dict: any = None): ) self.network_plugins = json_dict.get("network_plugins") self.run_uuid = json_dict.get("run_uuid") + self.timestamp = json_dict.get("timestamp") def to_json(self) -> str: return json.dumps(self, default=lambda o: o.__dict__, indent=4) diff --git a/src/krkn_lib/telemetry/elastic.py b/src/krkn_lib/telemetry/elastic.py deleted file mode 100644 index 6964ec6e..00000000 --- a/src/krkn_lib/telemetry/elastic.py +++ /dev/null @@ -1,57 +0,0 @@ -import time - -import urllib3 -from elasticsearch import Elasticsearch - -from krkn_lib.utils.safe_logger import SafeLogger - - -class KrknElastic: - es = None - - def __init__(self, safe_logger: SafeLogger, elastic_url: str): - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - urllib3.disable_warnings(DeprecationWarning) - self.safe_logger = safe_logger - try: - # create Elasticsearch object - if elastic_url: - self.es = Elasticsearch(f"{elastic_url}:443") - except Exception as e: - self.safe_logger.error("Failed to initalize elasticsearch: %s" % e) - raise e - - def upload_data_to_elasticsearch(self, item: dict, index: str = ""): - """uploads captured data in item dictionary to Elasticsearch - - - :param item: the data to post to elastic search - :param index: the elastic search index pattern to post to - - 
:return: the time taken to post the result, - will be 0 if index and es are blank - """ - - if self.es and index != "": - # Attach to elastic search and attempt index creation - start = time.time() - self.safe_logger.info( - f"Uploading item {item} to index {index} in Elasticsearch" - ) - try: - response = self.es.index(index=index, body=item) - self.safe_logger.info(f"Response back was {response}") - if response["result"] != "created": - self.safe_logger.error( - f"Error trying to create new item in {index}" - ) - return -1 - except Exception as e: - self.safe_logger.error(f"Error trying to create new item: {e}") - return -1 - end = time.time() - elapsed_time = end - start - - # return elapsed time for upload if no issues - return elapsed_time - return 0 diff --git a/src/krkn_lib/tests/base_test.py b/src/krkn_lib/tests/base_test.py index 323cdf40..7b739a00 100644 --- a/src/krkn_lib/tests/base_test.py +++ b/src/krkn_lib/tests/base_test.py @@ -1,5 +1,6 @@ import cProfile import logging +import os import random import string import sys @@ -16,6 +17,7 @@ from kubernetes.client.rest import ApiException from requests import ConnectTimeout +from krkn_lib.elastic.krkn_elastic import KrknElastic from krkn_lib.k8s import KrknKubernetes from krkn_lib.ocp import KrknOpenshift from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes @@ -28,10 +30,18 @@ class BaseTest(unittest.TestCase): lib_ocp: KrknOpenshift lib_telemetry_k8s: KrknTelemetryKubernetes lib_telemetry_ocp: KrknTelemetryOpenshift + lib_elastic: KrknElastic pr: cProfile.Profile @classmethod def setUpClass(cls): + cls.lib_elastic = KrknElastic( + SafeLogger(), + os.getenv("ELASTIC_URL"), + int(os.getenv("ELASTIC_PORT")), + username=os.getenv("ELASTIC_USER"), + password=os.getenv("ELASTIC_PASSWORD"), + ) cls.lib_k8s = KrknKubernetes(config.KUBE_CONFIG_DEFAULT_LOCATION) cls.lib_ocp = KrknOpenshift(config.KUBE_CONFIG_DEFAULT_LOCATION) cls.lib_telemetry_k8s = KrknTelemetryKubernetes( @@ -422,3 +432,61 @@ def 
background_delete_pod(self, pod_name: str, namespace: str): ) thread.daemon = True thread.start() + + def get_ChaosRunTelemetry_json(self, run_uuid: str) -> dict: + example_data = { + "scenarios": [ + { + "start_timestamp": 1628493021.0, + "end_timestamp": 1628496621.0, + "scenario": "example_scenario.yaml", + "exit_status": 0, + "parameters_base64": "", + "parameters": { + "parameter_1": "test", + "parameter_2": "test", + "parameter_3": {"sub_parameter_1": "test"}, + }, + "affected_pods": { + "recovered": [ + { + "pod_name": "pod1", + "namespace": "default", + "total_recovery_time": 10.0, + "pod_readiness_time": 5.0, + "pod_rescheduling_time": 2.0, + } + ], + "unrecovered": [ + {"pod_name": "pod2", "namespace": "default"} + ], + "error": "some error", + }, + } + ], + "node_summary_infos": [ + { + "count": 5, + "architecture": "aarch64", + "instance_type": "m2i.xlarge", + "kernel_version": "5.4.0-66-generic", + "kubelet_version": "v2.1.2", + "os_version": "Linux", + } + ], + "node_taints": [ + { + "key": "node.kubernetes.io/unreachable", + "value": "NoExecute", + "effect": "NoExecute", + } + ], + "kubernetes_objects_count": {"Pod": 5, "Service": 2}, + "network_plugins": ["Calico"], + "timestamp": "2023-05-22T14:55:02Z", + "total_node_count": 3, + "cloud_infrastructure": "AWS", + "cloud_type": "EC2", + "run_uuid": run_uuid, + } + return example_data diff --git a/src/krkn_lib/tests/test_krkn_elastic.py b/src/krkn_lib/tests/test_krkn_elastic.py index 4b3c1326..d9dd93c9 100644 --- a/src/krkn_lib/tests/test_krkn_elastic.py +++ b/src/krkn_lib/tests/test_krkn_elastic.py @@ -1,33 +1,161 @@ import datetime -import os +import time -from krkn_lib.telemetry.elastic import KrknElastic +import uuid + +from krkn_lib.elastic.krkn_elastic import KrknElastic +from krkn_lib.models.elastic.models import ( + ElasticAlert, + ElasticMetric, +) +from krkn_lib.models.telemetry import ChaosRunTelemetry from krkn_lib.tests import BaseTest -from krkn_lib.utils.safe_logger import SafeLogger 
+from krkn_lib.utils import SafeLogger class TestKrknElastic(BaseTest): - url = os.getenv("ES_SERVER") - safe_logger: SafeLogger = SafeLogger() - def _testupload_correct(self): - elastic = KrknElastic(self.safe_logger, self.url) - time = elastic.upload_data_to_elasticsearch( - {"timestamp": datetime.datetime.now()}, "chaos_test" + def test_push_search_alert(self): + run_uuid = str(uuid.uuid4()) + index = "test-push-alert" + alert_1 = ElasticAlert( + alert="alert_1", + severity="WARNING", + created_at=datetime.datetime.now(), + run_uuid=run_uuid, + ) + alert_2 = ElasticAlert( + alert="alert_2", + severity="ERROR", + created_at=datetime.datetime.now(), + run_uuid=run_uuid, + ) + result = self.lib_elastic.push_alert(alert_1, index) + self.assertNotEqual(result, -1) + result = self.lib_elastic.push_alert(alert_2, index) + self.assertNotEqual(result, -1) + time.sleep(1) + alerts = self.lib_elastic.search_alert(run_uuid, index) + self.assertEqual(len(alerts), 2) + + alert = next(alert for alert in alerts if alert.alert == "alert_1") + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, "WARNING") + + alert = next(alert for alert in alerts if alert.alert == "alert_2") + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, "ERROR") + + def test_push_search_metric(self): + run_uuid = str(uuid.uuid4()) + index = "test-push-metric" + metric_1 = ElasticMetric( + run_uuid=run_uuid, + name="metric_1", + timestamp=100, + value=1.0, + created_at=datetime.datetime.now(), + ) + result = self.lib_elastic.push_metric(metric_1, index) + self.assertNotEqual(result, -1) + time.sleep(1) + metrics = self.lib_elastic.search_metric(run_uuid, index) + self.assertEqual(len(metrics), 1) + metric = next( + metric for metric in metrics if metric.name == "metric_1" + ) + self.assertIsNotNone(metric) + self.assertEqual(metric.value, 1.0) + self.assertEqual(metric.timestamp, 100) + self.assertEqual(metric.run_uuid, run_uuid) + self.assertEqual(metric.name, "metric_1") + + def 
test_push_search_telemetry(self): + run_uuid = str(uuid.uuid4()) + index = "test-push-telemetry" + example_data = self.get_ChaosRunTelemetry_json(run_uuid) + telemetry = ChaosRunTelemetry(json_dict=example_data) + res = self.lib_elastic.push_telemetry(telemetry, index) + self.assertNotEqual(res, -1) + time.sleep(3) + result = self.lib_elastic.search_telemetry( + run_uuid=run_uuid, index=index + ) + + self.assertEqual(len(result), 1) + + def test_upload_metric_to_elasticsearch(self): + bad_metric_uuid = str(uuid.uuid4()) + good_metric_uuid = str(uuid.uuid4()) + name = f"metric-{self.get_random_string(5)}" + index = "test-upload-metric" + # testing bad metric + self.lib_elastic.upload_metrics_to_elasticsearch( + run_uuid=bad_metric_uuid, + raw_data={ + "name": 1, + "timestamp": "bad", + "value": "bad", + }, + index=index, + ) + + self.assertEqual( + len(self.lib_elastic.search_metric(bad_metric_uuid, index)), 0 ) + self.lib_elastic.upload_metrics_to_elasticsearch( + run_uuid=good_metric_uuid, + raw_data=[{"name": name, "timestamp": 10, "value": 3.14}], + index=index, + ) + time.sleep(1) + metric = self.lib_elastic.search_metric(good_metric_uuid, index) + self.assertEqual(len(metric), 1) + self.assertEqual(metric[0].name, name) + self.assertEqual(metric[0].timestamp, 10) + self.assertEqual(metric[0].value, 3.14) + + def test_search_alert_not_existing(self): + self.assertEqual( + len(self.lib_elastic.search_alert("notexisting", "notexisting")), 0 + ) + + def test_search_metric_not_existing(self): + self.assertEqual( + len(self.lib_elastic.search_metric("notexisting", "notexisting")), + 0, + ) + + def test_search_telemetry_not_existing(self): + self.assertEqual( + len( + self.lib_elastic.search_telemetry("notexisting", "notexisting") + ), + 0, + ) + + def test_upload_correct(self): + timestamp = datetime.datetime.now() + run_uuid = str(uuid.uuid4()) + index = "chaos_test" + time = self.lib_elastic.upload_data_to_elasticsearch( + {"timestamp": timestamp, "run_uuid": 
run_uuid}, index + ) self.assertGreater(time, 0) - def _testupload_no_index(self): - elastic = KrknElastic(self.safe_logger, self.url) - time = elastic.upload_data_to_elasticsearch( + def test_upload_no_index(self): + time = self.lib_elastic.upload_data_to_elasticsearch( {"timestamp": datetime.datetime.now()}, "" ) self.assertEqual(time, 0) - def _testupload_bad_es_url(self): - elastic = KrknElastic(self.safe_logger, "https://localhost") + def test_upload_bad_es_url(self): + elastic = KrknElastic( + SafeLogger(), + "http://localhost", + ) time = elastic.upload_data_to_elasticsearch( {"timestamp": datetime.datetime.now()}, "chaos_test" ) @@ -36,9 +164,8 @@ def _testupload_bad_es_url(self): def _testupload_blank_es_url(self): es_url = "" - elastic = KrknElastic(self.safe_logger, es_url) - time = elastic.upload_data_to_elasticsearch( - {"timestamp": datetime.datetime.now()}, "chaos_test" - ) - - self.assertEqual(time, 0) + with self.assertRaises(Exception): + _ = KrknElastic( + SafeLogger(), + es_url, + ) diff --git a/src/krkn_lib/tests/test_krkn_elastic_models.py b/src/krkn_lib/tests/test_krkn_elastic_models.py new file mode 100644 index 00000000..f6b70339 --- /dev/null +++ b/src/krkn_lib/tests/test_krkn_elastic_models.py @@ -0,0 +1,171 @@ +import uuid + +from krkn_lib.models.elastic.models import ElasticChaosRunTelemetry +from krkn_lib.models.telemetry import ChaosRunTelemetry +from krkn_lib.tests import BaseTest + + +class TestKrknElasticModels(BaseTest): + + def check_test_ElasticChaosRunTelemetry( + self, elastic_telemetry: ElasticChaosRunTelemetry, run_uuid: str + ): + self.assertEqual(len(elastic_telemetry.scenarios), 1) + # scenarios + self.assertEqual( + elastic_telemetry.scenarios[0].start_timestamp, 1628493021.0 + ) + self.assertEqual( + elastic_telemetry.scenarios[0].end_timestamp, 1628496621.0 + ) + self.assertEqual( + elastic_telemetry.scenarios[0].scenario, "example_scenario.yaml" + ) + self.assertEqual(elastic_telemetry.scenarios[0].exit_status, 0) + 
self.assertEqual(elastic_telemetry.scenarios[0].parameters_base64, "") + self.assertEqual( + elastic_telemetry.scenarios[0].parameters, + self.get_ChaosRunTelemetry_json(run_uuid).get("scenarios")[0][ + "parameters" + ], + ) + + # scenarios -> affected_pods + self.assertEqual( + len(elastic_telemetry.scenarios[0].affected_pods.recovered), 1 + ) + self.assertEqual( + len(elastic_telemetry.scenarios[0].affected_pods.unrecovered), 1 + ) + self.assertEqual( + elastic_telemetry.scenarios[0].affected_pods.error, "some error" + ) + + # scenarios -> affected_pods -> recovered + self.assertEqual( + elastic_telemetry.scenarios[0].affected_pods.recovered[0].pod_name, + "pod1", + ) + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.recovered[0] + .namespace, + "default", + ) + + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.recovered[0] + .total_recovery_time, + 10.0, + ) + + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.recovered[0] + .pod_readiness_time, + 5.0, + ) + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.recovered[0] + .pod_rescheduling_time, + 2.0, + ) + self.assertEqual( + elastic_telemetry.scenarios[0].affected_pods.recovered[0].pod_name, + "pod1", + ) + self.assertEqual( + elastic_telemetry.scenarios[0].affected_pods.recovered[0].pod_name, + "pod1", + ) + + # scenarios -> affected_pods -> unrecovered + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.unrecovered[0] + .pod_name, + "pod2", + ) + self.assertEqual( + elastic_telemetry.scenarios[0] + .affected_pods.unrecovered[0] + .namespace, + "default", + ) + + # node_summary_infos + self.assertEqual(len(elastic_telemetry.node_summary_infos), 1) + + self.assertEqual(elastic_telemetry.node_summary_infos[0].count, 5) + self.assertEqual( + elastic_telemetry.node_summary_infos[0].architecture, "aarch64" + ) + self.assertEqual( + elastic_telemetry.node_summary_infos[0].instance_type, "m2i.xlarge" + ) + 
self.assertEqual( + elastic_telemetry.node_summary_infos[0].kernel_version, + "5.4.0-66-generic", + ) + self.assertEqual( + elastic_telemetry.node_summary_infos[0].kubelet_version, "v2.1.2" + ) + self.assertEqual( + elastic_telemetry.node_summary_infos[0].os_version, "Linux" + ) + + # node_taints + self.assertEqual(len(elastic_telemetry.node_taints), 1) + + self.assertEqual( + elastic_telemetry.node_taints[0].key, + "node.kubernetes.io/unreachable", + ) + self.assertEqual(elastic_telemetry.node_taints[0].value, "NoExecute") + self.assertEqual(elastic_telemetry.node_taints[0].effect, "NoExecute") + + # objects_count + self.assertEqual( + len(elastic_telemetry.kubernetes_objects_count.to_dict().keys()), 2 + ) + self.assertEqual( + elastic_telemetry.kubernetes_objects_count.to_dict().get("Pod"), 5 + ) + self.assertEqual( + elastic_telemetry.kubernetes_objects_count.to_dict().get( + "Service" + ), + 2, + ) + + # network_plugins + + self.assertEqual(len(elastic_telemetry.network_plugins), 1) + self.assertEqual(elastic_telemetry.network_plugins[0], "Calico") + + # object properties + self.assertEqual(elastic_telemetry.timestamp, "2023-05-22T14:55:02Z") + self.assertEqual(elastic_telemetry.total_node_count, 3) + self.assertEqual(elastic_telemetry.cloud_infrastructure, "AWS") + self.assertEqual(elastic_telemetry.cloud_type, "EC2") + self.assertEqual(elastic_telemetry.run_uuid, run_uuid) + + def test_ElasticChaosRunTelemetry(self): + run_uuid = str(uuid.uuid4()) + example_data = self.get_ChaosRunTelemetry_json(run_uuid) + telemetry = ChaosRunTelemetry(json_dict=example_data) + # building from object (to save in elastic) + elastic_telemetry_object = ElasticChaosRunTelemetry( + chaos_run_telemetry=telemetry + ) + # building from dictionary (to retrieve from elastic) + elastic_telemetry_dic = ElasticChaosRunTelemetry(None, **example_data) + + self.check_test_ElasticChaosRunTelemetry( + elastic_telemetry_object, run_uuid + ) + self.check_test_ElasticChaosRunTelemetry( 
elastic_telemetry_dic, run_uuid + )