spark_processing.py
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Initialize logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def initialize_spark_session(app_name, access_key, secret_key):
    """
    Initialize the Spark session with the provided configurations.

    :param app_name: Name of the Spark application.
    :param access_key: Access key for S3.
    :param secret_key: Secret key for S3.
    :return: Spark session object, or None if there's an error.
    """
    try:
        spark = SparkSession \
            .builder \
            .appName(app_name) \
            .config("spark.hadoop.fs.s3a.access.key", access_key) \
            .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        logger.info("Spark session initialized successfully")
        return spark
    except Exception as e:
        logger.error(f"Spark session initialization failed. Error: {e}")
        return None


def get_streaming_dataframe(spark, brokers, topic):
    """
    Get a streaming dataframe from Kafka.

    :param spark: Initialized Spark session.
    :param brokers: Comma-separated list of Kafka brokers.
    :param topic: Kafka topic to subscribe to.
    :return: Dataframe object, or None if there's an error.
    """
    try:
        df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "earliest") \
            .load()
        logger.info("Streaming dataframe fetched successfully")
        return df
    except Exception as e:
        logger.warning(f"Failed to fetch streaming dataframe. Error: {e}")
        return None


def transform_streaming_data(df):
    """
    Transform the initial dataframe to get the final structure.

    :param df: Initial dataframe with raw data.
    :return: Transformed dataframe.
    """
    schema = StructType([
        StructField("full_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("location", StringType(), False),
        StructField("city", StringType(), False),
        StructField("country", StringType(), False),
        StructField("postcode", IntegerType(), False),
        StructField("latitude", FloatType(), False),
        StructField("longitude", FloatType(), False),
        StructField("email", StringType(), False)
    ])
    transformed_df = df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col("value"), schema).alias("data")) \
        .select("data.*")
    return transformed_df
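

# Optional sanity-check sketch (not called by main()): the same from_json/schema logic can
# be exercised against a static dataframe, which makes it easy to test the parsing without
# a running Kafka cluster. The sample record below is purely illustrative.
def preview_transform(spark):
    """Run transform_streaming_data on one hand-written JSON record and print the result."""
    sample = spark.createDataFrame(
        [('{"full_name": "Jane Doe", "gender": "female", "location": "10 Example St", '
          '"city": "London", "country": "UK", "postcode": 12345, '
          '"latitude": 51.5, "longitude": -0.12, "email": "jane@example.com"}',)],
        ["value"])
    transform_streaming_data(sample).show(truncate=False)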


def initiate_streaming_to_bucket(df, path, checkpoint_location):
    """
    Start streaming the transformed data to the specified S3 bucket in parquet format.

    :param df: Transformed dataframe.
    :param path: S3 bucket path.
    :param checkpoint_location: Checkpoint location for streaming.
    :return: None
    """
    logger.info("Initiating streaming process...")
    stream_query = (df.writeStream
                    .format("parquet")
                    .outputMode("append")
                    .option("path", path)
                    .option("checkpointLocation", checkpoint_location)
                    .start())
    stream_query.awaitTermination()
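
# Note (optional tuning, not in the original query): a trigger can be added before .start()
# to control micro-batch frequency, e.g. .trigger(processingTime="1 minute"); without an
# explicit trigger, Spark starts a new micro-batch as soon as the previous one finishes.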


def main():
    app_name = "SparkStructuredStreamingToS3"
    access_key = "ENTER_YOUR_ACCESS_KEY"
    secret_key = "ENTER_YOUR_SECRET_KEY"
    brokers = "kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094"
    topic = "names_topic"
    path = "BUCKET_PATH"
    checkpoint_location = "CHECKPOINT_LOCATION"

    spark = initialize_spark_session(app_name, access_key, secret_key)
    if spark is not None:
        df = get_streaming_dataframe(spark, brokers, topic)
        if df is not None:
            transformed_df = transform_streaming_data(df)
            initiate_streaming_to_bucket(transformed_df, path, checkpoint_location)


# Execute the main function if this script is run as the main module
if __name__ == '__main__':
    main()
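
# Example invocation (illustrative sketch, not part of the original file): the Kafka source
# and the S3A filesystem both need their connector jars on the classpath. One common way is
# spark-submit with --packages; the coordinates/versions below are assumptions and must match
# the Spark and Hadoop versions of your installation.
#
#   spark-submit \
#     --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,org.apache.hadoop:hadoop-aws:3.3.4 \
#     spark_processing.py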