-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathimport_report.py
227 lines (195 loc) · 7.75 KB
/
import_report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
import argparse
import csv
import datetime
import glob
from typing import List, Optional, Callable, Dict, Tuple
from lib.config import open_config
from lib.driver_creator import DriverCreator
from lib.email_sender import EmailSender
from lib.group_site_manager import GroupSiteManager, clean_csv_tracking
from lib.objects_to_sheet import ObjectsToSheet
from lib.tracking import Tracking
from lib.tracking_output import TrackingOutput
from lib.tracking_uploader import TrackingUploader
# Shared configuration, loaded once at import time. get_group() reads
# config['groups'] and main() reads config['email'] and passes the whole
# object to the uploader/output/site-manager helpers.
config = open_config()
def get_group(address: str) -> Tuple[Optional[str], bool]:
    """Resolve a shipping address to one of the configured groups.

    Every entry under ``config['groups']`` provides ``keys`` (a single string
    or an iterable of values) and, optionally, a ``reconcile`` flag that
    defaults to True. A group matches when any of its keys occurs in the
    address as a case-insensitive substring; groups are tried in the order
    the config yields them and the first match wins.

    Args:
        address: Free-form shipping address text.

    Returns:
        ``(group_name, reconcile)`` for the first matching group, or
        ``(None, True)`` when nothing matches.
    """
    haystack = address.upper()
    for name, group_conf in config['groups'].items():
        reconcile = bool(group_conf.get('reconcile', True))
        keys = group_conf['keys']
        if isinstance(keys, str):
            # A lone string is shorthand for a one-element key list.
            keys = [keys]
        if any(str(key).upper() in haystack for key in keys):
            return name, reconcile

    # Only report addresses with more than three characters.
    if len(haystack) > 3:
        print(f"No group from address {haystack}:")
    return None, True
def get_ship_date(ship_date_str: str) -> str:
    """Normalise a report's shipment date to ``YYYY-MM-DD``.

    Accepted inputs, tried in this order:
      1. ``MM/DD/YYYY``
      2. ``MM/DD/YY``
      3. an integer (or integer-like string) day count added to 1899-12-29.

    Args:
        ship_date_str: Raw date cell value.

    Returns:
        The ISO-formatted date, or the literal ``'n/a'`` when the value
        cannot be interpreted by any of the forms above.
    """
    for date_format in ('%m/%d/%Y', '%m/%d/%y'):
        try:
            parsed = datetime.datetime.strptime(ship_date_str, date_format)
            return parsed.strftime('%Y-%m-%d')
        except (ValueError, TypeError):
            # ValueError: text does not match this format.
            # TypeError: the cell was not a string (e.g. None or a number).
            continue

    try:
        # NOTE(review): spreadsheet serial dates are conventionally counted
        # from 1899-12-30; this base is one day earlier. Kept as-is because
        # existing data may rely on it -- confirm the offset is intentional.
        base = datetime.date(year=1899, month=12, day=29)
        shifted = base + datetime.timedelta(days=int(ship_date_str))
        return shifted.strftime('%Y-%m-%d')
    except (ValueError, TypeError, OverflowError):
        # Not an integer, or a day count outside the supported date range.
        return 'n/a'
def from_amazon_row(row: Dict[str, str]) -> Optional[Tracking]:
    """Convert one row of an Amazon report into a ``Tracking``.

    Args:
        row: Mapping of column header to cell value. The columns
            'Carrier Tracking #', 'Order ID', 'Account User Email' and
            'Shipping Address' are required; 'Shipment Subtotal',
            'Shipment Date', 'Title', 'Item Quantity' and 'Merchant' are
            optional.

    Returns:
        The populated ``Tracking``, or None when the shipping address does
        not resolve to a configured group.
    """
    tracking_number = clean_csv_tracking(row['Carrier Tracking #'])
    order_ids = {row['Order ID'].upper()}

    # Strip thousands separators and the currency sign; 'N/A' counts as zero.
    subtotal_text = str(row.get('Shipment Subtotal', '0'))
    for token, replacement in ((',', ''), ('$', ''), ('N/A', '0.0')):
        subtotal_text = subtotal_text.replace(token, replacement)
    price = float(subtotal_text) if subtotal_text else 0.0

    recipient = row["Account User Email"]

    # Fall back to today's date when the report has no shipment-date column.
    if 'Shipment Date' in row:
        shipped_on = get_ship_date(row['Shipment Date'])
    else:
        shipped_on = datetime.date.today().strftime('%Y-%m-%d')

    group, reconcile = get_group(row['Shipping Address'])
    if group is None:
        return None

    description = row.get("Title", '') + " Qty:" + str(row.get("Item Quantity", ''))
    return Tracking(
        tracking_number,
        group,
        order_ids,
        price,
        recipient,
        ship_date=shipped_on,
        tracked_cost=0.0,
        items=description,
        merchant=row.get('Merchant', 'Amazon'),
        reconcile=reconcile)
def from_personal_row(row: Dict[str, str]) -> Optional[Tracking]:
    """Convert one row of a personal-format report into a ``Tracking``.

    Args:
        row: Mapping of column header to cell value. Reads
            'Carrier Name & Tracking Number', 'Order ID', 'Subtotal',
            'Ordering Customer Email', 'Shipment Date' and the
            'Shipping Address Street 1' / 'City' / 'State' columns.

    Returns:
        The populated ``Tracking``, or None when the tracking column is
        empty or the address does not resolve to a configured group.
    """
    carrier_and_tracking = row['Carrier Name & Tracking Number']
    if not carrier_and_tracking:
        return None

    # Take the text after the first '(' (up to any further '(') and drop
    # every ')' from it.
    tracking_number = carrier_and_tracking.split('(')[1].replace(')', '')
    order_ids = {row['Order ID'].upper()}

    # Strip thousands separators and the currency sign; 'N/A' counts as zero.
    subtotal_text = str(row['Subtotal'])
    for token, replacement in ((',', ''), ('$', ''), ('N/A', '0.0')):
        subtotal_text = subtotal_text.replace(token, replacement)
    price = float(subtotal_text) if subtotal_text else 0.0

    recipient = row['Ordering Customer Email']
    shipped_on = get_ship_date(str(row["Shipment Date"]))

    street = row['Shipping Address Street 1']
    city = row['Shipping Address City']
    state = row['Shipping Address State']
    group, reconcile = get_group(f"{street} {city}, {state}")
    if group is None:
        return None

    # The cleaned subtotal text is what gets stored as the item description.
    return Tracking(
        tracking_number,
        group,
        order_ids,
        price,
        recipient,
        ship_date=shipped_on,
        tracked_cost=0.0,
        items=subtotal_text,
        merchant='Amazon',
        reconcile=reconcile)
def find_candidate(tracking, candidates) -> Optional[Tracking]:
    """Return the first candidate with the same tracking number as ``tracking``.

    Args:
        tracking: Object exposing ``tracking_number``.
        candidates: Iterable of objects exposing ``tracking_number``.

    Returns:
        The first matching element of ``candidates``, or None if there is none.
    """
    matches = (
        candidate for candidate in candidates
        if tracking.tracking_number == candidate.tracking_number
    )
    return next(matches, None)
def dedupe_trackings(trackings: List[Tracking]) -> List[Tracking]:
    """Merge trackings that share a tracking number.

    The first tracking seen for each number is kept (and mutated in place);
    every later duplicate is folded into it:
      * its order ids are unioned into the kept tracking's ``order_ids``,
      * its price is added, but only when the kept tracking already has a
        truthy price,
      * its ``items`` text is appended after a comma.

    Lookup uses a dict keyed by tracking number, so the whole pass is linear
    in the number of trackings rather than rescanning the result list for
    every input.

    Args:
        trackings: Trackings in import order.

    Returns:
        One tracking per distinct tracking number, in first-seen order.
    """
    merged = {}
    for tracking in trackings:
        kept = merged.get(tracking.tracking_number)
        if kept is None:
            merged[tracking.tracking_number] = tracking
            continue

        # order_ids may arrive as a non-set iterable; normalise before merging.
        kept.order_ids = set(kept.order_ids)
        kept.order_ids.update(tracking.order_ids)
        # Prices are only accumulated onto an existing non-zero/non-empty price.
        if kept.price:
            kept.price = float(kept.price) + tracking.price
        kept.items += "," + tracking.items
    return list(merged.values())
def get_required(prompt):
    """Prompt on stdin until the user enters a non-blank value.

    Args:
        prompt: Text shown to the user on each attempt.

    Returns:
        The entered text with surrounding whitespace removed.
    """
    while True:
        answer = str(input(prompt)).strip()
        if answer:
            return answer
def read_trackings_from_file(
        file, from_row_fn: Callable[[Dict[str, str]],
                                    Optional[Tracking]]) -> List[Optional[Tracking]]:
    """Parse a CSV report into trackings, one per data row.

    This is best-effort: if the file cannot be opened or parsed, or if
    ``from_row_fn`` raises for any row, the error is printed and an empty
    list is returned for the whole file.

    Args:
        file: Path of the CSV file; the first line supplies column headers.
        from_row_fn: Converter applied to each row dict. It may return None
            for rows that should be skipped, and those Nones are passed
            through to the caller.

    Returns:
        The converter's result for every row, in file order, or ``[]`` on
        any error.
    """
    try:
        # newline='' is what the csv module requires so that line breaks
        # inside quoted fields are read back unchanged.
        with open(file, 'r', newline='') as f:
            rows = list(csv.DictReader(f))
        return [from_row_fn(row) for row in rows]
    except Exception as e:
        # Deliberately broad so one bad report does not abort the import,
        # but report the cause and let KeyboardInterrupt/SystemExit propagate.
        print(f"Error in file {file}: {e!r}")
        return []
def do_with_wait(driver, wait, old_wait, fn):
    """Call ``fn`` with the driver's implicit wait temporarily changed.

    Args:
        driver: Object exposing ``implicitly_wait(seconds)``.
        wait: Implicit wait to apply while ``fn`` runs.
        old_wait: Implicit wait to set again afterwards.
        fn: Zero-argument callable to invoke.

    Returns:
        Whatever ``fn`` returns. ``old_wait`` is re-applied even when
        setting ``wait`` or calling ``fn`` raises.
    """
    try:
        driver.implicitly_wait(wait)
        outcome = fn()
    finally:
        driver.implicitly_wait(old_wait)
    return outcome
def main():
    """Import trackings from report files or a Google Sheet and publish them.

    Steps, in order:
      1. Read trackings from every file matching the positional glob
         arguments, or, when no globs are given, prompt for a sheet id and
         tab name and download from there.
      2. Drop rows that produced no tracking, have an 'N/A' or empty
         tracking number, or are flagged as non-reconcilable.
      3. Merge duplicates by tracking number.
      4. Upload via ``TrackingUploader``, save via ``TrackingOutput``, then
         email and upload to the group sites only those trackings that were
         not already saved before this run.
    """
    arg_parser = argparse.ArgumentParser(description='Importing Amazon reports from CSV or Drive')
    arg_parser.add_argument(
        "--personal", "-p", action="store_true", help="Use the personal CSV format")
    arg_parser.add_argument("globs", nargs="*")
    # Unrecognised arguments are tolerated and discarded.
    args = arg_parser.parse_known_args()[0]

    if args.personal:
        row_parser = from_personal_row
    else:
        row_parser = from_amazon_row

    imported = []
    if args.globs:
        for pattern in args.globs:
            for path in glob.glob(pattern):
                imported.extend(read_trackings_from_file(path, row_parser))
    else:
        sheet_id = get_required("Enter Google Sheet ID: ")
        tab_name = get_required("Enter the name of the tab within the sheet: ")
        # NOTE(review): the sheet path always parses with from_amazon_row, so
        # --personal has no effect here -- confirm that is intended.
        imported.extend(ObjectsToSheet().download_from_sheet(from_amazon_row, sheet_id, tab_name))

    if not imported:
        print("Nothing to import; terminating.")
        return

    # Row parsers return None for rows they skip, hence the truthiness checks.
    n_a_count = sum(1 for t in imported if t and t.tracking_number == 'N/A')
    empty_count = sum(1 for t in imported if t and t.tracking_number == '')
    print(f'Skipping {n_a_count} for N/A tracking column and '
          f'{empty_count} for empty tracking column.')
    imported = [t for t in imported if t and t.tracking_number not in ('N/A', '')]

    non_reconcilable_count = sum(1 for t in imported if not t.reconcile)
    print(f'Skipping {non_reconcilable_count} non-reconcilable trackings.')
    imported = [t for t in imported if t.reconcile]

    count_before_dedupe = len(imported)
    imported = dedupe_trackings(imported)
    print(f'Filtered {count_before_dedupe - len(imported)} duplicate trackings from the sheet.')

    print('Uploading trackings to Sheets...')
    TrackingUploader(config).upload_trackings(imported)

    tracking_output = TrackingOutput(config)
    numbers_before = {t.tracking_number for t in tracking_output.get_existing_trackings()}
    print(f"Number of trackings before: {len(numbers_before)}.")
    print(f"Number imported from report(s): {len(imported)}.")
    tracking_output.save_trackings(imported)

    saved_by_number = {t.tracking_number: t for t in tracking_output.get_existing_trackings()}
    print(f"Number of trackings after: {len(saved_by_number)}.")
    # Anything saved now that was not saved before this run is new to us.
    new_numbers = set(saved_by_number).difference(numbers_before)
    print(f"Number of new-to-us trackings: {len(new_numbers)}")
    new_trackings = [saved_by_number[number] for number in new_numbers]

    EmailSender(config['email']).send_email(new_trackings)

    print("Uploading new trackings to the group(s)' site(s)...")
    GroupSiteManager(config, DriverCreator()).upload(new_trackings)
# Run the import only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()