From e84c6e2faa961be1792098f7f40b7af10e7d9beb Mon Sep 17 00:00:00 2001
From: Aryan Gupta
Date: Fri, 24 May 2024 23:19:46 +0200
Subject: [PATCH] Add fact table for business inventory

---
 exploitation_zone/fact_business_inventory.py | 53 ++++++++++++++++++--
 exploitation_zone/schema.txt                 | 11 ++++
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/exploitation_zone/fact_business_inventory.py b/exploitation_zone/fact_business_inventory.py
index 98f737c..6064b10 100644
--- a/exploitation_zone/fact_business_inventory.py
+++ b/exploitation_zone/fact_business_inventory.py
@@ -4,7 +4,9 @@ import json
 from pyspark.sql import SparkSession
 from datetime import datetime
-from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit
+from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit, to_date
+from pyspark.sql.functions import row_number
+from pyspark.sql.window import Window
 
 # Configure logging
 logging.basicConfig(level=logging.INFO) # Set log level to INFO
@@ -30,9 +32,11 @@
 raw_bucket_name = config["GCS"]["raw_bucket_name"]
 formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
 exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
+project_id = config["BIGQUERY"]["project_id"]
+dataset_id = config["BIGQUERY"]["dataset_id"]
 
 spark = SparkSession.builder \
-    .appName("Supermarket Dimension table creation") \
+    .appName("Supermarket Fact table creation") \
     .config("spark.driver.host", "127.0.0.1") \
     .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
     .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
@@ -41,9 +45,48 @@
     .getOrCreate()
 
 logger.info('-----------------------------------------------------')
-logger.info("Creating business_inventory fact table")
+logger.info("Creating business inventory fact table")
+
+exploitation_zone_parquet_file_path = os.path.join(root_dir, 'data', 'exploitation_zone')
 
 # Read the Parquet file into a DataFrame from GCS Bucket
-dim_date_df = spark.read.parquet(os.path.join(root_dir,'data','exploitation_zone','dim_date.parquet'))
+supermarket_inventory_df = spark.read.parquet('gs://' + formatted_bucket_name + '/supermarket_inventory*.parquet')  # intentionally reading supermarket_inventory, not supermarket_products
+
+dim_product_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_product.parquet'))
+dim_supermarket_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_supermarket.parquet'))
+
+# supermarket_inventory_df.show()
+# dim_product_df.show()
+# dim_supermarket_df.show()
+
+supermarket_inventory_df = supermarket_inventory_df.withColumnRenamed("store_id", "supermarket_id")
+fact_business_inventory_df = supermarket_inventory_df.join(dim_supermarket_df, 'supermarket_id', 'inner').select(supermarket_inventory_df['*'], dim_supermarket_df['location_id'])
+
+fact_business_inventory_df = fact_business_inventory_df.drop("product_id")  # drop the raw product_id; the surrogate key comes from dim_product
+
+fact_business_inventory_df = fact_business_inventory_df.join(dim_product_df, 'product_name', 'inner').select(fact_business_inventory_df['*'], dim_product_df['product_id'])
+fact_business_inventory_df = fact_business_inventory_df.withColumn("created_on", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))  # add created_on audit column
+
+# Static expiry_date_id for now (see note below)
+fact_business_inventory_df = fact_business_inventory_df.withColumn("expiry_date_id", lit('20240524'))
+
+window_spec = Window.orderBy("product_name")
+fact_business_inventory_df = fact_business_inventory_df.withColumn("supermarket_inventory_id", row_number().over(window_spec))
+
+fact_business_inventory_df = fact_business_inventory_df.select("supermarket_inventory_id", "supermarket_id", "location_id", "product_id", "price", "quantity", "expiry_date_id", "created_on")
+# Note: expiry_date_id is a static placeholder because expiry dates are not in the source data yet.
+
+# fact_business_inventory_df.show()
+
+fact_business_inventory_df.printSchema()
+
+fact_business_inventory_df.write \
+    .format('bigquery') \
+    .option('table', f'{project_id}:{dataset_id}.fact_business_inventory') \
+    .option('temporaryGcsBucket', raw_bucket_name) \
+    .mode('overwrite') \
+    .save()
+
+fact_business_inventory_df.write.mode('overwrite').parquet(os.path.join(exploitation_zone_parquet_file_path, 'fact_business_inventory.parquet'))
 
-dim_date_df.show()
diff --git a/exploitation_zone/schema.txt b/exploitation_zone/schema.txt
index b6eb87a..27a99ed 100644
--- a/exploitation_zone/schema.txt
+++ b/exploitation_zone/schema.txt
@@ -61,4 +61,15 @@ root
  |-- unit_price: string (nullable = true)
  |-- quantity: long (nullable = true)
  |-- purchase_date_id: string (nullable = true)
  |-- created_on: string (nullable = false)
+
+fact_business_inventory
+root
+ |-- supermarket_inventory_id: integer (nullable = false)
+ |-- supermarket_id: string (nullable = true)
+ |-- location_id: long (nullable = true)
+ |-- product_id: integer (nullable = true)
+ |-- price: double (nullable = true)
+ |-- quantity: integer (nullable = true)
+ |-- expiry_date_id: string (nullable = false)
+ |-- created_on: string (nullable = false)
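
Follow-up notes:

1. expiry_date_id is hardcoded to '20240524' for now. Once the formatted
   inventory data carries an expiry date, the id could be derived with the
   to_date import this patch already adds. A minimal sketch, assuming a raw
   expiry_date column in yyyy-MM-dd format (the column name is hypothetical):

       from pyspark.sql.functions import col, date_format, to_date

       # Derive a yyyyMMdd date key from the (assumed) expiry_date column,
       # replacing the static literal.
       fact_business_inventory_df = fact_business_inventory_df.withColumn(
           "expiry_date_id",
           date_format(to_date(col("expiry_date"), "yyyy-MM-dd"), "yyyyMMdd")
       )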
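2. Window.orderBy("product_name") without a partitionBy forces every row into
   a single partition to assign row_number, which is fine at this data volume
   but will not scale. If gaps in the surrogate key are acceptable, a
   shuffle-free alternative is the already-imported monotonically_increasing_id:

       # Ids are unique and increasing but not consecutive; no data movement
       # onto a single executor is required.
       fact_business_inventory_df = fact_business_inventory_df.withColumn(
           "supermarket_inventory_id", monotonically_increasing_id()
       )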
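3. The BigQuery write above uses the indirect path, staging files through
   temporaryGcsBucket (here the raw bucket). Recent versions of the
   spark-bigquery-connector can skip the staging bucket with the direct write
   method; a sketch, assuming the deployed connector supports it:

       # Direct write via the BigQuery Storage Write API; no temporary GCS
       # bucket needed (depends on the connector version in use).
       fact_business_inventory_df.write \
           .format('bigquery') \
           .option('table', f'{project_id}:{dataset_id}.fact_business_inventory') \
           .option('writeMethod', 'direct') \
           .mode('overwrite') \
           .save()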