diff --git a/exploitation_zone/dim_cust_location.py b/exploitation_zone/dim_cust_location.py
new file mode 100644
index 0000000..ed612f8
--- /dev/null
+++ b/exploitation_zone/dim_cust_location.py
@@ -0,0 +1,74 @@
+import logging
+import os
+import configparser
+import json
+from datetime import datetime
+from pyspark.sql import SparkSession, functions as F
+from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace
+
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)  # Set log level to INFO
+
+# Create logger object
+logger = logging.getLogger()
+
+# Get base directory
+root_dir = os.path.abspath(os.path.join(os.getcwd()))
+
+# Specify the path to config file
+config_file_path = os.path.join(root_dir, "config.ini")
+config = configparser.ConfigParser()
+config.read(config_file_path)
+
+config_file_path_json = os.path.join(root_dir, "config.json")
+with open(config_file_path_json) as f:
+    config_json = json.load(f)
+
+
+if __name__ == "__main__":
+    gcs_config = config["GCS"]["credentials_path"]
+    raw_bucket_name = config["GCS"]["raw_bucket_name"]
+    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
+    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
+
+    spark = SparkSession.builder \
+        .appName("Customer Dimension table creation") \
+        .config("spark.driver.host", "127.0.0.1") \
+        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
+        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
+        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
+        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
+        .getOrCreate()
+
+    logger.info('-----------------------------------------------------')
+    logger.info("Creating customer dimension table")
+
+    # Read the Parquet files into DataFrames from the GCS buckets
+    customers_df = spark.read.parquet('gs://' + formatted_bucket_name + '/customers*.parquet')
+    location_df = spark.read.parquet('gs://' + raw_bucket_name + '/location*.parquet')
+
+    cust_location_df = spark.read.parquet('gs://' + raw_bucket_name + '/customer_location*.parquet')
+    cust_location_df = cust_location_df.withColumnRenamed("location_id", "customer_location").withColumnRenamed("customer_id", "customer")
+
+    df1 = customers_df.join(cust_location_df, customers_df.customer_id == cust_location_df.customer, "inner")
+    df2 = df1.join(location_df, df1.customer_location == location_df.location_id, "inner")
+
+    dim_cust = df2.select("customer_id", "customer_name", "email_id", "location_id")
+
+    dim_cust = dim_cust.dropDuplicates()
+
+    dim_cust_location = (
+        df2
+        .select("location_id", "postal_code", "place_name", "latitude", "longitude")
+        .groupBy("location_id")
+        .agg(
+            F.min("postal_code").alias("postal_code"),
+            F.min("place_name").alias("place_name"),
+            F.min("latitude").alias("latitude"),
+            F.min("longitude").alias("longitude")
+        )
+    )
+
+    dim_cust.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_customer_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.parquet')
+    dim_cust_location.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_customer_location_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.parquet')
\ No newline at end of file
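Note: a minimal local sketch of the groupBy/F.min dedup pattern used above to collapse the joined rows to one row per location_id. The local[1] session and the sample rows are hypothetical stand-ins, not project data.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("dim_location_sketch").getOrCreate()

# Hypothetical sample rows: the same location_id appears twice after the joins.
rows = [
    (101, "08001", "Barcelona", 41.38, 2.17),
    (101, "08001", "Barcelona", 41.38, 2.17),
    (102, "08201", "Sabadell", 41.54, 2.10),
]
df2 = spark.createDataFrame(rows, ["location_id", "postal_code", "place_name", "latitude", "longitude"])

# Collapse to one row per location_id, mirroring dim_cust_location above.
dim_cust_location = (
    df2.groupBy("location_id")
       .agg(F.min("postal_code").alias("postal_code"),
            F.min("place_name").alias("place_name"),
            F.min("latitude").alias("latitude"),
            F.min("longitude").alias("longitude"))
)
dim_cust_location.show()  # one row each for 101 and 102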
diff --git a/exploitation_zone/dim_date.py b/exploitation_zone/dim_date.py
new file mode 100644
index 0000000..a9bc45e
--- /dev/null
+++ b/exploitation_zone/dim_date.py
@@ -0,0 +1,71 @@
+import os
+import json
+import configparser
+import logging
+from datetime import datetime, timedelta
+from pyspark.sql import SparkSession, functions as F
+from pyspark.sql.functions import min, max, explode, sequence, col, date_format, expr, to_date
+
+from pyspark.sql.types import IntegerType
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger()
+
+# Get base directory
+root_dir = os.path.abspath(os.path.join(os.getcwd()))
+
+# Specify the path to config files
+config_file_path = os.path.join(root_dir, "config.ini")
+config_file_path_json = os.path.join(root_dir, "config.json")
+
+# Load configuration files
+config = configparser.ConfigParser()
+config.read(config_file_path)
+
+with open(config_file_path_json) as f:
+    config_json = json.load(f)
+
+def generate_dates(start_date_str, end_date_str):
+    start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
+    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
+    date_list = []
+    while start_date <= end_date:
+        date_list.append(start_date.strftime('%Y-%m-%d'))
+        start_date += timedelta(days=1)
+    return date_list
+
+
+if __name__ == "__main__":
+    gcs_config = config["GCS"]["credentials_path"]
+    raw_bucket_name = config["GCS"]["raw_bucket_name"]
+    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
+    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
+
+    spark = SparkSession.builder \
+        .appName("Date Dimension table creation") \
+        .config("spark.driver.host", "127.0.0.1") \
+        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
+        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
+        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
+        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
+        .getOrCreate()
+
+    logger.info('-----------------------------------------------------')
+    logger.info("Creating date dimension table")
+
+    customers_df = spark.read.parquet(f'gs://{raw_bucket_name}/customer_purchase_*')
+    min_max_dates = customers_df.agg(
+        F.min("purchase_date").alias("min_purchase_date"),
+        F.max("purchase_date").alias("max_purchase_date")
+    )
+
+    date_df = min_max_dates.select(explode(sequence(to_date(col("min_purchase_date")), to_date(col("max_purchase_date")), expr("INTERVAL 1 DAY"))).alias("date"))
+    date_df = date_df.withColumn("date_id", F.date_format(F.col("date"), "yyyyMMdd").cast(IntegerType()))
+    date_df = date_df.withColumn("day", F.date_format(F.col("date"), "d").cast(IntegerType()))
+    date_df = date_df.withColumn("month", F.date_format(F.col("date"), "M").cast(IntegerType()))
+    date_df = date_df.withColumn("quarter", ((F.date_format(F.col("date"), "M").cast(IntegerType()) - 1) / 3 + 1).cast(IntegerType()))
+    date_df = date_df.withColumn("year", F.date_format(F.col("date"), "yyyy").cast(IntegerType()))
+    date_df = date_df.select("date_id", "date", "year", "quarter", "month", "day")
+
+    date_df.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_date_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.parquet')
\ No newline at end of file
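Note: a self-contained sketch of the explode(sequence(...)) date-spine technique used above, with hypothetical min/max purchase dates in place of the aggregated values. It shows how the yyyyMMdd surrogate key and the calendar attributes are derived.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[1]").appName("dim_date_sketch").getOrCreate()

# Hypothetical min/max purchase dates standing in for the aggregated bounds.
bounds = spark.createDataFrame([("2024-01-30", "2024-02-02")],
                               ["min_purchase_date", "max_purchase_date"])

# One row per day between the two bounds, inclusive.
date_df = bounds.select(
    F.explode(F.sequence(F.to_date("min_purchase_date"),
                         F.to_date("max_purchase_date"),
                         F.expr("INTERVAL 1 DAY"))).alias("date")
)

# Same derivations as dim_date.py: yyyyMMdd surrogate key plus calendar attributes.
date_df = (date_df
    .withColumn("date_id", F.date_format("date", "yyyyMMdd").cast(IntegerType()))
    .withColumn("year", F.date_format("date", "yyyy").cast(IntegerType()))
    .withColumn("quarter", ((F.date_format("date", "M").cast(IntegerType()) - 1) / 3 + 1).cast(IntegerType()))
    .withColumn("month", F.date_format("date", "M").cast(IntegerType()))
    .withColumn("day", F.date_format("date", "d").cast(IntegerType())))

date_df.show()  # 4 rows: 2024-01-30 .. 2024-02-02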
diff --git a/exploitation_zone/dim_product.py b/exploitation_zone/dim_product.py
new file mode 100644
index 0000000..b06e038
--- /dev/null
+++ b/exploitation_zone/dim_product.py
@@ -0,0 +1,58 @@
+import logging
+import os
+import configparser
+import json
+from pyspark.sql import SparkSession
+from datetime import datetime
+from pyspark.sql.window import Window
+from pyspark.sql.functions import row_number
+
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)  # Set log level to INFO
+
+# Create logger object
+logger = logging.getLogger()
+
+# Get base directory
+root_dir = os.path.abspath(os.path.join(os.getcwd()))
+
+# Specify the path to config file
+config_file_path = os.path.join(root_dir, "config.ini")
+config = configparser.ConfigParser()
+config.read(config_file_path)
+
+config_file_path_json = os.path.join(root_dir, "config.json")
+with open(config_file_path_json) as f:
+    config_json = json.load(f)
+
+
+if __name__ == "__main__":
+    gcs_config = config["GCS"]["credentials_path"]
+    raw_bucket_name = config["GCS"]["raw_bucket_name"]
+    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
+    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
+
+    spark = SparkSession.builder \
+        .appName("Product Dimension table creation") \
+        .config("spark.driver.host", "127.0.0.1") \
+        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
+        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
+        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
+        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
+        .getOrCreate()
+
+    # Read the Parquet file into a DataFrame from the GCS raw bucket
+    customer_purchase_df = spark.read.parquet('gs://' + raw_bucket_name + '/customer_purchase*.parquet')
+
+
+    logger.info('-----------------------------------------------------')
+    logger.info("Creating product dimension table")
+
+    product_df = customer_purchase_df.select("product_name").dropDuplicates()
+
+    window_spec = Window.orderBy("product_name")
+    product_df = product_df.withColumn("product_id", row_number().over(window_spec)).select("product_id", "product_name")
+
+    product_df.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_product_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.parquet')
+
\ No newline at end of file
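Note: a minimal sketch of the surrogate-key assignment used above. The product names are hypothetical. A Window with orderBy but no partitionBy pulls all rows into a single partition (Spark logs a warning), which is acceptable here only because the distinct product list is small.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

spark = SparkSession.builder.master("local[1]").appName("dim_product_sketch").getOrCreate()

# Hypothetical distinct product names (stand-ins for the purchase data).
products = spark.createDataFrame([("Milk 1L",), ("Bread",), ("Eggs 12",)], ["product_name"])

# Dense, deterministic product_id values ordered by product_name.
w = Window.orderBy("product_name")
dim_product = products.withColumn("product_id", row_number().over(w)).select("product_id", "product_name")
dim_product.show()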
"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \ + .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \ + .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \ + .getOrCreate() + + logger.info('-----------------------------------------------------') + logger.info("Creating supermarket dimension table") + + # Read the Parquet file into a DataFrame from GCS Bucket + supermarket_df = spark.read.parquet('gs://'+formatted_bucket_name+'/establishments_catalonia*.parquet') + + # Drop duplicates if present + supermarket_df = supermarket_df.dropDuplicates() + supermarket_df.show() + + supermarket_df = supermarket_df.withColumnRenamed("id","supermarket_id")\ + .withColumnRenamed("commercial_name","supermarket_name") + + loc_df = supermarket_df.select('full_address', "UTMx", "UTMy", "latitude", "longitude") + loc_df = loc_df.withColumn("location_id", monotonically_increasing_id()+7000) # To generate supermarket_id starting from id=7000 + + sp_loc_df = supermarket_df.join(loc_df, ['full_address', "UTMx", "UTMy", "latitude", "longitude"]) + + dim_supermarket = sp_loc_df.select("supermarket_id", "supermarket_name", "location_id") + dim_supermarket = dim_supermarket.dropDuplicates() + + dim_sp_location = sp_loc_df.select("location_id", "full_address", "UTMx", "UTMy", "latitude", "longitude") + dim_sp_location = dim_sp_location.dropDuplicates() + + supermarket_df.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_supermarket_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet') \ No newline at end of file diff --git a/formatted_zone/estimate_expiry_date.py b/formatted_zone/estimate_expiry_date.py index b814e70..43e1c5f 100644 --- a/formatted_zone/estimate_expiry_date.py +++ b/formatted_zone/estimate_expiry_date.py @@ -73,26 +73,6 @@ def count_tokens(s1, s2): return len(s1_tokens.intersection(s2_tokens)) -@udf(IntegerType()) -def spacy_match_score(spacy_filter, spacy_threshold, s1, s2): - s1 = nlp(s1) - s2 = nlp(s2) - - s1_verbs = " ".join([token.lemma_ for token in s1 if token.pos_ == spacy_filter]) - s2_verbs = " ".join([token.lemma_ for token in s1 if token.pos_ == spacy_filter]) - - doc1 = nlp(s1_verbs) - doc2 = nlp(s2_verbs) - - # Calculate similarity - score = doc1.similarity(doc2) - - if score < spacy_threshold: - score = 0 - return score - - - if __name__ == "__main__": gcs_config = config["GCS"]["credentials_path"] raw_bucket_name = config["GCS"]["raw_bucket_name"] @@ -102,14 +82,13 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2): fuzzy_threshold = config_json["product_matching"]["fuzzy_matching"]["threshold"] spark = SparkSession.builder \ - .appName("Estimate Expiry Date") \ - .config("spark.driver.host", "127.0.0.1") \ - .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \ - .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \ - .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \ - .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \ - .getOrCreate() - + .appName("GCS Files Read") \ + .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2") \ + .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \ + .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \ + .config("google.cloud.auth.service.account.json.keyfile", 
"C:\\UPC\\BDM\\Project\\VBP_Joint_Project\\gcs_config.json") \ + .getOrCreate() + # Specify the path to the Parquet file cust_purachase = f'gs://{formatted_bucket_name}/customer_purchase*' @@ -125,6 +104,8 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2): customer_purachase_df = cust_purachase_df.join(cust_email_df, 'customer_id', 'inner') customer_purachase_df = customer_purachase_df.select("id","customer_id","customer_name","email_id","product_name","unit_price","quantity","purchase_date") + + # customer_purachase_df = customer_purachase_df.filter(customer_purachase_df["product_name"] == "Natureland Organics Chana Besan (500 g)") customer_purachase_df = customer_purachase_df.withColumn("original_product_name", customer_purachase_df["product_name"]) @@ -147,10 +128,11 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2): filtered_df = filtered_df.withColumn("score", fuzzy_match_score(lit(fuzzy_score_calc_method), lit(fuzzy_threshold), filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"])) - filtered_df = filtered_df.filter(filtered_df.score != 0) filtered_df = filtered_df.withColumn("token_count", count_tokens(filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"])) + # filtered_df = filtered_df.withColumn("score", spacy_match_score(filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"])) + windowSpec = Window.partitionBy("id") \ .orderBy(filtered_df["score"].desc(), filtered_df["token_count"].desc()) @@ -158,10 +140,14 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2): df_with_rn = filtered_df.withColumn("row_number", row_number().over(windowSpec)) # Filter rows where row number is 1 (which corresponds to the row with the maximum fuzzy score for each product) - df_with_rn = df_with_rn.filter(df_with_rn["row_number"] == 1).drop("row_number", "product_name", "token_count") + df_with_rn = df_with_rn.filter(((df_with_rn["score"] != 0) & (df_with_rn["row_number"] == 1))).drop("row_number", "product_name", "token_count") df_with_rn = df_with_rn.withColumnRenamed("original_product_name", "product_name") df_with_rn = df_with_rn.withColumn("expected_expiry_date", expr("date_add(purchase_date, cast(ceil(avg_expiry_days/2) AS INT))")) - # df_with_rn.write.mode('overwrite').parquet("./data/formatted_zone/purchases_nearing_expiry") + # debug_df = df_with_rn.select("product_name","product_in_avg_expiry_file","avg_expiry_days") + # debug_df.write.csv('./data/formatted_zone/expiry_date_accuracy') + + cnt = df_with_rn.count() + print(cnt) df_with_rn.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/purchases_nearing_expiry') \ No newline at end of file