Dimension table for DW
SonyShrestha committed May 21, 2024
1 parent 62ace4a commit 97cd9c2
Showing 5 changed files with 287 additions and 31 deletions.
74 changes: 74 additions & 0 deletions exploitation_zone/dim_cust_location.py
@@ -0,0 +1,74 @@
import logging
import os
import configparser
import json
from datetime import datetime
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace


# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)


if __name__ == "__main__":
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

spark = SparkSession.builder \
.appName("Customer Dimension table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

logger.info('-----------------------------------------------------')
logger.info("Creating customer dimension table")

# Read the Parquet files into DataFrames from the GCS buckets
customers_df = spark.read.parquet('gs://'+formatted_bucket_name+'/customers*.parquet')
location_df = spark.read.parquet('gs://'+raw_bucket_name+'/location*.parquet')

cust_location_df = spark.read.parquet('gs://'+raw_bucket_name+'/customer_location*.parquet')
cust_location_df = cust_location_df.withColumnRenamed("location_id","customer_location").withColumnRenamed("customer_id","customer")

df1 = customers_df.join(cust_location_df, customers_df.customer_id==cust_location_df.customer,"inner")
df2 = df1.join(location_df, df1.customer_location==location_df.location_id, "inner")

dim_cust = df2.select("customer_id", "customer_name", "email_id", "location_id")

dim_cust = dim_cust.dropDuplicates()

dim_cust_location = (
df2
.select("location_id", "postal_code", "place_name", "latitude", "longitude")
.groupBy("location_id")
.agg(
F.min("postal_code").alias("postal_code"),
F.min("place_name").alias("place_name"),
F.min("latitude").alias("latitude"),
F.min("longitude").alias("longitude")
)
)

dim_cust.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_customer_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet')
dim_cust_location.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_customer_location_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet')
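
A quick in-session check right after the two writes can confirm that the surrogate keys behave as expected before anything downstream consumes them. The following is a minimal sketch, assuming dim_cust and dim_cust_location from the script above are still in scope:

# Sanity-check sketch (assumes dim_cust and dim_cust_location are still in scope).
print("dim_cust rows:", dim_cust.count(),
      "distinct customer_id:", dim_cust.select("customer_id").distinct().count())
print("dim_cust_location rows:", dim_cust_location.count(),
      "distinct location_id:", dim_cust_location.select("location_id").distinct().count())

# Every location_id referenced by a customer should exist in the location dimension.
orphans = dim_cust.select("location_id").distinct() \
    .join(dim_cust_location, "location_id", "left_anti")
print("location_ids missing from dim_cust_location:", orphans.count())  # expected 0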
71 changes: 71 additions & 0 deletions exploitation_zone/dim_date.py
@@ -0,0 +1,71 @@
import os
import json
import configparser
import logging
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import min, max, explode, sequence, col, date_format, expr, to_date

from pyspark.sql.types import IntegerType

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config files
config_file_path = os.path.join(root_dir, "config.ini")
config_file_path_json = os.path.join(root_dir, "config.json")

# Load configuration files
config = configparser.ConfigParser()
config.read(config_file_path)

with open(config_file_path_json) as f:
config_json = json.load(f)

def generate_dates(start_date_str, end_date_str):
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
date_list = []
while start_date <= end_date:
date_list.append(start_date.strftime('%Y-%m-%d'))
start_date += timedelta(days=1)
return date_list


if __name__ == "__main__":
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

spark = SparkSession.builder \
.appName("Date Dimension table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

logger.info('-----------------------------------------------------')
logger.info("Creating date dimension table")

purchase_df = spark.read.parquet(f'gs://{raw_bucket_name}/customer_purchase_*')
min_max_dates = purchase_df.agg(
F.min("purchase_date").alias("min_purchase_date"),
F.max("purchase_date").alias("max_purchase_date")
)

date_df = min_max_dates.select(explode(sequence(to_date(col("min_purchase_date")), to_date(col("max_purchase_date")), expr("INTERVAL 1 DAY"))).alias("date"))
date_df = date_df.withColumn("date_id", F.date_format(F.col("date"), "yyyyMMdd").cast(IntegerType()))
date_df = date_df.withColumn("day", F.date_format(F.col("date"), "d").cast(IntegerType()))
date_df = date_df.withColumn("month", F.date_format(F.col("date"), "M").cast(IntegerType()))
date_df = date_df.withColumn("quarter", ((F.date_format(F.col("date"), "M").cast(IntegerType()) - 1) / 3 + 1).cast(IntegerType()))
date_df = date_df.withColumn("year", F.date_format(F.col("date"), "yyyy").cast(IntegerType()))
date_df = date_df.select("date_id", "date", "year", "month", "day")

date_df.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_date_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet')
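
The calendar itself is generated entirely inside Spark with sequence plus explode (the generate_dates helper defined above is never called). Below is a self-contained sketch of the same pattern with a hard-coded range, handy for eyeballing the derived columns without the purchase data; the range and app name are illustrative only:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("dim_date sketch").getOrCreate()

# Build one row per day between two fixed dates.
demo_df = spark.sql("SELECT sequence(to_date('2024-01-30'), to_date('2024-02-02'), interval 1 day) AS dates") \
    .select(F.explode("dates").alias("date"))

demo_df = demo_df.withColumn("date_id", F.date_format("date", "yyyyMMdd").cast(IntegerType())) \
    .withColumn("day", F.dayofmonth("date")) \
    .withColumn("month", F.month("date")) \
    .withColumn("quarter", F.quarter("date")) \
    .withColumn("year", F.year("date"))

demo_df.show()  # first row: 2024-01-30, 20240130, 30, 1, 1, 2024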
58 changes: 58 additions & 0 deletions exploitation_zone/dim_product.py
@@ -0,0 +1,58 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)


if __name__ == "__main__":
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

spark = SparkSession.builder \
.appName("Product Dimension table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

# Read the Parquet file into a DataFrame from GCS Raw Bucket
customer_purchase_df = spark.read.parquet('gs://'+raw_bucket_name+'/customer_purchase*.parquet')


logger.info('-----------------------------------------------------')
logger.info("Creating product dimension table")

product_df = customer_purchase_df.select("product_name").dropDuplicates()

window_spec = Window.orderBy("product_name")
product_df = product_df.withColumn("product_id", row_number().over(window_spec)).select("product_id","product_name")

product_df.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_product_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet')
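
One design note on the id assignment above: row_number() over a window with no partitionBy forces Spark to shuffle every distinct product name into a single partition before numbering, which is fine for a small catalogue but will not scale. A hedged alternative sketch, assuming gaps in product_id are acceptable and customer_purchase_df is still in scope:

from pyspark.sql.functions import monotonically_increasing_id

# Distributed id assignment: unique but non-contiguous product_id values.
product_df_alt = (
    customer_purchase_df
    .select("product_name")
    .dropDuplicates()
    .withColumn("product_id", monotonically_increasing_id())
    .select("product_id", "product_name")
)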

67 changes: 67 additions & 0 deletions exploitation_zone/dim_sp_location.py
@@ -0,0 +1,67 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace

# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)


if __name__ == "__main__":
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

spark = SparkSession.builder \
.appName("Supermarket Dimension table creation") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

logger.info('-----------------------------------------------------')
logger.info("Creating supermarket dimension table")

# Read the Parquet file into a DataFrame from GCS Bucket
supermarket_df = spark.read.parquet('gs://'+formatted_bucket_name+'/establishments_catalonia*.parquet')

# Drop duplicates if present
supermarket_df = supermarket_df.dropDuplicates()
supermarket_df.show()

supermarket_df = supermarket_df.withColumnRenamed("id","supermarket_id")\
.withColumnRenamed("commercial_name","supermarket_name")

loc_df = supermarket_df.select('full_address', "UTMx", "UTMy", "latitude", "longitude")
loc_df = loc_df.withColumn("location_id", monotonically_increasing_id()+7000) # To generate location_id starting from 7000

sp_loc_df = supermarket_df.join(loc_df, ['full_address', "UTMx", "UTMy", "latitude", "longitude"])

dim_supermarket = sp_loc_df.select("supermarket_id", "supermarket_name", "location_id")
dim_supermarket = dim_supermarket.dropDuplicates()

dim_sp_location = sp_loc_df.select("location_id", "full_address", "UTMx", "UTMy", "latitude", "longitude")
dim_sp_location = dim_sp_location.dropDuplicates()

dim_supermarket.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/dim_supermarket_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.parquet')
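
Note that dim_sp_location is built above but, unlike in dim_cust_location.py, never persisted. If it is meant to land in the bucket as well, a sketch of the missing write follows; the dim_supermarket_location_ prefix is an assumption chosen to mirror the customer scripts:

# Sketch only: persist the supermarket location dimension (file prefix is assumed).
dim_sp_location.write.mode('overwrite').parquet(
    f'gs://{formatted_bucket_name}/dim_supermarket_location_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.parquet')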
48 changes: 17 additions & 31 deletions formatted_zone/estimate_expiry_date.py
@@ -73,26 +73,6 @@ def count_tokens(s1, s2):
return len(s1_tokens.intersection(s2_tokens))


@udf(IntegerType())
def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):
s1 = nlp(s1)
s2 = nlp(s2)

s1_verbs = " ".join([token.lemma_ for token in s1 if token.pos_ == spacy_filter])
s2_verbs = " ".join([token.lemma_ for token in s1 if token.pos_ == spacy_filter])

doc1 = nlp(s1_verbs)
doc2 = nlp(s2_verbs)

# Calculate similarity
score = doc1.similarity(doc2)

if score < spacy_threshold:
score = 0
return score



if __name__ == "__main__":
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
@@ -102,14 +82,13 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):
fuzzy_threshold = config_json["product_matching"]["fuzzy_matching"]["threshold"]

spark = SparkSession.builder \
.appName("Estimate Expiry Date") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

.appName("GCS Files Read") \
.config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2") \
.config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("google.cloud.auth.service.account.json.keyfile", "C:\\UPC\\BDM\\Project\\VBP_Joint_Project\\gcs_config.json") \
.getOrCreate()

# Specify the path to the Parquet file
cust_purachase = f'gs://{formatted_bucket_name}/customer_purchase*'

@@ -125,6 +104,8 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):

customer_purachase_df = cust_purachase_df.join(cust_email_df, 'customer_id', 'inner')
customer_purachase_df = customer_purachase_df.select("id","customer_id","customer_name","email_id","product_name","unit_price","quantity","purchase_date")

# customer_purachase_df = customer_purachase_df.filter(customer_purachase_df["product_name"] == "Natureland Organics Chana Besan (500 g)")

customer_purachase_df = customer_purachase_df.withColumn("original_product_name", customer_purachase_df["product_name"])

@@ -147,21 +128,26 @@

filtered_df = filtered_df.withColumn("score", fuzzy_match_score(lit(fuzzy_score_calc_method), lit(fuzzy_threshold), filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))

filtered_df = filtered_df.filter(filtered_df.score != 0)

filtered_df = filtered_df.withColumn("token_count", count_tokens(filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))

# filtered_df = filtered_df.withColumn("score", spacy_match_score(filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))

windowSpec = Window.partitionBy("id") \
.orderBy(filtered_df["score"].desc(), filtered_df["token_count"].desc())

# Add a row number column
df_with_rn = filtered_df.withColumn("row_number", row_number().over(windowSpec))

# Filter rows where row number is 1 (which corresponds to the row with the maximum fuzzy score for each product)
df_with_rn = df_with_rn.filter(df_with_rn["row_number"] == 1).drop("row_number", "product_name", "token_count")
df_with_rn = df_with_rn.filter(((df_with_rn["score"] != 0) & (df_with_rn["row_number"] == 1))).drop("row_number", "product_name", "token_count")
df_with_rn = df_with_rn.withColumnRenamed("original_product_name", "product_name")

df_with_rn = df_with_rn.withColumn("expected_expiry_date", expr("date_add(purchase_date, cast(ceil(avg_expiry_days/2) AS INT))"))

# df_with_rn.write.mode('overwrite').parquet("./data/formatted_zone/purchases_nearing_expiry")
# debug_df = df_with_rn.select("product_name","product_in_avg_expiry_file","avg_expiry_days")
# debug_df.write.csv('./data/formatted_zone/expiry_date_accuracy')

cnt = df_with_rn.count()
print(cnt)
df_with_rn.write.mode('overwrite').parquet(f'gs://{formatted_bucket_name}/purchases_nearing_expiry')
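
The matching logic in this file boils down to: score every (purchase, reference-product) candidate pair, then keep the highest-scoring candidate per purchase id with a window. The toy sketch below illustrates that pattern in isolation; difflib.SequenceMatcher stands in for the project's fuzzy_match_score UDF and the rows are invented, so treat it purely as an illustration:

from difflib import SequenceMatcher
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, row_number, col
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("matching sketch").getOrCreate()

@udf(DoubleType())
def similarity(s1, s2):
    # Stand-in similarity measure; the real pipeline uses fuzzy_match_score.
    return SequenceMatcher(None, s1.lower(), s2.lower()).ratio()

candidates = spark.createDataFrame(
    [(1, "Organic Chana Besan 500 g", "chana besan"),
     (1, "Organic Chana Besan 500 g", "green chana"),
     (2, "Full Cream Milk 1 L", "milk")],
    ["id", "product_name", "product_in_avg_expiry_file"])

scored = candidates.withColumn("score", similarity(col("product_name"), col("product_in_avg_expiry_file")))

# Keep the best-scoring reference product per purchase id, dropping non-matches.
best = scored.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(col("score").desc()))) \
    .filter((col("rn") == 1) & (col("score") > 0)).drop("rn")

best.show(truncate=False)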
