Skip to content

Commit

Permalink
fixup! In KafkaConsumer class, implement throttling of telemetry data.
Browse files Browse the repository at this point in the history
  • Loading branch information
tribeiro committed Feb 14, 2025
1 parent 735b8fe commit b9e92c2
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions python/lsst/ts/salobj/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(
)
self.auto_throttle_qsize_limit = 5
self.max_throttle = 50
self.max_throttle_change = 3

self.schema_resolution_errors: dict[str, int] = dict()

Expand Down Expand Up @@ -483,9 +484,11 @@ def _handle_throttle(self) -> None:
old_throttle = self.throttle_telemetry[kafka_name].get(
index, throttle
)
self.throttle_telemetry[kafka_name][index] = int(
(old_throttle + throttle) / 2.0
)
if throttle > old_throttle + self.max_throttle_change:
throttle = old_throttle + self.max_throttle_change
elif throttle < old_throttle - self.max_throttle_change:
throttle = old_throttle - self.max_throttle_change
self.throttle_telemetry[kafka_name][index] = throttle
self.telemetry_n_reads[kafka_name][index] = 1
self.throutput_measurement_n_reads = 0
self.throutput_measurement_start = now
Expand Down

0 comments on commit b9e92c2

Please sign in to comment.