Feature/processor output individual records for gratia, add CPU usage and memory metrics #54

Merged
Changes from all commits · 25 commits
9e5dcef
first pass at adding optional gratia output container to helm chart
mwestphall Apr 23, 2024
ec1c443
fix variable name
mwestphall Apr 23, 2024
edddf5d
fix syntax for configmap in values.yaml
mwestphall Apr 23, 2024
95d0762
Add dedicated docker image for processor
mwestphall Apr 25, 2024
0e762a7
add optional mode to output individual job records rather than summaries
mwestphall Apr 25, 2024
5025f11
fix parameter order in individual_message
mwestphall Apr 26, 2024
9ef5ea1
add optional authentication secret to helm chart
mwestphall May 7, 2024
fd3e588
clean up some scratch work
mwestphall May 8, 2024
bdf032b
Add a README for helm chart configuration and installation
mwestphall May 10, 2024
f2418fb
minor documentation consistency/tweaks
rptaylor May 10, 2024
207e3ec
fix phrasing and missing link in README
mwestphall May 13, 2024
d55536d
Update helm chart make non-privileged user uid/gid configurable
mwestphall May 14, 2024
6950aed
make serviceUser.uid nullable
mwestphall May 14, 2024
747b484
Update processor to output gratia-compatible individual job records,
mwestphall May 14, 2024
c018792
Add explicit commands to cronjobs, fix default image tag
mwestphall May 20, 2024
3ae9213
Merge branch 'master' into feature/processor-output-individual-record…
mwestphall May 24, 2024
0f30ab8
clean up comments
mwestphall May 24, 2024
081e776
revert accidental change to readme
mwestphall May 28, 2024
af2af72
Deduplicate memory and cpu usage records, revert change to cpu usage …
mwestphall May 28, 2024
3873acc
use sum by pod instead of max by pod for more accurate v2 cgroups rep…
mwestphall Jun 10, 2024
40a8a8e
use last_over_time, update comments
rptaylor Jun 10, 2024
fc4da2a
report on pod total requested memory rather than max usage per container
mwestphall Jun 13, 2024
b5bc2c0
change result_lengths
rptaylor Jun 20, 2024
90fb6e5
update comment
rptaylor Jun 21, 2024
547df6c
small tidying
rptaylor Jun 21, 2024
11 changes: 11 additions & 0 deletions Dockerfile
@@ -0,0 +1,11 @@
FROM python:3.12
ARG UID=10000
ARG GID=10000

WORKDIR /src
COPY python/requirements.txt .
RUN pip install -r requirements.txt
COPY python/*.py ./

USER $UID:$GID
CMD python3 KAPEL.py
6 changes: 6 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,6 @@
services:
  kapel-processor:
    image: hub.opensciencegrid.org/osgpreview/kapel-processor:latest
    build:
      context: .
      network: host
136 changes: 101 additions & 35 deletions python/KAPEL.py
@@ -56,6 +56,11 @@ def __init__(self, queryRange, namespace):
        self.endtime = f'max_over_time(kube_pod_completion_time{{namespace="{namespace}"}}[{queryRange}])'
        self.starttime = f'max_over_time(kube_pod_start_time{{namespace="{namespace}"}}[{queryRange}])'
        self.cores = f'max_over_time(kube_pod_container_resource_requests{{resource="cpu", node != "", namespace="{namespace}"}}[{queryRange}])'
        self.memory = f'sum by (pod) (max_over_time(kube_pod_container_resource_requests{{resource="memory", node!="", namespace="{namespace}"}}[{queryRange}])) / 1000'

        # This is container-level CPU usage reported by kubelets, for gratia output.
        # Take the largest (i.e. final) value of the cumulative CPU usage of each container, and sum the results for all containers in a pod.
        self.cpuusage = f'sum by (pod) (last_over_time(container_cpu_usage_seconds_total{{namespace="{namespace}"}}[{queryRange}]))'
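As context for the new cpuusage query: container_cpu_usage_seconds_total is a cumulative counter, so the last sample for each container within the window is that container's total CPU time, and summing by pod gives whole-pod CPU seconds. A minimal standalone sketch of the same query, assuming a hypothetical Prometheus server at http://localhost:9090 and namespace "osg":

from prometheus_api_client import PrometheusConnect

namespace, query_range = "osg", "3600s"  # hypothetical values
query = (f'sum by (pod) (last_over_time('
         f'container_cpu_usage_seconds_total{{namespace="{namespace}"}}[{query_range}]))')
prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
for item in prom.custom_query(query=query):
    # each item: {'metric': {'pod': ...}, 'value': [timestamp, value-as-string]}
    print(item['metric']['pod'], float(item['value'][1]))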

def summary_message(config, year, month, wall_time, cpu_time, n_jobs, first_end, last_end):
    output = (
@@ -81,6 +86,29 @@ def summary_message(config, year, month, wall_time, cpu_time, n_jobs, first_end,
    )
    return output

def individual_message(config, pod_name, memory, cores, wall_time, cpu_time, start_time, end_time):
    """ Write an APEL individual job message based on prometheus metrics from a single pod """
    output = (
        f'APEL-individual-job-message: v0.3\n'
        f'Site: {config.site_name}\n'
        f'VO: {config.vo_name}\n'
        f'SubmitHost: {config.submit_host}\n'
        f'MachineName: {pod_name}\n'
        f'LocalJobId: {pod_name}\n'
        f'InfrastructureType: {config.infrastructure_type}\n'
        f'ServiceLevelType: si2k\n'
        f'ServiceLevel: {config.benchmark_value * 250}\n'
        f'WallDuration: {wall_time}\n'
        f'CpuDuration: {cpu_time}\n'
        f'MemoryVirtual: {memory}\n'
        f'Processors: {cores}\n'
        f'NodeCount: {config.nodecount}\n'
        f'StartTime: {start_time}\n'
        f'EndTime: {end_time}\n'
        f'%%\n'
    )
    return output
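As a quick illustration of the record format, individual_message can be exercised with a stub config; every value below is hypothetical:

from types import SimpleNamespace

cfg = SimpleNamespace(site_name='EXAMPLE-SITE', vo_name='example.vo',
                      submit_host='example-ce.example.org', infrastructure_type='grid',
                      benchmark_value=10.0, nodecount=1)
# prints one APEL individual job record terminated by '%%'
print(individual_message(cfg, 'pod-abc123', memory=2048.0, cores=2.0,
                         wall_time=3600, cpu_time=712.3,
                         start_time=1715600000, end_time=1715603600))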

def sync_message(config, year, month, n_jobs):
    output = (
        f'APEL-sync-message: v0.1\n'
@@ -159,43 +187,14 @@ def rearrange(x):
        # this produces each of the (key, value) tuples in the list
        yield item['metric']['pod'], float(item['value'][1])

# process a time period (do prom query, process data, write output)
# takes a KAPELConfig object and one element of output from get_time_periods
# Remember Prometheus queries go backwards: the time instant is the end, go backwards from there.
def process_period(config, period):
    period_start = period['instant'] + dateutil.relativedelta.relativedelta(seconds=-period['range_sec'])
    print(
        f"Processing year {period['year']}, month {period['month']}, "
        f"querying from {period['instant'].isoformat()} and going back {period['range_sec']} s to {period_start.isoformat()}."
    )
    queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace)

    # SSL generally not used for Prometheus access within a cluster
    # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
    prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True)
    prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout}

    raw_results, results, result_lengths = {}, {}, []
    # iterate over each query (cputime, starttime, endtime, cores) producing raw_results['cputime'] etc.
    for query_name, query_string in vars(queries).items():
        # Each of these raw_results is a list of dicts. Each dict in the list represents an individual data point, and contains:
        # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name.
        # 'value': a list in which element 0 is the timestamp of the value, and element 1 is the actual value we're interested in.
        print(f'Executing {query_name} query: {query_string}')
        t1 = timer()
        raw_results[query_name] = prom.custom_query(query=query_string, params=prom_connect_params)
        t2 = timer()
        results[query_name] = dict(rearrange(raw_results[query_name]))
        result_lengths.append(len(results[query_name]))
        t3 = timer()
        print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_results[query_name])} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.')
        del raw_results[query_name]

def record_summarized_period(config, period_start, year, month, results):
    """ Record the sum of usage across all pods in the given time period. """
    cputime = results['cputime']
    endtime = results['endtime']
    starttime = results['starttime']
    cores = results['cores']

    result_lengths = list((len(l) for l in results.values()))
    # Confirm the assumption that cputime should have the fewest entries, while starttime and cores may have additional ones
    # corresponding to jobs that have started but not finished yet, and endtime may have additional ones if there are pods without CPU resource requests.
    # We only want the jobs for which all values are available: start time, end time, CPU request.
@@ -241,16 +240,16 @@ def process_period(config, period):

    summary_output = summary_message(
        config,
        year=period['year'],
        month=period['month'],
        year=year,
        month=month,
        wall_time=sum_walltime,
        cpu_time=sum_cputime,
        n_jobs=len(endtime),
        # this appears faster than getting min/max during the dict iteration above
        first_end=round(min(endtime.values())),
        last_end=round(max(endtime.values()))
    )
    sync_output = sync_message(config, year=period['year'], month=period['month'], n_jobs=len(endtime))
    sync_output = sync_message(config, year=year, month=month, n_jobs=len(endtime))

    # Write output to the message queue on local filesystem
    # https://dirq.readthedocs.io/en/latest/queuesimple.html#directory-structure
@@ -262,6 +261,73 @@ def process_period(config, period):
    print(f'Writing sync record to {config.output_path}/{sync_file}:')
    print('--------------------------------\n' + sync_output + '--------------------------------')
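Both output paths go through dirq's QueueSimple, whose add() writes the message atomically under the queue directory and returns the new element's relative name. A minimal sketch, assuming the dirq package is installed and using a hypothetical /tmp path:

from dirq.QueueSimple import QueueSimple

queue = QueueSimple('/tmp/apel-outgoing')          # hypothetical output path
element = queue.add('APEL-sync-message: v0.1\n%%\n')
print(f'queued message as {element}')              # e.g. a 'directory/element' relative name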

def record_individual_period(config, results):
    """ Record each pod in the namespace over the summarized period.
        Assumes each pod ran once and terminated upon completion.
    """
    # Pivot records from {'data_type': {'pod_name': value}} to {'pod_name': {'data_type': value}}
    per_pod_records = {}
    for data_type, records in results.items():
        for pod, val in records.items():
            if not pod in per_pod_records:
                per_pod_records[pod] = {}
            per_pod_records[pod][data_type] = val

    dirq = QueueSimple(str(config.output_path))
    for pod_name, records in per_pod_records.items():
        # Only report on pods that have completed. Running pods won't have an endtime
        if not ('starttime' in records and 'endtime' in records):
            continue
        individual_output = individual_message(
            config,
            pod_name,
            records.get('memory', 0),
            records.get('cores', 0),
            records['endtime'] - records['starttime'],
            records.get('cpuusage', 0),
            records['starttime'],
            records['endtime'])
        record_file = dirq.add(individual_output)
        print(f'Writing individual record to {config.output_path}/{record_file}:')
        print('--------------------------------\n' + individual_output + '--------------------------------')
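The pivot above is the crux of this function; with toy data (a sketch, not tied to real query output) it behaves as follows:

results = {'starttime': {'pod-a': 100.0, 'pod-b': 150.0},
           'endtime':   {'pod-a': 160.0},
           'cpuusage':  {'pod-a': 42.0}}
per_pod = {}
for data_type, records in results.items():
    for pod, val in records.items():
        per_pod.setdefault(pod, {})[data_type] = val
# per_pod == {'pod-a': {'starttime': 100.0, 'endtime': 160.0, 'cpuusage': 42.0},
#             'pod-b': {'starttime': 150.0}}
# pod-b lacks an endtime (still running), so the completion check above skips it.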


# process a time period (do prom query, process data, write output)
# takes a KAPELConfig object and one element of output from get_time_periods
# Remember Prometheus queries go backwards: the time instant is the end, go backwards from there.
def process_period(config, period):
    period_start = period['instant'] + dateutil.relativedelta.relativedelta(seconds=-period['range_sec'])
    print(
        f"Processing year {period['year']}, month {period['month']}, "
        f"querying from {period['instant'].isoformat()} and going back {period['range_sec']} s to {period_start.isoformat()}."
    )
    queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace)

    # SSL generally not used for Prometheus access within a cluster
    # Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
    headers = {"Authorization": config.auth_header} if config.auth_header else None
    prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True, headers=headers)
    prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout}

    results = {}
    # iterate over each query (cputime, endtime, starttime, cores, memory, cpuusage) producing results['cputime'] etc.
    for query_name, query_string in vars(queries).items():
        # Each raw_result is a list of dicts. Each dict in the list represents an individual data point, and contains:
        # 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name.
        # 'value': a list in which element 0 is the timestamp of the value, and element 1 is the actual value we're interested in.
        print(f'Executing {query_name} query: {query_string}')
        t1 = timer()
        raw_result = prom.custom_query(query=query_string, params=prom_connect_params)
        t2 = timer()
        results[query_name] = dict(rearrange(raw_result))
        t3 = timer()
        print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_result)} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.')

    if config.summarize_records:
        record_summarized_period(config, period_start, period['year'], period['month'], results)
    else:
        record_individual_period(config, results)
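For reference, the shape of one raw instant-query result and what rearrange() makes of it (illustrative values only):

raw_result = [{'metric': {'pod': 'pod-a', 'namespace': 'osg'},
               'value': [1715600000.0, '3600']}]
# rearrange yields (pod_name, float(value)) pairs
assert dict(rearrange(raw_result)) == {'pod-a': 3600.0}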

def main(envFile):
    print(f'Starting KAPEL processor: {__file__} with envFile {envFile} at {datetime.datetime.now(tz=datetime.timezone.utc).isoformat()}')
    cfg = KAPELConfig(envFile)
5 changes: 5 additions & 0 deletions python/KAPELConfig.py
@@ -14,12 +14,17 @@ def __init__(self, envFile=None):
        # Format: validity is determined by python urllib.parse.
        self.prometheus_server = env.url("PROMETHEUS_SERVER", "http://kube-prometheus-prometheus.kube-prometheus:9090").geturl()

        # Optionally add an authentication header - this is passed as the "Authorization" HTTP header on Prometheus queries.
        self.auth_header = env.str("PROMETHEUS_AUTH_HEADER", None)

        # The default behaviour ("auto" mode) is to publish records for the previous month, and up to the current day of the current month.
        self.publishing_mode = env.str("PUBLISHING_MODE", "auto")

        # The Kubernetes namespace to query. Only pods in this namespace will be accounted.
        self.namespace = env.str("NAMESPACE")

        # Whether to publish monthly summary records (default) or individual per-job records for gratia.
        self.summarize_records = env.bool("SUMMARIZE_RECORDS", True)

        # If PUBLISHING_MODE is "gap", a fixed time period is queried instead, and the start and end must be specified.
        # Format: ISO 8601, like "2020-12-20T07:20:50.52Z", to avoid complications with time zones and leap seconds.
        # Timezone should be specified, and it should be UTC for consistency with the auto mode publishing.
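For local testing, the two new settings could be exercised along these lines (a sketch with hypothetical values; NAMESPACE is the only required variable):

import os
os.environ['PROMETHEUS_AUTH_HEADER'] = 'Bearer abc123'  # sent as the Authorization header
os.environ['SUMMARIZE_RECORDS'] = 'false'               # emit individual job records instead of summaries
os.environ['NAMESPACE'] = 'osg'
cfg = KAPELConfig()
assert cfg.auth_header == 'Bearer abc123' and cfg.summarize_records is False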