Skip to content

Commit

Permalink
prepare re-check discovery items still there
Browse files Browse the repository at this point in the history
  • Loading branch information
blodone committed Mar 12, 2024
1 parent 8bdaa1a commit 50cd37c
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 44 deletions.
3 changes: 3 additions & 0 deletions base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class Configuration:
discovery_interval_slow: int = 60 * 60 * 2
resend_data_interval_slow: int = 60 * 30

discovery_interval_delay: int = 120
data_resend_interval_delay: int = 180

def _convert_to_type(self, field_name: str,
value: str | list[str] | bool | int | ClusterAccessConfigType) -> \
str | list[str] | bool | int | ClusterAccessConfigType:
Expand Down
88 changes: 46 additions & 42 deletions base/daemon_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ def __init__(self, config: Configuration,
self.logger.info(f"Initialized cluster access for {config.k8s_config_type}")
# K8S API
self.debug_k8s_events = False
self.core_v1 = KubernetesApi(self.api_client).core_v1
self.apps_v1 = KubernetesApi(self.api_client).apps_v1
self.extensions_v1 = KubernetesApi(self.api_client).extensions_v1
self.apis = {
'core_v1': KubernetesApi(self.api_client).core_v1,
'apps_v1': KubernetesApi(self.api_client).apps_v1,
'extensions_v1': KubernetesApi(self.api_client).extensions_v1
}

self.zabbix_sender = ZabbixSender(zabbix_server=config.zabbix_server)
self.zabbix_resources = CheckKubernetesDaemon.exclude_resources(resources,
self.config.zabbix_resources_exclude)
config.zabbix_resources_exclude)
self.zabbix_host = config.zabbix_host
self.zabbix_debug = config.zabbix_debug
self.zabbix_single_debug = config.zabbix_single_debug
Expand All @@ -108,14 +110,14 @@ def __init__(self, config: Configuration,
self.web_api = None
self.web_api_enable = config.web_api_enable
self.web_api_resources = CheckKubernetesDaemon.exclude_resources(resources,
self.config.web_api_resources_exclude)
config.web_api_resources_exclude)

self.web_api_host = config.web_api_host
self.web_api_token = config.web_api_token
self.web_api_cluster = config.web_api_cluster
self.web_api_verify_ssl = config.web_api_verify_ssl

self.resources = CheckKubernetesDaemon.exclude_resources(resources, self.config.resources_exclude)
self.resources = CheckKubernetesDaemon.exclude_resources(resources, config.resources_exclude)

self.logger.info(f"Init K8S-ZABBIX Watcher for resources: {','.join(self.resources)}")
self.logger.info(f"Zabbix Host: {self.zabbix_host} / Zabbix Proxy or Server: {config.zabbix_server}")
Expand Down Expand Up @@ -144,7 +146,7 @@ def handler(self, signum: int, *args: str) -> None:
for r, d in self.data.items():
for obj_name, obj_d in d.objects.items():
self.logger.info(
f"resource={r}, last_sent_zabbix={obj_d.last_sent_zabbix}, " + f"last_sent_web={obj_d.last_sent_web}"
f"resource={r}, [{obj_name}], last_sent_zabbix={obj_d.last_sent_zabbix}, " + f"last_sent_web={obj_d.last_sent_web}"
)
for resource_discovered, resource_discovered_time in self.discovery_sent.items():
self.logger.info(
Expand All @@ -171,9 +173,14 @@ def start_data_threads(self) -> None:
threading.excepthook = self.excepthook
for resource in self.resources:
with self.thread_lock:
self.data.setdefault(resource, K8sResourceManager(resource, zabbix_host=self.zabbix_host, config=self.config))
self.data.setdefault(resource, K8sResourceManager(resource,
apis=self.apis,
zabbix_host=self.zabbix_host,
config=self.config))
if resource == "pods":
self.data.setdefault("containers", K8sResourceManager("containers", config=self.config))
# additional containers coming from pods
self.data.setdefault("containers", K8sResourceManager("containers",
config=self.config))

if resource in ['containers', 'services']:
thread = TimedThread(resource, self.data_resend_interval, exit_flag,
Expand Down Expand Up @@ -210,9 +217,9 @@ def start_loop_send_discovery_threads(self) -> None:
continue

send_discovery_thread = TimedThread(resource, self.discovery_interval, exit_flag,
daemon_object=self, daemon_method='send_zabbix_discovery',
daemon_object=self, daemon_method='update_discovery',
delay_first_run=True,
delay_first_run_seconds=120)
delay_first_run_seconds=self.config.discovery_interval_delay)
self.manage_threads.append(send_discovery_thread)
send_discovery_thread.start()

Expand All @@ -221,31 +228,18 @@ def start_resend_threads(self) -> None:
resend_thread = TimedThread(resource, self.data_resend_interval, exit_flag,
daemon_object=self, daemon_method='resend_data',
delay_first_run=True,
delay_first_run_seconds=180,
delay_first_run_seconds=self.config.data_resend_interval_delay,
)
self.manage_threads.append(resend_thread)
resend_thread.start()

def get_api_for_resource(self, resource: str) -> CoreV1Api | AppsV1Api | ApiextensionsV1Api:
if resource in ['nodes', 'components', 'secrets', 'pods', 'services', 'pvcs']:
api = self.core_v1
elif resource in ["deployments", "daemonsets", "statefulsets"]:
api = self.apps_v1
elif resource in ["ingresses"]:
api = self.extensions_v1
elif resource == 'containers':
api = None
else:
raise AttributeError("No valid resource found: %s" % resource)
return api

def get_web_api(self) -> WebApi:
if not hasattr(self, '_web_api'):
self._web_api = WebApi(self.web_api_host, self.web_api_token, verify_ssl=self.web_api_verify_ssl)
return self._web_api

def watch_data(self, resource: str) -> None:
api = self.get_api_for_resource(resource)
api = self.data[resource].api
stream_named_arguments = {"timeout_seconds": self.config.k8s_api_stream_timeout_seconds}
request_named_arguments = {"_request_timeout": self.config.k8s_api_request_timeout_seconds}
self.logger.info(
Expand Down Expand Up @@ -457,29 +451,39 @@ def resend_data(self, resource: str) -> None:
def delete_object(self, resource_type: str, resourced_obj: K8sObject) -> None:
self.send_to_web_api(resource_type, resourced_obj, "deleted")

def update_discovery(self, resource: str) -> None:
""" Update elements on hold and send to zabbix """
resource_obj = self.data[resource].resource_meta
with self.thread_lock:
obj_list = resource_obj.get_uid_list()
for obj_uid in self.data[resource].objects:
if obj_uid not in obj_list:
self.logger.info("NOT finding {obj.uid} anymore -> removing")
self.send_zabbix_discovery(resource)

def send_zabbix_discovery(self, resource: str) -> None:
# aggregate data and send to zabbix
next_run = datetime.now() + timedelta(seconds=self.discovery_interval)
self.logger.info(f"send_zabbix_discovery: {resource}, next run: {next_run.isoformat()}")
with self.thread_lock:
if resource not in self.data:
self.logger.warning('send_zabbix_discovery: resource "%s" not in self.data... skipping!' % resource)
return

data = list()
for obj_uid, obj in self.data[resource].objects.items():
data += obj.get_zabbix_discovery_data()
if resource not in self.data:
self.logger.warning('send_zabbix_discovery: resource "%s" not in self.data... skipping!' % resource)
return

if data:
metric = obj.get_discovery_for_zabbix(data)
self.logger.debug('send_zabbix_discovery: resource "%s": %s' % (resource, metric))
self.send_discovery_to_zabbix(resource, metric=metric)
else:
self.logger.warning('send_zabbix_discovery: resource "%s" has no discovery data' % resource)
data = list()
for obj_uid, obj in self.data[resource].objects.items():
data += obj.get_zabbix_discovery_data()

if data:
metric = obj.get_discovery_for_zabbix(data)
self.logger.debug('send_zabbix_discovery: resource "%s": %s' % (resource, metric))
self.send_discovery_to_zabbix(resource, metric=metric)
else:
self.logger.warning('send_zabbix_discovery: resource "%s" has no discovery data' % resource)

self.discovery_sent[resource] = datetime.now()
if resource == 'pods' and self.config.container_crawling == 'container':
self.discovery_sent['containers'] = datetime.now()
self.discovery_sent[resource] = datetime.now()
if resource == 'pods' and self.config.container_crawling == 'container':
self.discovery_sent['containers'] = datetime.now()

def send_object(self, resource: str, resourced_obj: K8sObject,
event_type: str, send_zabbix_data: bool = False,
Expand Down
3 changes: 3 additions & 0 deletions config_default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ resend_data_interval_fast = 120

discovery_interval_slow = 7200
resend_data_interval_slow = 1800

discovery_interval_delay = 120
data_resend_interval_delay = 180
3 changes: 3 additions & 0 deletions k8sobjects/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
class Component(K8sObject):
object_type = 'component'

def get_list(self):
return self.manager.api.list_component_status()

@property
def resource_data(self):
data = super().resource_data
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/daemonset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
class Daemonset(K8sObject):
object_type = "daemonset"

def get_list(self):
return self.manager.api.list_daemon_set_for_all_namespaces()

@property
def resource_data(self):
data = super().resource_data
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
class Deployment(K8sObject):
object_type = "deployment"

def get_list(self):
return self.manager.api.list_deployment_for_all_namespaces()

@property
def resource_data(self):
data = super().resource_data
Expand Down
12 changes: 12 additions & 0 deletions k8sobjects/k8sobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ def name_space(self) -> str | None:
def slug(self, name):
return slugit(self.name_space or "None", name, 40)

def get_uid_list(self):
ret = []
if self.resource == 'pvcs':
obj_list = self.get_list()
else:
obj_list = self.get_list().items

for obj in obj_list:
n = self.manager.resource_class(obj.to_dict(), self.resource, manager=self.manager)
ret.append(n.uid)
return ret

def is_unsubmitted_web(self) -> bool:
return self.last_sent_web == INITIAL_DATE

Expand Down
26 changes: 24 additions & 2 deletions k8sobjects/k8sresourcemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@
import logging
from datetime import datetime

from kubernetes.client import (AppsV1Api, CoreV1Api,
ApiextensionsV1Api)

from base.config import Configuration
from k8sobjects.k8sobject import K8S_RESOURCES, K8sObject

logger = logging.getLogger("k8s-zabbix")


class K8sResourceManager:
def __init__(self, resource: str, zabbix_host: str | None = None, config: Configuration | None = None):
def __init__(self, resource: str, apis: dict | None = None,
zabbix_host: str | None = None, config: Configuration | None = None):
self.resource = resource
self.apis = apis
self.zabbix_host = zabbix_host
self.config = config

Expand All @@ -20,8 +25,26 @@ def __init__(self, resource: str, zabbix_host: str | None = None, config: Config
mod = importlib.import_module('k8sobjects')
class_label = K8S_RESOURCES[resource]
self.resource_class = getattr(mod, class_label.capitalize(), None)
if self.resource_class is not None:
self.resource_meta = self.resource_class(None, self.resource, manager=self)

logger.info(f"Creating new resource manager for resource {resource} with class {self.resource_class}")

self.api = self.get_api_for_resource(resource)

def get_api_for_resource(self, resource: str) -> CoreV1Api | AppsV1Api | ApiextensionsV1Api:
if resource in ['nodes', 'components', 'secrets', 'pods', 'services', 'pvcs']:
api = self.apis.get('core_v1')
elif resource in ["deployments", "daemonsets", "statefulsets"]:
api = self.apis.get('apps_v1')
elif resource in ["ingresses"]:
api = self.apis.get('extensions_v1')
elif resource == 'containers':
api = None
else:
raise AttributeError("No valid resource found: %s" % resource)
return api

def add_obj_from_data(self, data: dict) -> K8sObject | None:
if not self.resource_class:
logger.error('No Resource Class found for "%s"' % self.resource)
Expand All @@ -31,7 +54,6 @@ def add_obj_from_data(self, data: dict) -> K8sObject | None:
return self.add_obj(new_obj)

def add_obj(self, new_obj: K8sObject) -> K8sObject | None:

if new_obj.uid not in self.objects:
# new object
self.objects[new_obj.uid] = new_obj
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class Node(K8sObject):
"capacity.pods",
]

def get_list(self):
return self.manager.api.list_node()

@property
def resource_data(self):
data = super().resource_data
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class Pod(K8sObject):
object_type = 'pod'
kind = None

def get_list(self):
return self.manager.api.list_pod_for_all_namespaces()

@property
def name(self) -> str:
return self.real_name
Expand Down
6 changes: 6 additions & 0 deletions k8sobjects/pvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ def get_pvc_volumes_for_all_nodes(api: CoreV1Api, timeout: int, namespace_exclud
class Pvc(K8sObject):
object_type = "pvc"

def get_list(self):
return get_pvc_volumes_for_all_nodes(api=self.manager.api,
timeout=self.manager.config.k8s_api_request_timeout_seconds,
namespace_exclude_re=self.manager.config.namespace_exclude_re,
resource_manager=self.manager)

@property
def resource_data(self):
data = super().resource_data
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
class Service(K8sObject):
object_type = "service"

def get_list(self):
return self.manager.api.list_service_for_all_namespaces()

@property
def resource_data(self):
data = super().resource_data
Expand Down
3 changes: 3 additions & 0 deletions k8sobjects/statefulset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
class Statefulset(K8sObject):
object_type = "statefulset"

def get_list(self):
return self.manager.api.list_stateful_set_for_all_namespaces()

@property
def resource_data(self):
data = super().resource_data
Expand Down
35 changes: 35 additions & 0 deletions raw_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sys

from kubernetes import client
from kubernetes import config as kube_config
from base.config import ClusterAccessConfigType, Configuration
from base.daemon_thread import KubernetesApi


class RawClient:
def __init__(self, ini_file):
config = Configuration()
config.load_config_file(ini_file)
self.config = config

if config.k8s_config_type is ClusterAccessConfigType.INCLUSTER:
kube_config.load_incluster_config()
self.api_client = client.ApiClient()
elif config.k8s_config_type is ClusterAccessConfigType.KUBECONFIG:
kube_config.load_kube_config()
self.api_client = kube_config.new_client_from_config()
elif config.k8s_config_type is ClusterAccessConfigType.TOKEN:
self.api_configuration = client.Configuration()
self.api_configuration.host = config.k8s_api_host
self.api_configuration.verify_ssl = config.verify_ssl
self.api_configuration.api_key = {"authorization": "Bearer " + config.k8s_api_token}
self.api_client = client.ApiClient(self.api_configuration)
else:
self.logger.fatal(f"k8s_config_type = {config.k8s_config_type} is not implemented")
sys.exit(1)

self.apis = {
'core_v1': KubernetesApi(self.api_client).core_v1,
'apps_v1': KubernetesApi(self.api_client).apps_v1,
'extensions_v1': KubernetesApi(self.api_client).extensions_v1
}
1 change: 1 addition & 0 deletions tests/unit/resources/test.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
yolobanana = rofl
debug = True
discovery_interval_fast = 12
discovery_interval_delay = 50
zabbix_resources_exclude = jacco, wacco
1 change: 1 addition & 0 deletions tests/unit/test_config_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def test_load_config():
cfg.load_from_environment_variables()
assert (cfg.debug is True)
assert (cfg.discovery_interval_fast == 12)
assert (cfg.discovery_interval_delay == 60)
assert ("jacco" in cfg.zabbix_resources_exclude)
assert ("wacco" in cfg.zabbix_resources_exclude)
print("")
Expand Down

0 comments on commit 50cd37c

Please sign in to comment.