Skip to content

Commit

Permalink
added fact table for customer inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
withoutwaxaryan committed May 23, 2024
1 parent 73f7115 commit 16abf40
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
5 changes: 5 additions & 0 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ local_directory = /home/pce/Documents/VBP_Joint_Project-main/data/raw

[SLACK]
webhook_url = https://hooks.slack.com/services/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


[BIGQUERY]
# Values are intentionally unquoted: configparser-style readers return quote
# characters as part of the value, and these two values are interpolated
# directly into the BigQuery table reference ("project:dataset.table").
project_id = formal-atrium-418823
dataset_id = spicyquery
33 changes: 29 additions & 4 deletions exploitation_zone/fact_customer_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
# Look up each config section once, then pull out the individual settings.
gcs_settings = config["GCS"]
bigquery_settings = config["BIGQUERY"]

# GCS bucket names, one per storage zone.
raw_bucket_name = gcs_settings["raw_bucket_name"]
formatted_bucket_name = gcs_settings["formatted_bucket_name"]
exploitation_bucket_name = gcs_settings["exploitation_bucket_name"]

# BigQuery project and dataset identifiers.
project_id = bigquery_settings["project_id"]
dataset_id = bigquery_settings["dataset_id"]

spark = SparkSession.builder \
.appName("Supermarket Dimension table creation") \
.appName("Customer Fact table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
Expand All @@ -41,11 +43,34 @@
.getOrCreate()

logger.info('-----------------------------------------------------')
logger.info("Creating customer_inventory fact table")
logger.info("Creating customer inventory fact table")

# Local folder that holds the exploitation-zone parquet datasets.
exploitation_zone_parquet_file_path = os.path.join(root_dir, 'data', 'exploitation_zone')

# Inputs: customer purchases are read from the formatted GCS bucket, the
# product and customer dimensions from the exploitation-zone folder.
cust_purchase_df = spark.read.parquet(
    'gs://' + formatted_bucket_name + '/customer_purchase*.parquet'
)
dim_product_df = spark.read.parquet(
    os.path.join(exploitation_zone_parquet_file_path, 'dim_product.parquet')
)
dim_customer_df = spark.read.parquet(
    os.path.join(exploitation_zone_parquet_file_path, 'dim_customer.parquet')
)

# NOTE: dim_date is not read here; purchase_date_id is derived directly from
# purchase_date below.

cust_purchase_df.show()

# Inner-join purchases to dim_product on product_name and keep the purchase
# columns plus the matching product_id.
fact_customer_inventory_df = (
    cust_purchase_df
    .join(dim_product_df, 'product_name', 'inner')
    .select(
        cust_purchase_df['id'],
        cust_purchase_df['customer_id'],
        cust_purchase_df['unit_price'],
        cust_purchase_df['quantity'],
        cust_purchase_df['purchase_date'],
        dim_product_df['product_id'],
    )
)

# Inner-join to dim_customer on customer_id to bring in location_id.
fact_customer_inventory_df = (
    fact_customer_inventory_df
    .join(dim_customer_df, 'customer_id', 'inner')
    .select(fact_customer_inventory_df['*'], dim_customer_df['location_id'])
)

# Shape the final fact table:
#   - purchase_date_id is purchase_date with the "-" characters removed
#   - id becomes inventory_id and is shifted up by one
#   - created_on is a single timestamp string taken when this script runs
fact_customer_inventory_df = (
    fact_customer_inventory_df
    .withColumn("purchase_date_id", regexp_replace("purchase_date", "-", ""))
    .drop('purchase_date')
    .withColumnRenamed("id", "inventory_id")
    .withColumn("inventory_id", col("inventory_id") + 1)
    .withColumn("created_on", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    .select(
        "inventory_id",
        "customer_id",
        "location_id",
        "product_id",
        "unit_price",
        "quantity",
        "purchase_date_id",
        "created_on",
    )
)

fact_customer_inventory_df.printSchema()

# Overwrite the BigQuery table; the raw bucket is passed as the connector's
# temporaryGcsBucket option.
(
    fact_customer_inventory_df.write
    .format('bigquery')
    .option('table', f'{project_id}:{dataset_id}.fact_customer_inventory')
    .option('temporaryGcsBucket', raw_bucket_name)
    .mode('overwrite')
    .save()
)

# Also overwrite the parquet copy in the exploitation-zone folder.
fact_customer_inventory_df.write.mode('overwrite').parquet(
    os.path.join(exploitation_zone_parquet_file_path, 'fact_customer_inventory.parquet')
)
11 changes: 11 additions & 0 deletions exploitation_zone/schema.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,15 @@ root
|-- UTMy: double (nullable = true)
|-- latitude: float (nullable = true)
|-- longitude: float (nullable = true)
|-- created_on: string (nullable = false)

fact_customer_inventory
root
|-- inventory_id: long (nullable = true)
|-- customer_id: long (nullable = true)
|-- location_id: long (nullable = true)
|-- product_id: integer (nullable = true)
|-- unit_price: string (nullable = true)
|-- quantity: long (nullable = true)
|-- purchase_date_id: string (nullable = true)
|-- created_on: string (nullable = false)

0 comments on commit 16abf40

Please sign in to comment.