Skip to content

Commit

Permalink
added fact table for business inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
withoutwaxaryan committed May 24, 2024
1 parent 9ee49c4 commit e84c6e2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
53 changes: 48 additions & 5 deletions exploitation_zone/fact_business_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit, to_date
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Configure root logging so each pipeline stage's progress is visible.
logging.basicConfig(level=logging.INFO) # Set log level to INFO
Expand All @@ -30,9 +32,11 @@
# GCS bucket names and BigQuery coordinates, read from the config parsed
# earlier in the script. The raw bucket also serves as the temporary staging
# bucket for the BigQuery write below.
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
project_id = config["BIGQUERY"]["project_id"]
dataset_id = config["BIGQUERY"]["dataset_id"]

spark = SparkSession.builder \
.appName("Supermarket Dimension table creation") \
.appName("Supermarket Fact table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
Expand All @@ -41,9 +45,48 @@
.getOrCreate()

logger.info('-----------------------------------------------------')
logger.info("Creating business_inventory fact table")
logger.info("Creating business inventory fact table")

# --- Build the fact_business_inventory table --------------------------------
# Joins the formatted-zone supermarket inventory snapshot with the
# supermarket and product dimensions, then writes the result to BigQuery and
# to an exploitation-zone Parquet file.
exploitation_zone_parquet_file_path = os.path.join(root_dir, 'data', 'exploitation_zone')

# Read the inputs: dim_date from the local exploitation zone, the inventory
# snapshot from the formatted GCS bucket.
dim_date_df = spark.read.parquet(os.path.join(root_dir,'data','exploitation_zone','dim_date.parquet'))
supermarket_inventory_df = spark.read.parquet('gs://'+formatted_bucket_name+'/supermarket_inventory*.parquet') # deliberately reads supermarket_inventory*, NOT supermarket_products


dim_product_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_product.parquet'))
dim_supermarket_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_supermarket.parquet'))

# Debug previews, left disabled to keep the job output quiet:
# supermarket_inventory_df.show()
# dim_product_df.show()
# dim_supermarket_df.show()

# Align the inventory's store key with the dimension's column name, then pull
# in the supermarket's location_id via an inner join on supermarket_id.
supermarket_inventory_df = supermarket_inventory_df.withColumnRenamed("store_id", "supermarket_id")
fact_business_inventory_df = supermarket_inventory_df.join(dim_supermarket_df, 'supermarket_id', 'inner').select(supermarket_inventory_df['*'], dim_supermarket_df['location_id'])

fact_business_inventory_df = fact_business_inventory_df.drop("product_id") # drop the source product_id; the surrogate key must come from dim_product instead

# Resolve the surrogate product_id by joining on product_name, and stamp the
# load time. NOTE(review): created_on is a formatted string, not a timestamp,
# and product_name is assumed unique in dim_product (a duplicate name would
# fan out rows) — confirm upstream.
fact_business_inventory_df = fact_business_inventory_df.join(dim_product_df, 'product_name', 'inner').select(fact_business_inventory_df['*'], dim_product_df['product_id'])
fact_business_inventory_df = fact_business_inventory_df.withColumn("created_on",lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) # Add created_on

# Hard-coded expiry_date_id placeholder until expiry dates exist in the data.
fact_business_inventory_df = fact_business_inventory_df.withColumn("expiry_date_id", lit('20240524'))

# Sequential surrogate key ordered by product_name.
# NOTE(review): a Window with orderBy but no partitionBy forces Spark to
# funnel every row through a single partition to number them — acceptable
# only for small tables.
window_spec = Window.orderBy("product_name")
fact_business_inventory_df = fact_business_inventory_df.withColumn("supermarket_inventory_id", row_number().over(window_spec))

# Final column selection for the fact table.
fact_business_inventory_df = fact_business_inventory_df.select("supermarket_inventory_id", "supermarket_id", "location_id", "product_id", "price", "quantity", "expiry_date_id", "created_on")
# Note: expiry_date_id is the static placeholder set above; it's not present in the source data yet.

# fact_business_inventory_df.show()

fact_business_inventory_df.printSchema()

# Write to BigQuery, staged through the raw GCS bucket, replacing any
# previous load of this table.
fact_business_inventory_df.write \
.format('bigquery') \
.option('table', f'{project_id}:{dataset_id}.fact_business_inventory') \
.option('temporaryGcsBucket', raw_bucket_name) \
.mode('overwrite') \
.save()

# Also persist a Parquet copy in the local exploitation zone.
fact_business_inventory_df.write.mode('overwrite').parquet(os.path.join(exploitation_zone_parquet_file_path, 'fact_business_inventory.parquet'))

# NOTE(review): dim_date_df is read and shown but never joined into the fact
# table — presumably leftover debugging; confirm before removing.
dim_date_df.show()
11 changes: 11 additions & 0 deletions exploitation_zone/schema.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,15 @@ root
|-- unit_price: string (nullable = true)
|-- quantity: long (nullable = true)
|-- purchase_date_id: string (nullable = true)
|-- created_on: string (nullable = false)

fact_business_inventory
root
|-- supermarket_inventory_id: integer (nullable = false)
|-- supermarket_id: string (nullable = true)
|-- location_id: long (nullable = true)
|-- product_id: integer (nullable = true)
|-- price: double (nullable = true)
|-- quantity: integer (nullable = true)
|-- expiry_date_id: string (nullable = false)
|-- created_on: string (nullable = false)

0 comments on commit e84c6e2

Please sign in to comment.