-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload_rides.py
91 lines (71 loc) · 3.93 KB
/
upload_rides.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import psycopg2
def update_fact_rides():
read_conn = psycopg2.connect(dbname='taxi', user='etl_tech_user',
password='etl_tech_user_password', host='de-edu-db.chronosavant.ru')
read_cursor = read_conn.cursor()
write_conn = psycopg2.connect(dbname='dwh', user='dwh_krasnoyarsk',
password='dwh_krasnoyarsk_uBPaXNSx', host='de-edu-db.chronosavant.ru')
write_cursor = write_conn.cursor()
# выбор только новых записей
with open('last_read_line_ride.txt', 'r') as f:
last_read_line_num = f.readline()
if last_read_line_num == '':
last_read_line_num = 0
else:
last_read_line_num = int(last_read_line_num)
read_cursor.execute(f"""SELECT point_from, point_to, distance, price, client_phone, ry.car_plate_num as car_plate_num,\
ry.dt as arrival_dt, NULL as start_dt, cl.dt as end_dt, cl.movement_id
FROM (SELECT * FROM main.movement WHERE event = 'READY') AS ry
INNER JOIN (SELECT * FROM main.movement WHERE event = 'CANCEL') AS cl ON ry.ride = cl.ride
INNER JOIN main.rides as rides ON ry.ride = rides.ride_id
WHERE cl.movement_id > {last_read_line_num};""", )
rides = read_cursor.fetchall()
read_cursor.execute(f"""SELECT point_from, point_to, distance, price, client_phone, ry.car_plate_num as car_plate_num,\
ry.dt as arrival_dt, bg.dt as start_dt, ed.dt as end_dt, ed.movement_id
FROM (SELECT * FROM main.movement WHERE event = 'READY') AS ry
INNER JOIN (SELECT * FROM main.movement WHERE event = 'BEGIN') AS bg ON ry.ride = bg.ride
INNER JOIN (SELECT * FROM main.movement WHERE event = 'END') as ed ON bg.ride = ed.ride
INNER JOIN main.rides as rides ON ry.ride = rides.ride_id
WHERE ed.movement_id > {last_read_line_num};""")
rides += read_cursor.fetchall()
#получение маскимального id записи
write_cursor.execute("SELECT MAX(ride_id) FROM fact_rides")
res = write_cursor.fetchall()
if res != [(None,)]:
ride_id = res[-1][0] + 1
else:
ride_id = 0
for ride in rides:
# print(ride)
point_from_txt = ride[0]
point_to_txt = ride[1]
distance_val = ride[2]
price_amt = ride[3]
client_phone_num = ride[4]
car_plate_num = ride[5]
ride_arrival_dt = ride[6]
ride_start_dt = ride[7]
ride_end_dt = ride[8]
last_read_line_num = max(ride[9], last_read_line_num)
write_cursor.execute(f"SELECT driver_pers_num FROM fact_waybills WHERE car_plate_num = '{car_plate_num}' AND work_start_dt <= '{ride_arrival_dt}' AND (work_end_dt >= '{ride_end_dt}' OR work_end_dt IS NULL);")
res = write_cursor.fetchall()
if res == []:
driver_pers_num = -1
else:
driver_pers_num = int(res[-1][0])
# print((ride_id, point_from_txt, point_to_txt, distance_val, price_amt, client_phone_num,\
# driver_pers_num, car_plate_num, ride_arrival_dt, ride_start_dt, ride_end_dt))
write_cursor.execute('INSERT INTO fact_rides VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);',
(ride_id, point_from_txt, point_to_txt, distance_val, price_amt, client_phone_num,\
driver_pers_num, car_plate_num, ride_arrival_dt, ride_start_dt, ride_end_dt))
write_conn.commit()
ride_id += 1
with open('last_read_line_ride.txt', 'w') as f:
f.write(str(last_read_line_num))
write_cursor.close()
write_conn.close()
read_cursor.close()
read_conn.close()
return 'OK'
if __name__ == '__main__':
update_fact_rides()