Skip to content

Commit

Permalink
update temporal.py
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBernstorff committed Apr 16, 2024
1 parent 4b89ece commit 06df135
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 45 deletions.
1 change: 1 addition & 0 deletions .grit/.gritmodules/github.com/getgrit/stdlib
Submodule stdlib added at 4dfa81
76 changes: 31 additions & 45 deletions src/timeseriesflattener/spec_processors/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ def _slice_and_aggregate_spec(
TemporalSpec = Union[PredictorSpec, OutcomeSpec, BooleanOutcomeSpec]


def _get_min_max_date_from_predictiontime_frame(
frame: PredictionTimeFrame,
) -> tuple[dt.datetime, dt.datetime]:
def _get_pred_time_range(frame: PredictionTimeFrame) -> tuple[dt.datetime, dt.datetime]:
if isinstance(frame.df, pl.LazyFrame):
df = frame.df.collect()
else:
Expand All @@ -164,52 +162,42 @@ def _get_min_max_date_from_predictiontime_frame(
return start_date, end_date


def _get_longest_lookperiod(lookperiods: list[LookPeriod]) -> dt.timedelta:
max_lookperiod = list()

for lookperiod in lookperiods:
max_lookperiod.append((lookperiod.first - lookperiod.last).days)

max_lookperiod.sort()

return dt.timedelta(max_lookperiod[0])


def _create_date_range(
def _slice_datetime_interval(
start_date: dt.datetime, end_date: dt.datetime, timedelta_days: int
) -> list[dt.datetime]:
n = int((end_date - start_date).days / timedelta_days)
return [start_date + dt.timedelta(timedelta_days * i) for i in range(n + 2)]


def _create_stride_chunks(
predictiontime_frame: PredictionTimeFrame, spec: TemporalSpec, date_range: list, step: int
def _create_step_frames(
predictiontime_frame: PredictionTimeFrame,
spec: TemporalSpec,
date_range: list[dt.datetime],
step_i: int,
) -> tuple[PredictionTimeFrame, ValueFrame]:
step_first_pred_time, step_last_pred_time = (date_range[step_i], date_range[step_i + 1])

step_predictiontime_df = predictiontime_frame.df.filter(
(pl.col(predictiontime_frame.timestamp_col_name) >= date_range[step])
& (pl.col(predictiontime_frame.timestamp_col_name) < date_range[step + 1])
(pl.col(predictiontime_frame.timestamp_col_name) >= step_first_pred_time)
& (pl.col(predictiontime_frame.timestamp_col_name) < step_last_pred_time)
)

lookdistance = max(
[lookperiod.first - lookperiod.last for lookperiod in spec.normalised_lookperiod]
)

lookperiod = _get_longest_lookperiod(spec.normalised_lookperiod)
is_lookbehind = lookdistance < dt.timedelta(days=0)
value_timestamps = pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime()

if lookperiod < dt.timedelta(days=0):
if is_lookbehind:
step_value_df = spec.value_frame.df.filter(
(
pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime()
>= date_range[step] + lookperiod
)
& (
pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime()
< date_range[step + 1]
)
(value_timestamps >= step_first_pred_time + lookdistance)
& (value_timestamps < step_last_pred_time)
)
else:
step_value_df = spec.value_frame.df.filter(
(pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() >= date_range[step])
& (
pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime()
< date_range[step + 1] + lookperiod
)
(value_timestamps >= step_first_pred_time)
& (value_timestamps < step_last_pred_time + lookdistance)
)

vf = spec.value_frame
Expand All @@ -228,19 +216,16 @@ def _create_stride_chunks(
def process_temporal_spec(
spec: TemporalSpec, predictiontime_frame: PredictionTimeFrame, timedelta_days: int = 365
) -> ProcessedFrame:
start_date, end_date = _get_min_max_date_from_predictiontime_frame(predictiontime_frame)

date_range = _create_date_range(start_date, end_date, timedelta_days)
first_pred_time, last_pred_time = _get_pred_time_range(predictiontime_frame)
step_date_ranges = _slice_datetime_interval(first_pred_time, last_pred_time, timedelta_days)

result_frames = list()
for step in range(len(date_range) - 1):
step_predictiontime_frame, step_value_frame = _create_stride_chunks(
predictiontime_frame, spec, date_range, step
for step in range(len(step_date_ranges) - 1):
step_predictiontime_frame, step_value_frame = _create_step_frames(
predictiontime_frame, spec, step_date_ranges, step
)

aggregated_value_frames = list()

aggregated_value_frames += (
step_aggregated_value_frames = (
Iter(spec.normalised_lookperiod)
.map(
lambda lookperiod: _slice_and_aggregate_spec(
Expand All @@ -261,16 +246,17 @@ def process_temporal_spec(
)
)
.flatten()
.to_list()
)

result_frames += [
horizontally_concatenate_dfs(
aggregated_value_frames,
step_aggregated_value_frames,
pred_time_uuid_col_name=step_predictiontime_frame.pred_time_uuid_col_name,
)
]

return ProcessedFrame(
df=pl.concat(result_frames),
df=pl.concat(result_frames, how="vertical"),
pred_time_uuid_col_name=predictiontime_frame.pred_time_uuid_col_name,
)

0 comments on commit 06df135

Please sign in to comment.