sales_processor.py
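"""
Kafka -> Flink -> Kafka demo job.

Reads JSON sales events (USD) from the sales-usd topic, aggregates
per-seller revenue over 10-second processing-time tumbling windows,
converts it to euros at a fixed 0.85 exchange rate, and writes the
results to the sales-euros topic.
"""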
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def main():
    # Create streaming environment
    env = StreamExecutionEnvironment.get_execution_environment()

    settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

    # Create table environment
    tbl_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                            environment_settings=settings)

    # Add Kafka connector dependency (jar expected next to this script)
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.13.0.jar')
    tbl_env.get_config()\
           .get_configuration()\
           .set_string("pipeline.jars", "file://{}".format(kafka_jar))

    #######################################################################
    # Create Kafka Source Table with DDL
    #######################################################################
    src_ddl = """
        CREATE TABLE sales_usd (
            seller_id VARCHAR,
            amount_usd DOUBLE,
            sale_ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sales-usd',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'sales-usd',
            'format' = 'json'
        )
    """
    tbl_env.execute_sql(src_ddl)

    # Create a Table object backed by the Kafka source topic
    tbl = tbl_env.from_path('sales_usd')

    print('\nSource Schema')
    tbl.print_schema()

    #####################################################################
    # Define Tumbling Window Aggregate Calculation
    # (per-seller sales, converted to euros, every 10 seconds)
    #####################################################################
    sql = """
        SELECT
          seller_id,
          TUMBLE_END(proctime, INTERVAL '10' SECOND) AS window_end,
          SUM(amount_usd) * 0.85 AS window_sales  -- fixed USD->EUR rate
        FROM sales_usd
        GROUP BY
          TUMBLE(proctime, INTERVAL '10' SECOND),
          seller_id
    """
    revenue_tbl = tbl_env.sql_query(sql)

    print('\nProcess Sink Schema')
    revenue_tbl.print_schema()

    ###############################################################
    # Create Kafka Sink Table
    ###############################################################
    sink_ddl = """
        CREATE TABLE sales_euros (
            seller_id VARCHAR,
            window_end TIMESTAMP(3),
            window_sales DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sales-euros',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """
    tbl_env.execute_sql(sink_ddl)

    # Write the time-windowed aggregations to the sink table. execute_insert()
    # submits the streaming job and wait() blocks while it runs, so no
    # separate tbl_env.execute() call is needed (or reachable) after it.
    revenue_tbl.execute_insert('sales_euros').wait()


if __name__ == '__main__':
    main()
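
To exercise the pipeline end to end, something has to publish JSON sales events to the sales-usd topic. Below is a minimal producer sketch, assuming the kafka-python package (pip install kafka-python) and a broker on localhost:9092; the seller IDs and amounts are made up for illustration. The windowed results on sales-euros can then be watched with the stock console consumer, e.g. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sales-euros.

sales_producer.py (hypothetical companion script)

import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # Match the 'json' format declared in the source table DDL
    value_serializer=lambda d: json.dumps(d).encode('utf-8'),
)

# Hypothetical seller IDs; any strings will do
SELLERS = ['LNK', 'OMA', 'KC', 'DEN']

while True:
    event = {
        'seller_id': random.choice(SELLERS),
        'amount_usd': round(random.uniform(10, 1000), 2),
        'sale_ts': int(time.time() * 1000),  # epoch millis, matches BIGINT sale_ts
    }
    producer.send('sales-usd', value=event)
    time.sleep(1)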