From c883d431e132d808b82154237dbdc73c97661a59 Mon Sep 17 00:00:00 2001
From: Wes
Date: Thu, 14 Nov 2024 11:49:05 -0500
Subject: [PATCH 1/2] updates

---
 graphreduce/graph_reduce.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphreduce/graph_reduce.py b/graphreduce/graph_reduce.py
index 093249d..0c4a5c9 100644
--- a/graphreduce/graph_reduce.py
+++ b/graphreduce/graph_reduce.py
@@ -313,7 +313,7 @@ def join_any (
         if isinstance(to_node.df, pyspark.sql.dataframe.DataFrame) and isinstance(from_node.df, pyspark.sql.dataframe.DataFrame):
             joined = to_node.df.join(
                 from_node.df,
-                on=to_node.df[f"{to_node.prefix}_{to_node_key}"] == from_node.df[f"{relation_node.prefix}_{from_node_key}"],
+                on=to_node.df[f"{to_node.prefix}_{to_node_key}"] == from_node.df[f"{from_node.prefix}_{from_node_key}"],
                 how="left"
             )
             self._mark_merged(to_node, from_node)

From a23dce1baa8d0a2fcdabf4fb41f275260c7296dd Mon Sep 17 00:00:00 2001
From: Wes
Date: Wed, 27 Nov 2024 07:52:07 -0600
Subject: [PATCH 2/2] updates

---
 graphreduce/graph_reduce.py |  4 +++-
 graphreduce/node.py         | 27 ++++++++++++++++++++-------
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/graphreduce/graph_reduce.py b/graphreduce/graph_reduce.py
index 0c4a5c9..c19473a 100644
--- a/graphreduce/graph_reduce.py
+++ b/graphreduce/graph_reduce.py
@@ -385,6 +385,7 @@ def join (
         if isinstance(relation_df, pyspark.sql.dataframe.DataFrame) and isinstance(parent_node.df, pyspark.sql.dataframe.DataFrame):
             original = f"{relation_node.prefix}_{relation_fk}"
             new = f"{original}_dupe"
+            relation_df = relation_df.withColumnRenamed(original, new)
 
             joined = parent_node.df.join(
                 relation_df,
@@ -397,7 +398,8 @@
         elif isinstance(parent_node.df, pyspark.sql.dataframe.DataFrame) and isinstance(relation_node.df, pyspark.sql.dataframe.DataFrame):
             original = f"{relation_node.prefix}_{relation_fk}"
             new = f"{original}_dupe"
-            relation_df = relation_df.withColumnRenamed(original, new)
+            relation_node.df = relation_node.df.withColumnRenamed(original, new)
+
             joined = parent_node.df.join(
                 relation_node.df,
                 on=parent_node.df[f"{parent_node.prefix}_{parent_pk}"] == relation_node.df[new],
diff --git a/graphreduce/node.py b/graphreduce/node.py
index 820d9a4..4ff633f 100644
--- a/graphreduce/node.py
+++ b/graphreduce/node.py
@@ -397,6 +397,8 @@ def pandas_auto_features (
         """
         agg_funcs = {}
 
+        self._stypes = infer_df_stype(self.get_sample().head())
+
         ts_data = self.is_ts_data(reduce_key)
         if ts_data:
             # Make sure the dates are cleaned.
@@ -483,6 +485,7 @@ def dask_auto_features (
        upward through the graph
        from child nodes with no feature definitions.
        """
+        self._stypes = infer_df_stype(self.get_sample().head())
        agg_funcs = {}
        for col, stype in self._stypes.items():
            _type = str(stype)
@@ -507,17 +510,22 @@ def spark_auto_features (
        upward through the graph
        from child nodes with no feature definitions.
        """
+
+        self._stypes = infer_df_stype(self.df.sample(0.5).limit(10).toPandas())
        agg_funcs = []
        ts_data = self.is_ts_data(reduce_key)
        if ts_data:
            logger.info(f"{self} is time-series data")
        for col, stype in self._stypes.items():
            _type = str(stype)
-            #for field in self.df.schema.fields:
-            #    field_meta = json.loads(field.json())
-            #    col = field_meta['name']
-            #    _type = field_meta['type']
-            if type_func_map.get(_type):
+
+            if self._is_identifier(col) and col != reduce_key:
+                func = 'count'
+                col_new = f"{col}_{func}"
+                agg_funcs.append(F.count(F.col(col)).alias(col_new))
+            elif self._is_identifier(col) and col == reduce_key:
+                continue
+            elif type_func_map.get(_type):
                for func in type_func_map[_type]:
                    if func == 'nunique':
                        func = 'count_distinct'
@@ -529,6 +537,11 @@ def spark_auto_features (
        # If we have time-series data take the time
        # since the last event and the cut date.
        if ts_data:
+            # convert the date key to a timestamp
+            date_key_field = [x for x in self.df.schema.fields if x.name == self.colabbr(self.date_key)][0]
+            if date_key_field.dataType not in [T.TimestampType(), T.DateType()]:
+                logger.info(f'{self} date key was {date_key_field.dataType} - converting to Timestamp')
+                self.df = self.df.withColumn(self.colabbr(self.date_key), F.to_timestamp(F.col(self.colabbr(self.date_key))))
            logger.info(f'computed post-aggregation features for {self}')
            spark_datetime = self.spark_sqlctx.sql(f"SELECT TO_DATE('{self.cut_date.strftime('%Y-%m-%d')}') as cut_date")
            if 'cut_date' not in grouped.columns:
@@ -549,12 +562,12 @@ def spark_auto_features (
                feat_prepped = self.prep_for_features()
                feat_prepped = feat_prepped.withColumn(
                    self.colabbr('time_since_cut'),
-                    F.unix_timestamp(F.col('cut_date')) - F.unix_timestamp(self.colabbr(self.date_key))
+                    F.unix_timestamp(F.col('cut_date')) - F.unix_timestamp(F.col(self.colabbr(self.date_key)))
                ).drop(F.col('cut_date'))
                sub = feat_prepped.filter(
                    (feat_prepped[self.colabbr('time_since_cut')] >= 0) &
-                    (feat_prepped[self.colabbr('time_since_cut')] <= d)
+                    (feat_prepped[self.colabbr('time_since_cut')] <= (d*86400))
                )
                days_group = sub.groupBy(self.colabbr(reduce_key)).agg(
                    F.count(self.colabbr(self.pk)).alias(self.colabbr(f'{d}d_num_events'))