# Container image for the KAPEL processor (runs python/KAPEL.py).
FROM python:3.12

# Numeric user and group the processor runs as; override with --build-arg UID=... / GID=...
ARG UID=10000
ARG GID=10000

WORKDIR /src

# Install dependencies before copying the sources so the dependency layer can be
# reused from the build cache when only the Python code changes.
COPY python/requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
COPY python/*.py ./

# Drop root privileges for the running process.
USER $UID:$GID

# Exec form: python3 runs as PID 1 and receives signals (e.g. SIGTERM) directly
# instead of being wrapped in `sh -c`.
CMD ["python3", "KAPEL.py"]
def _rounded(value):
    """Round a numeric value to the nearest int; return non-finite values unchanged."""
    try:
        return round(value)
    except (ValueError, OverflowError):
        # round() raises on NaN (ValueError) and infinity (OverflowError).
        # Pass those through so that formatting the message itself never fails.
        return value


def _whole_number(value):
    """Return value as an int when it has no fractional part, otherwise unchanged."""
    return int(value) if float(value).is_integer() else value


def individual_message(config, pod_name, memory, cores, wall_time, cpu_time, start_time, end_time):
    """ Write an APEL individual job message based on prometheus metrics from a single pod.

    Durations, timestamps and memory are written as integers, matching the rounding
    applied to the timestamps of the summary records. A whole core count is written
    without a trailing ".0"; a fractional core count is passed through unchanged so
    that a sub-core request is not reported as 0.

    Args:
        config: object providing site_name, vo_name, submit_host, infrastructure_type,
            benchmark_value and nodecount.
        pod_name: pod name, used for both MachineName and LocalJobId.
        memory: value for MemoryVirtual.
        cores: value for Processors.
        wall_time: value for WallDuration.
        cpu_time: value for CpuDuration.
        start_time: value for StartTime.
        end_time: value for EndTime.

    Returns:
        The message text, terminated by a '%%' line.
    """
    output = (
        f'APEL-individual-job-message: v0.3\n'
        f'Site: {config.site_name}\n'
        f'VO: {config.vo_name}\n'
        f'SubmitHost: {config.submit_host}\n'
        f'MachineName: {pod_name}\n'
        f'LocalJobId: {pod_name}\n'
        f'InfrastructureType: {config.infrastructure_type}\n'
        f'ServiceLevelType: si2k\n'
        f'ServiceLevel: {config.benchmark_value * 250}\n'
        f'WallDuration: {_rounded(wall_time)}\n'
        f'CpuDuration: {_rounded(cpu_time)}\n'
        f'MemoryVirtual: {_rounded(memory)}\n'
        f'Processors: {_whole_number(cores)}\n'
        f'NodeCount: {config.nodecount}\n'
        f'StartTime: {_rounded(start_time)}\n'
        f'EndTime: {_rounded(end_time)}\n'
        f'%%\n'
    )
    return output
- ) - queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace) - - # SSL generally not used for Prometheus access within a cluster - # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries - prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True) - prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout} - - raw_results, results, result_lengths = {}, {}, [] - # iterate over each query (cputime, starttime, endtime, cores) producing raw_results['cputime'] etc. - for query_name, query_string in vars(queries).items(): - # Each of these raw_results is a list of dicts. Each dict in the list represents an individual data point, and contains: - # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name. - # 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in. - print(f'Executing {query_name} query: {query_string}') - t1 = timer() - raw_results[query_name] = prom.custom_query(query=query_string, params=prom_connect_params) - t2 = timer() - results[query_name] = dict(rearrange(raw_results[query_name])) - result_lengths.append(len(results[query_name])) - t3 = timer() - print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_results[query_name])} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.') - del raw_results[query_name] +def record_summarized_period(config, period_start, year, month, results): + """ Record the sum of usage across all pods in the given time period. 
""" cputime = results['cputime'] endtime = results['endtime'] starttime = results['starttime'] cores = results['cores'] - + result_lengths = list((len(l) for l in results.values())) # Confirm the assumption that cputime should have the fewest entries, while starttime and cores may have additional ones # corresponding to jobs that have started but not finished yet, and endtime may have additional ones if there are pods without CPU resource requests. # We only want the jobs for which all values are available: start time, end time, CPU request. @@ -241,8 +240,8 @@ def process_period(config, period): summary_output = summary_message( config, - year=period['year'], - month=period['month'], + year=year, + month=month, wall_time=sum_walltime, cpu_time=sum_cputime, n_jobs=len(endtime), @@ -250,7 +249,7 @@ def process_period(config, period): first_end=round(min(endtime.values())), last_end=round(max(endtime.values())) ) - sync_output = sync_message(config, year=period['year'], month=period['month'], n_jobs=len(endtime)) + sync_output = sync_message(config, year=year, month=month, n_jobs=len(endtime)) # Write output to the message queue on local filesystem # https://dirq.readthedocs.io/en/latest/queuesimple.html#directory-structure @@ -262,6 +261,73 @@ def process_period(config, period): print(f'Writing sync record to {config.output_path}/{sync_file}:') print('--------------------------------\n' + sync_output + '--------------------------------') +def record_individual_period(config, results): + """ Record each pod in the namespace over the summarized period. + Assumes each pod ran once and terminated upon completion. 
def record_individual_period(config, results):
    """ Record each pod in the namespace over the summarized period.
    Assumes each pod ran once and terminated upon completion.

    Args:
        config: object providing output_path plus the fields read by individual_message.
        results: dict of the form {'data_type': {'pod_name': value}}.

    One message per pod that has both a 'starttime' and an 'endtime' is added to the
    queue at config.output_path and echoed to stdout.
    """
    # Pivot records from {'data_type':{'pod_name':value}} to {'pod_name':{'data_type':value}}
    per_pod_records = {}
    for data_type, records in results.items():
        for pod, val in records.items():
            per_pod_records.setdefault(pod, {})[data_type] = val

    dirq = QueueSimple(str(config.output_path))
    for pod_name, records in per_pod_records.items():
        # Only report on pods that have completed. Running pods won't have an endtime
        if 'starttime' not in records or 'endtime' not in records:
            continue
        individual_output = individual_message(
            config,
            pod_name,
            records.get('memory', 0),
            records.get('cores', 0),
            records['endtime'] - records['starttime'],
            records.get('cpuusage', 0),
            records['starttime'],
            records['endtime'])
        record_file = dirq.add(individual_output)
        print(f'Writing individual record to {config.output_path}/{record_file}:')
        print('--------------------------------\n' + individual_output + '--------------------------------')


# Names of the QueryLogic queries that each output mode actually reads.
# Keep these in sync with record_summarized_period and record_individual_period.
_SUMMARY_QUERY_NAMES = frozenset({'cputime', 'endtime', 'starttime', 'cores'})
_INDIVIDUAL_QUERY_NAMES = frozenset({'starttime', 'endtime', 'cores', 'memory', 'cpuusage'})


# process a time period (do prom query, process data, write output)
# takes a KAPELConfig object and one element of output from get_time_periods
# Remember Prometheus queries go backwards: the time instant is the end, go backwards from there.
def process_period(config, period):
    """ Query Prometheus for one time period and write the resulting records.

    Only the queries needed by the selected output mode are executed. This avoids
    range queries whose results would be discarded, and keeps results that the
    summary does not use out of the result-length check in record_summarized_period.

    Args:
        config: object providing namespace, prometheus_server, auth_header,
            query_timeout and summarize_records, plus whatever the record functions read.
        period: dict with 'year', 'month', 'instant' (end of the period) and
            'range_sec' (length of the period in seconds).
    """
    period_start = period['instant'] + dateutil.relativedelta.relativedelta(seconds=-period['range_sec'])
    print(
        f"Processing year {period['year']}, month {period['month']}, "
        f"querying from {period['instant'].isoformat()} and going back {period['range_sec']} s to {period_start.isoformat()}."
    )
    queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace)
    wanted_queries = _SUMMARY_QUERY_NAMES if config.summarize_records else _INDIVIDUAL_QUERY_NAMES

    # SSL generally not used for Prometheus access within a cluster
    # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
    headers = {"Authorization": config.auth_header} if config.auth_header else None
    prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True, headers=headers)
    prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout}

    results = {}
    # iterate over each query needed for this mode, producing results['starttime'] etc.
    for query_name, query_string in vars(queries).items():
        if query_name not in wanted_queries:
            continue
        # Each raw_result is a list of dicts. Each dict in the list represents an individual data point, and contains:
        # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name.
        # 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in.
        print(f'Executing {query_name} query: {query_string}')
        t1 = timer()
        raw_result = prom.custom_query(query=query_string, params=prom_connect_params)
        t2 = timer()
        results[query_name] = dict(rearrange(raw_result))
        t3 = timer()
        print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_result)} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.')
        # Release the raw response before the next query so two raw results are never held at once.
        del raw_result

    if config.summarize_records:
        record_summarized_period(config, period_start, period['year'], period['month'], results)
    else:
        record_individual_period(config, results)