MetricsReader can use any of the available aggregation functions, not just "avg" #1

Open · wants to merge 1 commit into base: main
46 changes: 27 additions & 19 deletions atlas/metrics.py
@@ -18,6 +18,7 @@ class MetricValues(BaseModel):
metric: DeviceMetric
device_name: str
device_alias: str
aggregation: str
values: List[MetricValue]

class MetricsReader:
@@ -39,7 +40,7 @@ def __init__(self, refresh_token: Optional[str] = None, debug: Optional[bool] =
"""
self.client = AtlasClient(refresh_token=refresh_token, debug=debug)

def read(self, filter: Filter, start: Optional[datetime] = None, end: Optional[datetime] = None, interval: int = 60) -> Dict[str, List[MetricValues]]:
def read(self, filter: Filter, start: Optional[datetime] = None, end: Optional[datetime] = None, interval: int = 60, aggregate_by: List[str] = ["avg"]) -> Dict[str, List[MetricValues]]:
"""
Retrieve metric values for a given filter and time range.
Values are averaged over the sampling interval.
@@ -54,6 +55,9 @@ def read(self, filter: Filter, start: Optional[datetime] = None, end: Optional[d
End time of the historical values, by default now.
interval : int, optional
Sampling interval in seconds, by default 60.
aggregate_by : List[str], optional
Aggregation functions to use over the interval, by default ["avg"].
Available aggregation functions are listed in the models.AggregateBy class.

Returns
-------
@@ -91,7 +95,7 @@ def read(self, filter: Filter, start: Optional[datetime] = None, end: Optional[d
aliases = [af["alias"] for af in alias_filters]

point_map = self._get_point_ids(facility, agent_id, aliases)
hvalues = self._get_historical_values(facility, agent_id, point_map, start, end, interval)
hvalues = self._get_historical_values(facility, agent_id, point_map, start, end, interval, aggregate_by)

self._process_historical_values(result, facility, device, alias_filters, point_map, hvalues)

@@ -139,9 +143,9 @@ def _get_point_ids(self, facility, agent_id: str, aliases: List[str]) -> Dict[st

return point_map

def _get_historical_values(self, facility, agent_id: str, point_map: Dict[str, str], start: Optional[datetime], end: Optional[datetime], interval: int):
def _get_historical_values(self, facility, agent_id: str, point_map: Dict[str, str], start: Optional[datetime], end: Optional[datetime], interval: int, aggregate_by: List[str]):
try:
return self.client.get_historical_values(facility.organization_id, agent_id, list(point_map.values()), start, end, interval)
return self.client.get_historical_values(facility.organization_id, agent_id, list(point_map.values()), start, end, interval, aggregate_by)
except Exception as e:
raise Exception(f"Error retrieving historical values for facility {facility.display_name}: {e}")

@@ -150,19 +154,23 @@ def _process_historical_values(self, result: defaultdict, facility, device, alia
point_id = agvalues.point_id
point_alias = next((alias for alias, pid in point_map.items() if pid == point_id), None)
point_filter = next((af["filter"] for af in alias_filters if af["alias"] == point_alias), None)
point_values = agvalues.values["avg"]

if point_values.analog:
vals, timestamps = point_values.analog.values, point_values.analog.timestamps
elif point_values.discrete:
vals, timestamps = point_values.discrete.values, point_values.discrete.timestamps
else:
vals, timestamps = [], []

metrics_values = MetricValues(
metric=DeviceMetric(name=point_filter, device_kind=device.kind),
device_name=device.name,
device_alias=device.alias,
values=[MetricValue(timestamp=datetime.fromtimestamp(ts), value=val) for ts, val in zip(timestamps, vals)]
)

for agg in agvalues.values.keys():
# build one MetricValues entry per aggregation returned for this point
point_values = agvalues.values[agg]

if point_values.analog:
vals, timestamps = point_values.analog.values, point_values.analog.timestamps
elif point_values.discrete:
vals, timestamps = point_values.discrete.values, point_values.discrete.timestamps
else:
vals, timestamps = [], []

metrics_values = MetricValues(
metric=DeviceMetric(name=point_filter, device_kind=device.kind),
device_name=device.name,
device_alias=device.alias,
aggregation=agg,
values=[MetricValue(timestamp=datetime.fromtimestamp(ts), value=val) for ts, val in zip(timestamps, vals)]
)
result[facility.short_name].append(metrics_values)
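
For reference, a minimal usage sketch of the new parameter. Only MetricsReader.read(), its aggregate_by argument, and the new MetricValues.aggregation field are confirmed by this diff; the import path for Filter and its constructor arguments are assumptions.

from datetime import datetime, timedelta

from atlas.metrics import MetricsReader
from atlas.models import Filter  # assumed location of Filter

reader = MetricsReader(refresh_token="...", debug=False)

# Hypothetical filter; the actual Filter fields depend on atlas.models.
device_filter = Filter()

# Request both the average and the maximum over 5-minute intervals.
result = reader.read(
    filter=device_filter,
    start=datetime.now() - timedelta(hours=1),
    interval=300,
    aggregate_by=["avg", "max"],
)

# Each point now yields one MetricValues entry per aggregation returned,
# tagged with the new `aggregation` field.
for facility, metric_values in result.items():
    for mv in metric_values:
        print(facility, mv.device_name, mv.metric.name, mv.aggregation, len(mv.values))

Note that _process_historical_values iterates over the aggregations present in the API response rather than over the requested aggregate_by list, so the reader stays in sync with whatever the backend actually computed.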