-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfitbod2elastic.py
141 lines (116 loc) · 4.93 KB
/
fitbod2elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Get the CSV file that Fitbod can export via GMail API and index the data in elasticsearch.
See: https://www.fitbod.me/
"""
import os
import csv
import logging
import datetime
import argparse
from dateutil import parser
from elasticsearch import Elasticsearch
import gmail_api
def get_attachments(query, tgt_dir, delete_msg=False):
    """Download attachments from GMail messages matching `query` into `tgt_dir`.

    Existing files with the same name are overwritten.

    Args:
        query: GMail search query string.
        tgt_dir: Directory where attachment files are saved.
        delete_msg: If True, trash each message after downloading its
            attachments.

    Returns:
        A list of paths to the saved attachment files.
    """
    service = gmail_api.GetService('credentials.json')
    # Call the Gmail API
    filtered_msgs = gmail_api.ListMessagesMatchingQuery(service, 'me', query)
    logging.debug(filtered_msgs)
    attachments = []
    for msg in filtered_msgs:
        attachments.extend(gmail_api.GetAttachments(service, 'me', msg["id"], tgt_dir))
        if delete_msg:
            gmail_api.TrashMessage(service, 'me', msg["id"])
    # Report the actual attachment count, not the message count: a message
    # may carry zero or several attachments.
    print("%s attachment(s) found from GMail and saved to disk."
          %(len(attachments), ))
    return attachments
def index_to_es(es, index, doc_type, body, doc_id):
    """Indexes the data in 'body' object to Elasticsearch.

    Args:
        es: elasticsearch instance
        index: Index name on the ES server
        doc_type: ES doc type
        body: The data to be indexed
        doc_id: The ID for ES to use
    """
    ret = es.index(index=index, doc_type=doc_type, body=body, id=doc_id)
    # ES answers "created" for new docs and "updated" when re-indexing an
    # existing ID; anything else means the write did not take effect.
    if ret["result"] not in ("created", "updated"):
        # Lazy %-style args: the message is only formatted if it is emitted.
        logging.error("Indexing returned %s, expected 'created' or 'updated'",
                      ret["result"])
def csv_to_workout_obj(csv_file, nr_of_days):
    """Convert the Fitbod CSV export into a list of workout-set dicts.

    The CSV is first rewritten in place: the header row is replaced with
    known column names and a monotonically increasing "id" column is
    prepended (oldest set = 1) so identical sets performed on the same day
    stay distinguishable and get stable Elasticsearch IDs.

    Args:
        csv_file: Path to the Fitbod CSV export file.
        nr_of_days: Keep only sets newer than this many days; 0 keeps the
            full history.

    Returns:
        A list of dicts, one per set, with numeric fields cast to float,
        "timestamp" parsed to a datetime, and a calculated "volume"
        (weight * reps) field added.
    """
    # First, we add an ID field (rewrites the file in place).
    with open(csv_file, "r") as f:
        lines = f.readlines()
    lines[0] = '"id","timestamp","exercise","sets","reps","weight","is_warmup","note"\n'
    for line_nr in range(1, len(lines)):
        lines[line_nr] = '"' + str(line_nr) + '",' + lines[line_nr]
    with open(csv_file, "w") as f:
        f.writelines(lines)
    # Second, we read the CSV to a dictionary.
    # Hoist the cutoff out of the loop so every row is compared against the
    # same point in time (and now() isn't recomputed per row).
    cutoff = datetime.datetime.now() - datetime.timedelta(days=nr_of_days)
    obj = []
    with open(csv_file, "r") as f:
        rd = csv.DictReader(f, delimiter=',')
        for row in rd:
            # Convert the date string to a proper timestamp
            row["timestamp"] = parser.parse(row["timestamp"])
            if nr_of_days == 0 or row["timestamp"] > cutoff:
                # to_float leaves non-numeric values (and the datetime)
                # unchanged.
                formatted_row = {key: to_float(value)
                                 for key, value in row.items()}
                # Add a calculated volume field. Guard against non-numeric
                # weight/reps (e.g. empty cells) instead of crashing with a
                # TypeError.
                weight = formatted_row["weight"]
                reps = formatted_row["reps"]
                if isinstance(weight, float) and isinstance(reps, float):
                    formatted_row["volume"] = weight * reps
                else:
                    formatted_row["volume"] = 0.0
                obj.append(formatted_row)
    logging.info("Found %s sets within the past %s days", len(obj), nr_of_days)
    return obj
def to_float(my_data):
    """Cast my_data to float if possible, return it unchanged if not.

    Args:
        my_data: Any value; typically a string field read from the CSV.

    Returns:
        float(my_data) when the cast succeeds, otherwise my_data untouched.
    """
    try:
        return float(my_data)
    except (TypeError, ValueError):
        # float() raises exactly these two for bad input; a broad
        # `except Exception` would mask unrelated bugs.
        return my_data
def main():
    """Fetch Fitbod CSV attachments from GMail and index them in Elasticsearch."""
    ES_INDEX = "fitbod-workouts"
    ES_DOC_TYPE = "set"
    FITBOD_MSG_QUERY = "fitbod workout data export has:attachment"
    TMP_DIR = os.path.join(os.sep, "tmp", "fitbod2elastic")
    # Named arg_parser (not `parser`) so it doesn't shadow the module-level
    # dateutil `parser` import that csv_to_workout_obj relies on.
    arg_parser = argparse.ArgumentParser(
        description='Get Fitbod CSV from GMail and index it in Elasticsearch')
    arg_parser.add_argument('-d', '--days',
                            help="number of days to index starting from today",
                            type=int, default=7)
    arg_parser.add_argument('-v', '--verbose',
                            help="verbose logging",
                            action="store_true")
    args = arg_parser.parse_args()
    if args.verbose:
        # NOTE(review): INFO hides the logging.debug() calls elsewhere in
        # this file; switch to DEBUG if -v should surface those too.
        logging.basicConfig(level=logging.INFO)
    attachments = get_attachments(FITBOD_MSG_QUERY, TMP_DIR, delete_msg=True)
    es = Elasticsearch([{'host': '127.0.0.1', 'port': 9201}])
    for att in attachments:
        logging.info("Indexing %s", att)
        if not att.endswith(".csv"):
            logging.warning("...skipped. Unknown file type. Expecting CSV.")
            continue
        # We can't distinguish similar sets performed on the same day.
        # So we add row numbers to the CSV, the oldest set being number 1.
        # The assumption is that old workout data will not be modified.
        # If the history changes, re-index everything with nr_of_days=0
        workout_data = csv_to_workout_obj(att, nr_of_days=args.days)
        print("Indexing %s workout sets" %(len(workout_data), ))
        for workout_set in workout_data:
            index_to_es(es, ES_INDEX, ES_DOC_TYPE, workout_set,
                        workout_set["id"])
    # Cleanup: remove the downloaded attachments from the temp dir.
    for att in attachments:
        logging.info("Removing %s from disk", att)
        os.unlink(att)
    print("Done!")
main()