Inventory fact file added
SonyShrestha committed May 23, 2024
1 parent 1c956f7 commit ea4596f
Showing 2 changed files with 100 additions and 0 deletions.
49 changes: 49 additions & 0 deletions exploitation_zone/fact_business_inventory.py
@@ -0,0 +1,49 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit

# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.getcwd())

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
    config_json = json.load(f)


if __name__ == "__main__":
    gcs_config = config["GCS"]["credentials_path"]
    raw_bucket_name = config["GCS"]["raw_bucket_name"]
    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

    spark = SparkSession.builder \
        .appName("Supermarket fact table creation") \
        .config("spark.driver.host", "127.0.0.1") \
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
        .getOrCreate()

    logger.info('-----------------------------------------------------')
    logger.info("Creating business_inventory fact table")

    # Read the dim_date Parquet file into a DataFrame from the local exploitation zone
    dim_date_df = spark.read.parquet(os.path.join(root_dir, 'data', 'exploitation_zone', 'dim_date.parquet'))

    dim_date_df.show()
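
The script ends after displaying dim_date, so the fact table itself is not yet assembled or written out. A minimal sketch of how the build might continue, assuming a business inventory dataset in the formatted bucket and a join between hypothetical inventory_date and date columns (the dataset name, join columns, and output path are assumptions, not part of this commit):

    # Hypothetical continuation -- dataset name, join columns and output path are assumptions.
    inventory_df = spark.read.parquet('gs://' + formatted_bucket_name + '/business_inventory*.parquet')

    # Attach the date dimension and generate a surrogate key for each fact row.
    fact_inventory_df = inventory_df \
        .join(dim_date_df, inventory_df["inventory_date"] == dim_date_df["date"], "left") \
        .withColumn("inventory_fact_id", monotonically_increasing_id())

    # Persist the fact table to the exploitation zone bucket.
    fact_inventory_df.write.mode("overwrite") \
        .parquet('gs://' + exploitation_bucket_name + '/fact_business_inventory.parquet')
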
51 changes: 51 additions & 0 deletions exploitation_zone/fact_customer_inventory.py
@@ -0,0 +1,51 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit

# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.getcwd())

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
    config_json = json.load(f)


if __name__ == "__main__":
    gcs_config = config["GCS"]["credentials_path"]
    raw_bucket_name = config["GCS"]["raw_bucket_name"]
    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]

    spark = SparkSession.builder \
        .appName("Supermarket fact table creation") \
        .config("spark.driver.host", "127.0.0.1") \
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
        .getOrCreate()

    logger.info('-----------------------------------------------------')
    logger.info("Creating customer_inventory fact table")

    # Read the customer_purchase Parquet files into a DataFrame from the formatted GCS bucket
    cust_purchase_df = spark.read.parquet('gs://' + formatted_bucket_name + '/customer_purchase*.parquet')

    # dim_date_df = spark.read.parquet(os.path.join(root_dir, 'data', 'exploitation_zone', 'dim_date.parquet'))

    cust_purchase_df.show()
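
As in the previous file, the job stops after reading and displaying customer_purchase. A possible next step is sketched below; the surrogate key, load-date column, and output name are assumptions, not something this commit defines:

    # Hypothetical sketch -- column names and output path are assumptions.
    fact_customer_df = cust_purchase_df \
        .withColumn("customer_inventory_id", monotonically_increasing_id()) \
        .withColumn("load_date", lit(datetime.now().strftime("%Y-%m-%d")))

    # Persist the fact table to the exploitation zone bucket.
    fact_customer_df.write.mode("overwrite") \
        .parquet('gs://' + exploitation_bucket_name + '/fact_customer_inventory.parquet')
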
