forked from Ludo6431/sleepbot2ical
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsleep_as_android_txt_clean.py
148 lines (111 loc) · 4.56 KB
/
sleep_as_android_txt_clean.py
1
# -*- coding: utf-8 -*-
"""Clean and analyze Sleep as Android CSV exports.

Expected header columns (the code only relies on Id, Tz, From, To, Hours and
Comment):

    Id,Tz,From,To,Sched,Hours,Rating,Comment,Framerate,Snore,Noise,Cycles,Event
"""

import csv
import os
import shutil
import sys
from datetime import datetime
from datetime import timedelta

# NOTE(review): urllib2 / urlparse / shutil / os are not used by the code
# below; the names are kept in scope in case something else relies on them.
try:  # Python 2 module names
    import urllib2
    import urlparse
except ImportError:  # Python 3: the same functionality lives under urllib.*
    import urllib.request as urllib2
    import urllib.parse as urlparse


DEFAULT_INPUT_PATH = "/Users/steven/Dropbox/1_current/quantified_self/wellness_dashboard/data/sleep_as_android_autocopied_from_dropbox.txt"
DEFAULT_OUTPUT_PATH = '/Users/steven/Dropbox/1_current/quantified_self/wellness_dashboard/data/sleep_analyzed.csv'

# Every sleep record is classified as exactly one of these.
SLEEP_TYPES = ("nap", "failedtosleep", "normal")

# From/To are parsed straight from the raw (still quoted) CSV token.
_DATE_FORMAT = '"%d. %m. %Y %H:%M"'

# Marker substrings that are removed from the free-text comment.
_COMMENT_NOISE = ('#home', '#newmoon', 'manually added', 'Nap length')

# Keys of the dicts returned by readSB(), in output column order.
_KEPT_KEYS = ("Id", "Tz", "StartDateTime", "EndDateTime",
              "startDateWrapped", "startHourWrapped",
              "endDateWrapped", "endHourWrapped",
              "Comment", "Hours", "type")


def _as_text(line):
    """Return *line* as text, decoding UTF-8 when a binary file yields bytes."""
    # On Python 2 ``bytes is str`` so nothing is decoded there.
    if isinstance(line, bytes) and not isinstance(line, str):
        return line.decode('utf-8')
    return line


def _split_csv_line(line):
    """Split *line* on the commas that are outside double quotes.

    The quotes are kept in the returned tokens, so for lines without quoted
    commas the result is identical to ``line.split(',')``.  The ``csv`` module
    is not used here because the raw, still-quoted tokens are needed.
    """
    fields = []
    current = []
    in_quotes = False
    for ch in line:
        if ch == '"':
            # A doubled quote ("") toggles twice and so stays inside the field.
            in_quotes = not in_quotes
            current.append(ch)
        elif ch == ',' and not in_quotes:
            fields.append(''.join(current))
            current = []
        else:
            current.append(ch)
    fields.append(''.join(current))
    return fields


def _clean_comment(raw):
    """Drop the surrounding quotes and the known marker substrings from *raw*."""
    comment = raw.strip('"')
    # str.strip(chars) removes a *set of characters*, not a substring, and
    # would eat tags such as "#nap"; remove the markers as substrings instead.
    for marker in _COMMENT_NOISE:
        comment = comment.replace(marker, '')
    return comment.strip()


def _classify(comment):
    """Map a comment to one of SLEEP_TYPES based on the tags it contains."""
    if "#nap" in comment:
        return "nap"
    if "#failedToSleep" in comment:
        return "failedtosleep"
    return "normal"


def readSB(f):
    """Parse a Sleep as Android export into a list of sleep dicts.

    f -- file-like object with ``readline()``; text or binary mode.  The first
         line is taken as the header that names the columns of every data row.

    Only lines starting with ``"1`` (a quoted timestamp id such as
    "1406218286546") are treated as data rows; everything else is skipped.
    Rows whose Id was already seen are reported on stdout and dropped.

    Returns a list of dicts with the keys listed in ``_KEPT_KEYS``.  ``Id`` and
    ``Tz`` are kept exactly as they appear in the file (quotes included);
    ``Hours`` and ``Comment`` have their quotes removed.

    Raises KeyError when a required column is missing and ValueError when a
    From/To value does not match the expected date format.
    """
    keys = _as_text(f.readline()).rstrip('\r\n').split(',')

    sleeps = []
    seen_ids = set()  # make sure we don't repeat entries

    while True:
        line = _as_text(f.readline())
        if not line:  # readline() returns '' only at end of file
            break
        if not line.startswith('"1'):
            continue

        record = dict(zip(keys, _split_csv_line(line.rstrip('\r\n'))))

        start = datetime.strptime(record['From'], _DATE_FORMAT)
        end = datetime.strptime(record['To'], _DATE_FORMAT)
        start_date, start_hour = wrapDateTime(start)
        end_date, end_hour = wrapDateTime(end)
        comment = _clean_comment(record["Comment"])

        values = (record["Id"], record["Tz"], start, end,
                  start_date, start_hour, end_date, end_hour,
                  comment, record["Hours"].strip('"'), _classify(comment))
        sleep = dict(zip(_KEPT_KEYS, values))

        if sleep["Id"] in seen_ids:
            print("duplicate sleepcloud entry: " + sleep["Id"])
        else:
            sleeps.append(sleep)
            seen_ids.add(sleep["Id"])

    return sleeps


def wrapDateTime(dt, dayEndHour = 20):
    """Assign *dt* to a "sleep day" and express its time relative to midnight.

    dt         -- datetime to wrap
    dayEndHour -- hour (0-24) from which a time counts towards the next day

    Returns ``(date, hour)``.  Before ``dayEndHour`` this is the calendar date
    and the fractional hour of day; from ``dayEndHour`` on it is the *next*
    calendar date and a negative fractional hour (hours before midnight), e.g.
    23:30 becomes ``(next day, -0.5)``.
    """
    hour = dt.hour + (dt.minute / 60.0)
    if dt.hour < dayEndHour:
        return dt.date(), hour
    return dt.date() + timedelta(days=1), hour - 24


def _open_csv_for_writing(path):
    """Open *path* the way the csv module expects on this Python version."""
    if sys.version_info[0] >= 3:
        return open(path, 'w', newline='')
    return open(path, 'wb')


def analyze(sleeps, out_path=DEFAULT_OUTPUT_PATH):
    """Add per-day statistics to every sleep and write them all to a CSV file.

    sleeps   -- iterable of dicts as produced by readSB(); they are modified
                in place
    out_path -- destination CSV file

    Sleeps are grouped by ``startDateWrapped``, days are processed in ascending
    order and the sleeps of a day are ordered by ``startHourWrapped``.  Each
    sleep gains:

      indexOverAllTypes                   position within its day
      indexForGivenType                   position among same-type sleeps
      totalHoursForDay_<type>             summed Hours of the day, per type
      numberOfSegmentsForDayForGivenType  number of same-type sleeps that day

    An ``Hours`` value that cannot be converted to float is printed and left
    out of the totals.

    Returns the flattened list of sleeps in output order.  When there are no
    sleeps nothing is written and an empty list is returned.
    """
    by_day = {}
    for sleep in sleeps:
        by_day.setdefault(sleep["startDateWrapped"], []).append(sleep)

    flattened = []
    for day in sorted(by_day):
        day_sleeps = sorted(by_day[day], key=lambda s: s["startHourWrapped"])

        hours_by_type = dict.fromkeys(SLEEP_TYPES, 0)
        count_by_type = dict.fromkeys(SLEEP_TYPES, 0)
        for idx, sleep in enumerate(day_sleeps):
            sleep_type = sleep["type"]
            try:
                hours_by_type[sleep_type] += float(sleep["Hours"])
            except (TypeError, ValueError):
                print(sleep["Hours"])
            sleep["indexOverAllTypes"] = idx
            sleep["indexForGivenType"] = count_by_type[sleep_type]
            count_by_type[sleep_type] += 1

        # The day totals are only complete once every sleep has been seen,
        # hence the second pass.
        for sleep in day_sleeps:
            for sleep_type in SLEEP_TYPES:
                sleep["totalHoursForDay_" + sleep_type] = hours_by_type[sleep_type]
            sleep["numberOfSegmentsForDayForGivenType"] = count_by_type[sleep["type"]]

        flattened.extend(day_sleeps)

    if not flattened:
        return flattened

    fieldnames = list(flattened[0].keys())
    with _open_csv_for_writing(out_path) as f:
        dict_writer = csv.DictWriter(f, fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(flattened)

    return flattened


def test(in_path=DEFAULT_INPUT_PATH, out_path=DEFAULT_OUTPUT_PATH):
    """Run the whole pipeline: read the export at *in_path*, write *out_path*."""
    with open(in_path, 'rb') as csvfile:
        sleeps = readSB(csvfile)
    analyze(sleeps, out_path)


# This is a manual end-to-end run, not a unit test; keep test runners that
# collect every function named test* from picking it up.
test.__test__ = False


if __name__ == "__main__":
    test()
    # TODO: add a proper main()