-
Notifications
You must be signed in to change notification settings - Fork 103
/
Copy pathspark_stream.py
158 lines (123 loc) · 5.12 KB
/
spark_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import logging
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
def create_keyspace(session):
session.execute("""
CREATE KEYSPACE IF NOT EXISTS spark_streams
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")
print("Keyspace created successfully!")
def create_table(session):
session.execute("""
CREATE TABLE IF NOT EXISTS spark_streams.created_users (
id UUID PRIMARY KEY,
first_name TEXT,
last_name TEXT,
gender TEXT,
address TEXT,
post_code TEXT,
email TEXT,
username TEXT,
registered_date TEXT,
phone TEXT,
picture TEXT);
""")
print("Table created successfully!")
def insert_data(session, **kwargs):
print("inserting data...")
user_id = kwargs.get('id')
first_name = kwargs.get('first_name')
last_name = kwargs.get('last_name')
gender = kwargs.get('gender')
address = kwargs.get('address')
postcode = kwargs.get('post_code')
email = kwargs.get('email')
username = kwargs.get('username')
dob = kwargs.get('dob')
registered_date = kwargs.get('registered_date')
phone = kwargs.get('phone')
picture = kwargs.get('picture')
try:
session.execute("""
INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
post_code, email, username, dob, registered_date, phone, picture)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (user_id, first_name, last_name, gender, address,
postcode, email, username, dob, registered_date, phone, picture))
logging.info(f"Data inserted for {first_name} {last_name}")
except Exception as e:
logging.error(f'could not insert data due to {e}')
def create_spark_connection():
s_conn = None
try:
s_conn = SparkSession.builder \
.appName('SparkDataStreaming') \
.config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
"org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
.config('spark.cassandra.connection.host', 'localhost') \
.getOrCreate()
s_conn.sparkContext.setLogLevel("ERROR")
logging.info("Spark connection created successfully!")
except Exception as e:
logging.error(f"Couldn't create the spark session due to exception {e}")
return s_conn
def connect_to_kafka(spark_conn):
spark_df = None
try:
spark_df = spark_conn.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'users_created') \
.option('startingOffsets', 'earliest') \
.load()
logging.info("kafka dataframe created successfully")
except Exception as e:
logging.warning(f"kafka dataframe could not be created because: {e}")
return spark_df
def create_cassandra_connection():
try:
# connecting to the cassandra cluster
cluster = Cluster(['localhost'])
cas_session = cluster.connect()
return cas_session
except Exception as e:
logging.error(f"Could not create cassandra connection due to {e}")
return None
def create_selection_df_from_kafka(spark_df):
schema = StructType([
StructField("id", StringType(), False),
StructField("first_name", StringType(), False),
StructField("last_name", StringType(), False),
StructField("gender", StringType(), False),
StructField("address", StringType(), False),
StructField("post_code", StringType(), False),
StructField("email", StringType(), False),
StructField("username", StringType(), False),
StructField("registered_date", StringType(), False),
StructField("phone", StringType(), False),
StructField("picture", StringType(), False)
])
sel = spark_df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col('value'), schema).alias('data')).select("data.*")
print(sel)
return sel
if __name__ == "__main__":
# create spark connection
spark_conn = create_spark_connection()
if spark_conn is not None:
# connect to kafka with spark connection
spark_df = connect_to_kafka(spark_conn)
selection_df = create_selection_df_from_kafka(spark_df)
session = create_cassandra_connection()
if session is not None:
create_keyspace(session)
create_table(session)
logging.info("Streaming is being started...")
streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
.option('checkpointLocation', '/tmp/checkpoint')
.option('keyspace', 'spark_streams')
.option('table', 'created_users')
.start())
streaming_query.awaitTermination()