spark-mysql.py

import configparser
import time

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("MySQL Conn")
         # MySQL JDBC connector, downloaded from Maven at startup
         .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.32")
         .master("local[2]")
         .getOrCreate())
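
# Note: spark.jars.packages fetches the connector from Maven when the session
# starts. An equivalent approach when launching from the shell (a sketch,
# assuming spark-submit is on the PATH) is:
#   spark-submit --packages mysql:mysql-connector-java:8.0.32 spark-mysql.py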

# MySQL connection information, read from a local INI file
config = configparser.RawConfigParser()
config.read('./db_conn')
user_name = config.get('DB', 'user_name')
password = config.get('DB', 'password')
db_ip = config.get('DB', 'db_ip')
jdbcUrl = f"jdbc:mysql://{db_ip}:3306/dataops?user={user_name}&password={password}"

# Save the dataset locally first:
#   wget -O ~/datasets/customers_train.csv https://raw.githubusercontent.com/erkansirin78/datasets/master/deltalake_streaming_demo/customers_train.csv
# The dataset consists of 1000 rows.
df = (spark.read.option("header", True)
      .option("inferSchema", True)
      .csv("file:///home/train/datasets/customers_train.csv"))

# This is only a simulation: append the data to MySQL in 5-row chunks, one
# chunk every 2 seconds. Be careful with collect(): it pulls the entire
# dataset into the driver's memory, which is fine for 1000 rows but will not
# scale to large datasets. Collect once, outside the loop.
all_rows = df.collect()

start = 0
while start < 500:
    time.sleep(2)
    end = start + 5
    # Reuse the source schema so each chunk is not re-inferred
    df_part = spark.createDataFrame(all_rows[start:end], schema=df.schema)
    df_part.show()
    df_part.write.jdbc(url=jdbcUrl,
                       table="demo",
                       mode="append",
                       # MySQL JDBC driver class
                       properties={"driver": "com.mysql.cj.jdbc.Driver"})
    start += 5
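
# Optional sanity check, a sketch not in the original script: read the target
# table back over the same JDBC connection with spark.read.jdbc and confirm
# that rows were appended.
rows_in_mysql = spark.read.jdbc(url=jdbcUrl,
                                table="demo",
                                properties={"driver": "com.mysql.cj.jdbc.Driver"}).count()
print(f"Rows currently in demo table: {rows_in_mysql}")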