From 06df1358cdf783aa6001ffdea8889dea094230c7 Mon Sep 17 00:00:00 2001 From: Martin Bernstorff Date: Tue, 16 Apr 2024 13:07:09 +0200 Subject: [PATCH] update temporal.py --- .grit/.gritmodules/github.com/getgrit/stdlib | 1 + .../spec_processors/temporal.py | 76 ++++++++----------- 2 files changed, 32 insertions(+), 45 deletions(-) create mode 160000 .grit/.gritmodules/github.com/getgrit/stdlib diff --git a/.grit/.gritmodules/github.com/getgrit/stdlib b/.grit/.gritmodules/github.com/getgrit/stdlib new file mode 160000 index 00000000..4dfa8175 --- /dev/null +++ b/.grit/.gritmodules/github.com/getgrit/stdlib @@ -0,0 +1 @@ +Subproject commit 4dfa81755f729bc38015a02df11b17c910f345b4 diff --git a/src/timeseriesflattener/spec_processors/temporal.py b/src/timeseriesflattener/spec_processors/temporal.py index b1b3793d..b65f3db4 100644 --- a/src/timeseriesflattener/spec_processors/temporal.py +++ b/src/timeseriesflattener/spec_processors/temporal.py @@ -149,9 +149,7 @@ def _slice_and_aggregate_spec( TemporalSpec = Union[PredictorSpec, OutcomeSpec, BooleanOutcomeSpec] -def _get_min_max_date_from_predictiontime_frame( - frame: PredictionTimeFrame, -) -> tuple[dt.datetime, dt.datetime]: +def _get_pred_time_range(frame: PredictionTimeFrame) -> tuple[dt.datetime, dt.datetime]: if isinstance(frame.df, pl.LazyFrame): df = frame.df.collect() else: @@ -164,52 +162,42 @@ def _get_min_max_date_from_predictiontime_frame( return start_date, end_date -def _get_longest_lookperiod(lookperiods: list[LookPeriod]) -> dt.timedelta: - max_lookperiod = list() - - for lookperiod in lookperiods: - max_lookperiod.append((lookperiod.first - lookperiod.last).days) - - max_lookperiod.sort() - - return dt.timedelta(max_lookperiod[0]) - - -def _create_date_range( +def _slice_datetime_interval( start_date: dt.datetime, end_date: dt.datetime, timedelta_days: int ) -> list[dt.datetime]: n = int((end_date - start_date).days / timedelta_days) return [start_date + dt.timedelta(timedelta_days * i) for i in range(n + 2)] -def _create_stride_chunks( - predictiontime_frame: PredictionTimeFrame, spec: TemporalSpec, date_range: list, step: int +def _create_step_frames( + predictiontime_frame: PredictionTimeFrame, + spec: TemporalSpec, + date_range: list[dt.datetime], + step_i: int, ) -> tuple[PredictionTimeFrame, ValueFrame]: + step_first_pred_time, step_last_pred_time = (date_range[step_i], date_range[step_i + 1]) + step_predictiontime_df = predictiontime_frame.df.filter( - (pl.col(predictiontime_frame.timestamp_col_name) >= date_range[step]) - & (pl.col(predictiontime_frame.timestamp_col_name) < date_range[step + 1]) + (pl.col(predictiontime_frame.timestamp_col_name) >= step_first_pred_time) + & (pl.col(predictiontime_frame.timestamp_col_name) < step_last_pred_time) + ) + + lookdistance = max( + [lookperiod.first - lookperiod.last for lookperiod in spec.normalised_lookperiod] ) - lookperiod = _get_longest_lookperiod(spec.normalised_lookperiod) + is_lookbehind = lookdistance < dt.timedelta(days=0) + value_timestamps = pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() - if lookperiod < dt.timedelta(days=0): + if is_lookbehind: step_value_df = spec.value_frame.df.filter( - ( - pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() - >= date_range[step] + lookperiod - ) - & ( - pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() - < date_range[step + 1] - ) + (value_timestamps >= step_first_pred_time + lookdistance) + & (value_timestamps < step_last_pred_time) ) else: step_value_df = spec.value_frame.df.filter( - (pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() >= date_range[step]) - & ( - pl.col(spec.value_frame.value_timestamp_col_name).dt.datetime() - < date_range[step + 1] + lookperiod - ) + (value_timestamps >= step_first_pred_time) + & (value_timestamps < step_last_pred_time + lookdistance) ) vf = spec.value_frame @@ -228,19 +216,16 @@ def _create_stride_chunks( def process_temporal_spec( spec: TemporalSpec, predictiontime_frame: PredictionTimeFrame, timedelta_days: int = 365 ) -> ProcessedFrame: - start_date, end_date = _get_min_max_date_from_predictiontime_frame(predictiontime_frame) - - date_range = _create_date_range(start_date, end_date, timedelta_days) + first_pred_time, last_pred_time = _get_pred_time_range(predictiontime_frame) + step_date_ranges = _slice_datetime_interval(first_pred_time, last_pred_time, timedelta_days) result_frames = list() - for step in range(len(date_range) - 1): - step_predictiontime_frame, step_value_frame = _create_stride_chunks( - predictiontime_frame, spec, date_range, step + for step in range(len(step_date_ranges) - 1): + step_predictiontime_frame, step_value_frame = _create_step_frames( + predictiontime_frame, spec, step_date_ranges, step ) - aggregated_value_frames = list() - - aggregated_value_frames += ( + step_aggregated_value_frames = ( Iter(spec.normalised_lookperiod) .map( lambda lookperiod: _slice_and_aggregate_spec( @@ -261,16 +246,17 @@ def process_temporal_spec( ) ) .flatten() + .to_list() ) result_frames += [ horizontally_concatenate_dfs( - aggregated_value_frames, + step_aggregated_value_frames, pred_time_uuid_col_name=step_predictiontime_frame.pred_time_uuid_col_name, ) ] return ProcessedFrame( - df=pl.concat(result_frames), + df=pl.concat(result_frames, how="vertical"), pred_time_uuid_col_name=predictiontime_frame.pred_time_uuid_col_name, )