Process lamp files, initial #92

Merged · 12 commits · Apr 7, 2024
28 changes: 28 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,28 @@
name: test

on:
  pull_request:
  push:
    branches:
      - main

jobs:
  backend:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11"]
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Setup Poetry
        run: |
          curl -sSL https://install.python-poetry.org | python3 -
          poetry install
      - name: test code with pytest
        run: |
          poetry run python -m pytest
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
.env.local
**/__pycache__
.python-version
.temp
2 changes: 1 addition & 1 deletion .vscode/extensions.json
@@ -1,3 +1,3 @@
{
  "recommendations": ["ms-python.black-formatter", "ms-python.flake8"]
}
2 changes: 2 additions & 0 deletions README.md
@@ -7,6 +7,8 @@ So far we have:
- Store MBTA Alerts data daily.
- Store number of trips with new trains on Orange and Red line daily.
- Store Bluebikes station status data every 5 min.
- Store ridership data.
- Process and store speed restrictions.

To add a new lambda function, put the methods you need in a new file in chalicelib/.
Then add your trigger in app.py.
2 changes: 2 additions & 0 deletions deploy.sh
@@ -15,6 +15,8 @@ echo "Deploying version $GIT_VERSION | $GIT_SHA"

# Adding some datadog tags to get better data
DD_TAGS="git.commit.sha:$GIT_SHA,git.repository_url:github.com/transitmatters/data-ingestion"
DD_GIT_REPOSITORY_URL="github.com/transitmatters/data-ingestion"
DD_GIT_COMMIT_SHA="$GIT_SHA"

poetry export -f requirements.txt --output ingestor/requirements.txt --without-hashes

7 changes: 7 additions & 0 deletions ingestor/app.py
@@ -15,6 +15,7 @@
    predictions,
    landing,
    trip_metrics,
    lamp,
    yankee,
)

@@ -161,3 +162,9 @@ def store_landing_data(event):
@app.schedule(Cron("0/5", "0-6,9-23", "*", "*", "?", "*"))
def update_yankee_shuttles(event):
yankee.update_shuttles()


# Runs hourly on the hour, from either 4 AM -> 1 AM or 5 AM -> 2 AM Eastern depending on DST
@app.schedule(Cron("0", "0-6,9-23", "*", "*", "?", "*"))
def process_daily_lamp(event):
    lamp.ingest_lamp_data()
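# A standalone sanity check (editor's sketch, not part of the app) of the
# UTC -> Eastern mapping claimed in the comment above: hour 9 UTC is
# 4 AM EST in winter and 5 AM EDT in summer.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

for d in (
    datetime(2024, 1, 15, 9, tzinfo=timezone.utc),  # winter (EST)
    datetime(2024, 7, 15, 9, tzinfo=timezone.utc),  # summer (EDT)
):
    print(d.astimezone(ZoneInfo("US/Eastern")).strftime("%I %p %Z"))
# prints "04 AM EST", then "05 AM EDT"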
3 changes: 3 additions & 0 deletions ingestor/chalicelib/lamp/__init__.py
@@ -0,0 +1,3 @@
__all__ = ["ingest_lamp_data"]

from .ingest import ingest_lamp_data
150 changes: 150 additions & 0 deletions ingestor/chalicelib/lamp/ingest.py
@@ -0,0 +1,150 @@
from datetime import date
import io
from typing import Tuple
import requests
import pandas as pd

from .utils import format_dateint, get_current_service_date
from ..parallel import make_parallel
from ..s3 import upload_df_as_csv


LAMP_INDEX_URL = "https://performancedata.mbta.com/lamp/subway-on-time-performance-v1/index.csv"
RAPID_DAILY_URL_TEMPLATE = "https://performancedata.mbta.com/lamp/subway-on-time-performance-v1/{YYYY_MM_DD}-subway-on-time-performance-v1.parquet"
S3_BUCKET = "tm-mbta-performance"
# month and day are not zero-padded
S3_KEY_TEMPLATE = "Events-lamp/daily-data/{stop_id}/Year={YYYY}/Month={_M}/Day={_D}/events.csv"
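# e.g. with a hypothetical stop id:
#   S3_KEY_TEMPLATE.format(stop_id="70061", YYYY=2024, _M=4, _D=7)
#   -> "Events-lamp/daily-data/70061/Year=2024/Month=4/Day=7/events.csv"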


# LAMP columns to fetch from parquet files
INPUT_COLUMNS = [
    "service_date",
    "route_id",
    "trip_id",
    "stop_id",
    "direction_id",
    "stop_sequence",
    "vehicle_id",
    "vehicle_label",
    "move_timestamp",  # departure time from the previous station
    "stop_timestamp",  # arrival time at the current station
]

# columns that should be output to s3 events.csv
OUTPUT_COLUMNS = [
    "service_date",
    "route_id",
    "trip_id",
    "direction_id",
    "stop_id",
    "stop_sequence",
    "vehicle_id",
    "vehicle_label",
    "event_type",
    "event_time",
]


def _local_save(s3_key, stop_events):
    """TODO remove this temp code, it saves the output files locally!"""
    import os

    s3_key = ".temp/" + s3_key
    if not os.path.exists(os.path.dirname(s3_key)):
        os.makedirs(os.path.dirname(s3_key))
    stop_events.to_csv(s3_key)


def _process_arrival_departure_times(pq_df: pd.DataFrame) -> pd.DataFrame:
    """Process and collate arrivals and departures for a timetable of events.

    Before: TODO add example
    After: TODO add example
    """
    # NB: While we can generally trust df dtypes fetched from parquet files, as the files are
    # compressed with columnar metadata, there are some numerical imprecisions that numpy seems
    # to throw on M1 machines, affecting how epoch timestamps are cast to datetimes. Maybe not
    # a problem on the AWS machines, though?
    pq_df["dep_time"] = pd.to_datetime(pq_df["move_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")
    pq_df["arr_time"] = pd.to_datetime(pq_df["stop_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")

    # explode departure and arrival times
    arr_df = pq_df[pq_df["arr_time"].notna()]
    arr_df = arr_df.assign(event_type="ARR").rename(columns={"arr_time": "event_time"})
    arr_df = arr_df[OUTPUT_COLUMNS]

    dep_df = pq_df[pq_df["dep_time"].notna()]
    dep_df = dep_df.assign(event_type="DEP").rename(columns={"dep_time": "event_time"}).drop(columns=["arr_time"])

    # these departures are from the previous stop, so set them to the previous stop id:
    # find the stop id of the departure whose sequence number precedes the recorded one.
    # stop sequences don't necessarily increment by 1 or with a reliable pattern
    dep_df = dep_df.sort_values(by=["stop_sequence"])
    dep_df = pd.merge_asof(
        dep_df,
        dep_df,
        on=["stop_sequence"],
        by=[
            "service_date",  # comment out for faster performance
            "route_id",
            "trip_id",
            "vehicle_id",
            "vehicle_label",  # comment out for faster performance
            "direction_id",
            "event_type",  # comment out for faster performance
        ],
        direction="backward",
        suffixes=("_curr", "_prev"),
        allow_exact_matches=False,  # a row must not match on itself
    )
    # use the CURRENT time, but the PREVIOUS stop id
    dep_df = dep_df.rename(columns={"event_time_curr": "event_time", "stop_id_prev": "stop_id"})[OUTPUT_COLUMNS]

    # stitch together arrivals and departures
    return pd.concat([arr_df, dep_df])
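# A minimal, self-contained sketch (toy data, made-up ids; not part of the
# pipeline) of the merge_asof trick above: each row keeps its own values but
# inherits stop_id from the row with the next-lowest stop_sequence on the trip.
import pandas as pd  # repeated so the sketch runs standalone

toy = pd.DataFrame(
    {
        "trip_id": ["t1", "t1", "t1"],
        "stop_sequence": [10, 30, 60],  # gaps mirror real data
        "stop_id": ["A", "B", "C"],
    }
)
shifted = pd.merge_asof(
    toy,
    toy,
    on="stop_sequence",
    by="trip_id",
    direction="backward",
    suffixes=("_curr", "_prev"),
    allow_exact_matches=False,  # a row must not match itself
)
print(shifted[["stop_sequence", "stop_id_curr", "stop_id_prev"]])
# stop_id_prev comes out [NaN, "A", "B"]: each event is re-attributed to the
# stop the vehicle just left.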


def fetch_pq_file_from_remote(service_date: date) -> pd.DataFrame:
    """Fetch a parquet file from LAMP for a given service date."""
    # TODO(check if file exists in index, throw if it doesn't)
    url = RAPID_DAILY_URL_TEMPLATE.format(YYYY_MM_DD=service_date.strftime("%Y-%m-%d"))
    result = requests.get(url)
    return pd.read_parquet(io.BytesIO(result.content), columns=INPUT_COLUMNS, engine="pyarrow")
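# One possible shape for the TODO above (an editor's sketch: it assumes the
# first column of index.csv lists the available file paths, which is unverified):
def _pq_file_exists(service_date: date) -> bool:
    index = pd.read_csv(LAMP_INDEX_URL)
    date_str = service_date.strftime("%Y-%m-%d")
    # look for the service date anywhere in the listed paths
    return index.iloc[:, 0].astype(str).str.contains(date_str).any()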


def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:
    """Process and transform columns for the full day's events."""
    pq_df["direction_id"] = pq_df["direction_id"].astype("int16")
    pq_df["service_date"] = pq_df["service_date"].apply(format_dateint)

    processed_daily_events = _process_arrival_departure_times(pq_df)
    return processed_daily_events.sort_values(by=["event_time"])


def upload_to_s3(stop_id_and_events: Tuple[str, pd.DataFrame], service_date: date) -> list:
    """Upload events to s3 as a .csv file."""
    # unpack from iterable
    stop_id, stop_events = stop_id_and_events

    # upload to s3 as csv
    s3_key = S3_KEY_TEMPLATE.format(stop_id=stop_id, YYYY=service_date.year, _M=service_date.month, _D=service_date.day)
    # _local_save(s3_key, stop_events)
    upload_df_as_csv(S3_BUCKET, s3_key, stop_events)
    return [stop_id]


_parallel_upload = make_parallel(upload_to_s3)


def ingest_lamp_data():
    """Ingest and upload today's LAMP data."""
    service_date = get_current_service_date()
    pq_df = fetch_pq_file_from_remote(service_date)
    processed_daily_events = ingest_pq_file(pq_df)

    # split daily events by stop_id and parallel upload to s3
    stop_event_groups = processed_daily_events.groupby("stop_id")
    _parallel_upload(stop_event_groups, service_date)
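# For reference: DataFrame.groupby iterates as (key, sub_frame) pairs, matching
# the Tuple[str, pd.DataFrame] that upload_to_s3 unpacks. A toy run (made-up ids):
#
#   df = pd.DataFrame({"stop_id": ["70061", "70061", "70063"], "event_type": ["ARR", "DEP", "ARR"]})
#   for stop_id, events in df.groupby("stop_id"):
#       print(stop_id, len(events))  # -> 70061 2, then 70063 1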


if __name__ == "__main__":
    ingest_lamp_data()
29 changes: 29 additions & 0 deletions ingestor/chalicelib/lamp/tests/test_util.py
@@ -0,0 +1,29 @@
from datetime import date, datetime
from ..utils import EASTERN_TIME, service_date


def test_service_date():
    assert service_date(datetime(2023, 12, 15, 3, 0, 0)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 5, 45, 0)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 7, 15, 0)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 23, 59, 59)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 16, 0, 0, 0)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 16, 2, 59, 59)) == date(2023, 12, 15)


def test_localized_datetime():
    assert service_date(datetime(2023, 12, 15, 3, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 5, 45, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 7, 15, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 15, 23, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 16, 0, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
    assert service_date(datetime(2023, 12, 16, 2, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)


def test_edt_vs_est_datetimes():
    assert service_date(datetime(2023, 11, 5, 23, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
    assert service_date(datetime(2023, 11, 6, 0, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
    assert service_date(datetime(2023, 11, 6, 1, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
    assert service_date(datetime(2023, 11, 6, 2, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
    # 3am EST is 4am EDT
    assert service_date(datetime(2023, 11, 6, 3, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 6)
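# To run just these tests locally (using the same Poetry setup as test.yml):
#   poetry run python -m pytest ingestor/chalicelib/lamp/tests/test_util.py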
25 changes: 25 additions & 0 deletions ingestor/chalicelib/lamp/utils.py
@@ -0,0 +1,25 @@
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

EASTERN_TIME = ZoneInfo("US/Eastern")


def service_date(ts: datetime) -> date:
    # In practice a naive timestamp is UTC, but in many places we have an implied
    # Eastern, so make that explicit. Note that .replace() relabels the timezone
    # without converting the clock time.
    ts = ts.replace(tzinfo=EASTERN_TIME)

    if ts.hour >= 3 and ts.hour <= 23:
        return date(ts.year, ts.month, ts.day)

    prior = ts - timedelta(days=1)
    return date(prior.year, prior.month, prior.day)


def get_current_service_date() -> date:
    return service_date(datetime.now(EASTERN_TIME))


def format_dateint(dtint: int) -> str:
    """Safely converts a dateint of YYYYMMDD to a YYYY-MM-DD string."""
    return datetime.strptime(str(dtint), "%Y%m%d").strftime("%Y-%m-%d")
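# e.g. format_dateint(20240407) == "2024-04-07"; a malformed dateint raises ValueError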