Replace calls to performance API with LAMP data
devinmatte committed Apr 8, 2024
1 parent d554de7 commit f0044cc
Showing 9 changed files with 69 additions and 169 deletions.
10 changes: 6 additions & 4 deletions .vscode/extensions.json
@@ -1,6 +1,8 @@
{
"recommendations": [
"dbaeumer.vscode-eslint",
"esbenp.prettier-vscode",
]
"recommendations": [
"dbaeumer.vscode-eslint",
"esbenp.prettier-vscode",
"ms-python.black-formatter",
"ms-python.flake8"
]
}
2 changes: 1 addition & 1 deletion server/app.py
@@ -30,7 +30,7 @@
app.register_middleware(ConvertToMiddleware(datadog_lambda_wrapper))


def parse_user_date(user_date):
def parse_user_date(user_date: str):
date_split = user_date.split("-")
[year, month, day] = [int(x) for x in date_split[0:3]]
return date(year=year, month=month, day=day)
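
For reference, a quick sketch of what the newly annotated helper returns; the example input is hypothetical:

parse_user_date("2024-04-08")  # -> datetime.date(2024, 4, 8), parsed from a "YYYY-MM-DD" query string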
8 changes: 4 additions & 4 deletions server/chalicelib/aggregation.py
@@ -54,7 +54,7 @@ def faster_describe(grouped):
# `travel_times_over_time` is legacy and returns just the by_date aggregation w/ peak == all


def aggregate_traveltime_data(start_date: str | datetime.date, end_date: str | datetime.date, from_stops, to_stops):
def aggregate_traveltime_data(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
all_data = data_funcs.travel_times(start_date, from_stops, to_stops, end_date)
if not all_data:
return None
@@ -99,7 +99,7 @@ def calc_travel_times_by_date(df):
return summary_stats_final


def travel_times_all(start_date: str | datetime.date, end_date: str, from_stops, to_stops):
def travel_times_all(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
if df is None:
return {"by_date": [], "by_time": []}
@@ -112,7 +112,7 @@ def travel_times_all(start_date: str | datetime.date, end_date: str, from_stops,
}


def travel_times_over_time(start_date: str | datetime.date, end_date: str | datetime.date, from_stops, to_stops):
def travel_times_over_time(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
if df is None:
return []
@@ -123,7 +123,7 @@ def travel_times_over_time(start_date: str | datetime.date, end_date: str | date
####################
# HEADWAYS
####################
def headways_over_time(start_date: str | datetime.date, end_date: str | datetime.date, stops):
def headways_over_time(start_date: datetime.date, end_date: datetime.date, stops):
all_data = data_funcs.headways(start_date, stops, end_date)
if not all_data:
return []
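
With the str | datetime.date unions dropped, callers of these aggregation helpers are now expected to pass real date objects rather than "YYYY-MM-DD" strings. A minimal usage sketch; the stop IDs are hypothetical:

import datetime

from_stops = ["70063"]  # hypothetical stop IDs, for illustration only
to_stops = ["70069"]
result = travel_times_all(datetime.date(2024, 3, 1), datetime.date(2024, 3, 31), from_stops, to_stops)
# result -> {"by_date": [...], "by_time": [...]}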
149 changes: 12 additions & 137 deletions server/chalicelib/data_funcs.py
@@ -57,46 +57,11 @@ def use_S3(date, bus=False):
return archival or bus


def partition_S3_dates(start_date: str | date, end_date: str | date, bus=False):
"""
Partitions dates by what data source they should be fetched from.
S3 is used for archival data and for bus data. API is used for recent (within 90 days) subway data.
TODO: Add Gobble data to this partitioning.
"""
CUTOFF = datetime.date.today() - datetime.timedelta(days=90)

s3_dates = None
api_dates = None

if end_date < CUTOFF or bus:
s3_dates = (start_date, end_date)
elif CUTOFF <= start_date:
api_dates = (start_date, end_date)
else:
s3_dates = (start_date, CUTOFF - datetime.timedelta(days=1))
api_dates = (CUTOFF, end_date)

return (s3_dates, api_dates)


def headways(start_date: str | date, stops, end_date: str | date | None = None):
def headways(start_date: date, stops, end_date: date | None = None):
if end_date is None:
if use_S3(start_date, is_bus(stops)):
return s3_historical.headways(stops, start_date, start_date)
else:
return process_mbta_headways(stops, start_date)

s3_interval, api_interval = partition_S3_dates(start_date, end_date, is_bus(stops))
all_data = []
if s3_interval:
start, end = s3_interval
all_data.extend(s3_historical.headways(stops, start, end))

if api_interval:
start, end = api_interval
all_data.extend(process_mbta_headways(stops, start, end))
return s3_historical.headways(stops, start_date, start_date)

return all_data
return s3_historical.headways(stops, start_date, end_date)
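
The net effect: headways no longer splits a request between S3 and the performance API; every call now resolves to a single s3_historical read. A sketch of the new call shape, with a hypothetical stop ID:

import datetime

stops = ["70063"]  # hypothetical stop ID
headways(datetime.date(2024, 3, 1), stops)  # single day: start_date is reused as end_date
headways(datetime.date(2024, 3, 1), stops, end_date=datetime.date(2024, 3, 7))  # full range straight from S3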


# Transit days run 3:30am-3:30am local time
@@ -109,110 +74,18 @@ def current_transit_day():
return today


def process_mbta_headways(stops, start_date: str | date, end_date: str | date | None = None):
# get data
api_data = MbtaPerformanceAPI.get_api_data("headways", {"stop": stops}, start_date, end_date)
# combine all headways data
headways = []
for dict_data in api_data:
headways += dict_data.get("headways", [])

# conversion
for headway_dict in headways:
# convert to datetime
headway_dict["current_dep_dt"] = stamp_to_dt(headway_dict.get("current_dep_dt"))
headway_dict["previous_dep_dt"] = stamp_to_dt(headway_dict.get("previous_dep_dt"))
# convert to int
headway_dict["benchmark_headway_time_sec"] = int(headway_dict.get("benchmark_headway_time_sec"))
headway_dict["headway_time_sec"] = int(headway_dict.get("headway_time_sec"))
headway_dict["direction"] = int(headway_dict.get("direction"))

return sorted(headways, key=lambda x: x["current_dep_dt"])


def travel_times(start_date, from_stops, to_stops, end_date: str | date | None = None):
def travel_times(start_date: date, from_stops, to_stops, end_date: date | None = None):
if end_date is None:
if use_S3(start_date, is_bus(from_stops)):
return s3_historical.travel_times(from_stops, to_stops, start_date, start_date)
else:
return process_mbta_travel_times(from_stops, to_stops, start_date)

s3_interval, api_interval = partition_S3_dates(start_date, end_date, is_bus(from_stops))
all_data = []
if s3_interval:
start, end = s3_interval
all_data.extend(s3_historical.travel_times(from_stops, to_stops, start, end))

if api_interval:
start, end = api_interval
all_data.extend(process_mbta_travel_times(from_stops, to_stops, start, end))
return all_data


def process_mbta_travel_times(from_stops, to_stops, start_date: str | date, end_date: str | date | None = None):
# get data
api_data = MbtaPerformanceAPI.get_api_data(
"traveltimes", {"from_stop": from_stops, "to_stop": to_stops}, start_date, end_date
)
# combine all travel times data, remove threshold flags from performance API, and dedupe on `dep_dt`
trips = {}
for dict_data in api_data:
for event in dict_data.get("travel_times", []):
dep_dt = event["dep_dt"]
if dep_dt not in trips:
trips[dep_dt] = {
"route_id": event["route_id"],
"direction": int(event["direction"]),
# convert to datetime
"dep_dt": stamp_to_dt(event["dep_dt"]),
"arr_dt": stamp_to_dt(event["arr_dt"]),
# convert to int
"travel_time_sec": int(event["travel_time_sec"]),
"benchmark_travel_time_sec": int(event["benchmark_travel_time_sec"]),
}
trips_list = list(trips.values())
return sorted(trips_list, key=lambda x: x["dep_dt"])


def dwells(start_date, stops, end_date: str | date | None = None):
if end_date is None:
if use_S3(start_date, is_bus(stops)):
return s3_historical.dwells(stops, start_date, start_date)
else:
return process_mbta_dwells(stops, start_date)
return s3_historical.travel_times(from_stops, to_stops, start_date, start_date)

s3_interval, api_interval = partition_S3_dates(start_date, end_date, is_bus(stops))
all_data = []
if s3_interval:
start, end = s3_interval
all_data.extend(s3_historical.dwells(stops, start, end))
return s3_historical.travel_times(from_stops, to_stops, start_date, end_date)

if api_interval:
start, end = api_interval
all_data.extend(process_mbta_dwells(stops, start, end))

return all_data


def process_mbta_dwells(stops, start_date: str | date, end_date: str | date | None = None):
# get data
api_data = MbtaPerformanceAPI.get_api_data("dwells", {"stop": stops}, start_date, end_date)

# combine all travel times data
dwells = []
for dict_data in api_data:
dwells += dict_data.get("dwell_times", [])

# conversion
for dwell_dict in dwells:
# convert to datetime
dwell_dict["arr_dt"] = stamp_to_dt(dwell_dict.get("arr_dt"))
dwell_dict["dep_dt"] = stamp_to_dt(dwell_dict.get("dep_dt"))
# convert to int
dwell_dict["dwell_time_sec"] = int(dwell_dict.get("dwell_time_sec"))
dwell_dict["direction"] = int(dwell_dict.get("direction"))
def dwells(start_date, stops, end_date: date | None = None):
if end_date is None:
return s3_historical.dwells(stops, start_date, start_date)

return sorted(dwells, key=lambda x: x["arr_dt"])
return s3_historical.dwells(stops, start_date, end_date)
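
travel_times and dwells follow the same pattern as headways above: the partition_S3_dates split and the process_mbta_* conversion helpers are gone, so both always read from s3_historical. A hedged sketch with hypothetical stop IDs:

import datetime

travel_times(datetime.date(2024, 3, 1), ["70063"], ["70069"], end_date=datetime.date(2024, 3, 7))
dwells(datetime.date(2024, 3, 1), ["70063"], end_date=datetime.date(2024, 3, 7))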


def alerts(day, params):
@@ -222,8 +95,10 @@ def alerts(day, params):
# yesterday + 1 bonus day to cover the gap, since aws is only populated at 5/6am.
yesterday = today - datetime.timedelta(days=2)

# TODO: Handle either format (v2 or v3) of alerts
# Use the API for today and yesterday's transit day, otherwise use S3.
if day >= yesterday:
# TODO: Replace v2 calls with v3
api_data = MbtaPerformanceAPI.get_api_data("pastalerts", params, day)
elif day >= WE_HAVE_ALERTS_SINCE:
# This is stupid because we're emulating MBTA-performance ick
7 changes: 7 additions & 0 deletions server/chalicelib/date_utils.py
@@ -11,6 +11,13 @@
MAX_MONTH_DATA_DATE = "2024-02-29"


def get_max_monthly_data_date():
"""
Returns the most recent date for which we have monthly data
"""
return datetime.strptime(MAX_MONTH_DATA_DATE, "%Y-%m-%d").date()


def parse_event_date(date_str: str):
if len(date_str) == 19:
return datetime.strptime(date_str, DATE_FORMAT_MASSDOT).replace(tzinfo=EASTERN_TIME)
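
Given the MAX_MONTH_DATA_DATE constant above, the new helper simply materializes it as a date object:

from chalicelib.date_utils import get_max_monthly_data_date

get_max_monthly_data_date()  # -> datetime.date(2024, 2, 29)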
22 changes: 13 additions & 9 deletions server/chalicelib/parallel.py
@@ -1,7 +1,8 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
import pandas as pd

from chalicelib.date_utils import MAX_MONTH_DATA_DATE
from chalicelib import date_utils


def make_parallel(single_func, THREAD_COUNT=5):
@@ -26,29 +27,32 @@ def date_range(start: str, end: str):
return pd.date_range(start, end)


def s3_date_range(start: str, end: str):
def s3_date_range(start: date, end: date):
"""
Generates a date range, meant for s3 data
For all dates that we have monthly datasets for, return 1 date of the month
For all dates that we have daily datasets for, return all dates
"""
month_end = end
if pd.to_datetime(MAX_MONTH_DATA_DATE) < pd.to_datetime(end):
month_end = MAX_MONTH_DATA_DATE
if date_utils.get_max_monthly_data_date() < end and date_utils.get_max_monthly_data_date() > start:
month_end = date_utils.get_max_monthly_data_date()

date_range = pd.date_range(start, month_end, freq="1D", inclusive="both")

# This is kinda funky, but is still simpler than other approaches
# pandas won't generate a monthly date_range that includes Jan and Feb for Jan31-Feb1 e.g.
# So we generate a daily date_range and then resample it down (summing 0s as a no-op in the process) so it aligns.
dates = pd.date_range(start, month_end, freq="1D", inclusive="both")
series = pd.Series(0, index=dates)
months = series.resample("1M").sum().index
if date_utils.get_max_monthly_data_date() > start:
dates = pd.date_range(start, month_end, freq="1D", inclusive="both")
series = pd.Series(0, index=dates)
date_range = series.resample("1M").sum().index

# all dates between month_end and end if month_end is less than end
if pd.to_datetime(month_end) < pd.to_datetime(end):
daily_dates = pd.date_range(month_end, end, freq="1D", inclusive="both")

# combine the two date ranges of months and dates
if daily_dates is not None and len(daily_dates) > 0:
months = months.union(daily_dates)
date_range = date_range.union(daily_dates)

return months
return date_range
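
A worked example of the revised s3_date_range, assuming MAX_MONTH_DATA_DATE is still 2024-02-29; the input dates are hypothetical:

import datetime

rng = s3_date_range(datetime.date(2024, 1, 15), datetime.date(2024, 3, 5))
# Dates up to the monthly cutoff resample down to one month-end timestamp per month,
# then the daily dates past the cutoff are unioned in:
# [2024-01-31, 2024-02-29, 2024-03-01, 2024-03-02, 2024-03-03, 2024-03-04, 2024-03-05]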
33 changes: 22 additions & 11 deletions server/chalicelib/s3.py
@@ -1,12 +1,14 @@
from datetime import date
import boto3
import botocore
import pandas as pd
from botocore.exceptions import ClientError
import csv
import itertools
import zlib

from chalicelib import parallel
from chalicelib import date_utils

BUCKET = "tm-mbta-performance"
s3 = boto3.client("s3", config=botocore.client.Config(max_pool_connections=15))
@@ -33,34 +35,44 @@ def is_bus(stop_id: str):
return ("-0-" in stop_id) or ("-1-" in stop_id)


def get_live_folder(stop_id: str):
def get_gobble_folder(stop_id: str):
if is_bus(stop_id):
return "daily-bus-data"
else:
return "daily-rapid-data"


def download_one_event_file(date, stop_id: str, use_live_data=False):
def get_lamp_folder():
return "daily-data"


def download_one_event_file(date: pd.Timestamp, stop_id: str, use_gobble=False):
"""As advertised: single event file from s3"""
year, month, day = date.year, date.month, date.day

if use_live_data:
folder = get_live_folder(stop_id)
key = f"Events-live/{folder}/{stop_id}/Year={year}/Month={month}/Day={day}/events.csv.gz"
# if current date is newer than the max monthly data date, use LAMP
if date.date() > date_utils.get_max_monthly_data_date():
# if we've asked to use gobble data or bus data, check gobble
if use_gobble or is_bus(stop_id):
folder = get_gobble_folder(stop_id)
key = f"Events-live/{folder}/{stop_id}/Year={year}/Month={month}/Day={day}/events.csv.gz"
else:
folder = get_lamp_folder()
key = f"Events-lamp/{folder}/{stop_id}/Year={year}/Month={month}/Day={day}/events.csv"
else:
folder = "monthly-bus-data" if is_bus(stop_id) else "monthly-data"
key = f"Events/{folder}/{stop_id}/Year={year}/Month={month}/events.csv.gz"

# Download events from S3
try:
decompressed = download(key, "ascii")
decompressed = download(key, "ascii", ".gz" in key)

except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
# raise Exception(f"Data not available on S3 for key {key} ") from None
print(f"WARNING: No data available on S3 for key: {key}")
if not use_live_data and is_bus(stop_id):
return download_one_event_file(date, stop_id, use_live_data=True)
if not use_gobble and not is_bus(stop_id):
return download_one_event_file(date, stop_id, use_gobble=True)
return []
else:
raise
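
To summarize the routing implemented above, a hedged usage sketch; the stop IDs are hypothetical:

import pandas as pd

# Subway stop newer than the monthly cutoff -> Events-lamp/daily-data/... (plain csv)
download_one_event_file(pd.Timestamp("2024-04-01"), "70063")
# Bus stop, or use_gobble=True, newer than the cutoff -> Events-live/daily-*-data/... (csv.gz)
download_one_event_file(pd.Timestamp("2024-04-01"), "64-0-1", use_gobble=True)
# On or before the cutoff -> Events/monthly-*-data/... (csv.gz), with gobble as the NoSuchKey fallback for subway
download_one_event_file(pd.Timestamp("2024-01-15"), "70063")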
Expand All @@ -76,13 +88,12 @@ def download_one_event_file(date, stop_id: str, use_live_data=False):


@parallel.make_parallel
def parallel_download_events(datestop):
def parallel_download_events(datestop: itertools.product):
(date, stop) = datestop
# TODO: Force gobble when date is past the max monthly data date
return download_one_event_file(date, stop)


def download_events(start_date: str | date, end_date: str | date, stops: list):
def download_events(start_date: date, end_date: date, stops: list):
datestops = itertools.product(parallel.s3_date_range(start_date, end_date), stops)
result = parallel_download_events(datestops)
result = filter(
1 change: 1 addition & 0 deletions server/chalicelib/s3_alerts.py
@@ -21,6 +21,7 @@ def key(day):

def get_alerts(day, routes):
alerts_str = s3.download(key(day), "utf8")
# TODO: Handle either format (v2 or v3) of alerts
alerts = json.loads(alerts_str)[0]["past_alerts"]

def matches_route(alert):
