Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Trip Metrics #122

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions ingestor/chalicelib/bluebikes.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ def calc_daily_stats(day):

# create fields to determine rideability
df["pct_full"] = df["num_bikes_available"] / (df["num_docks_available"] + df["num_bikes_available"])
df["rideable"] = np.where(
(df["pct_full"] >= 0.1) & (df["pct_full"] <= 0.85) & (df["station_status"] == "active"), 1, 0
)

# station_status seems to be missing from recent data, is_renting is present and may be the equivalent
if "station_status" in df.columns:
df["rideable"] = np.where(
(df["pct_full"] >= 0.1) & (df["pct_full"] <= 0.85) & (df["station_status"] == "active"), 1, 0
)
elif "is_renting" in df.columns:
df["rideable"] = np.where((df["pct_full"] >= 0.1) & (df["pct_full"] <= 0.85) & (df["is_renting"] == 1), 1, 0)

# create data frames on rideability for station and neighbors
df_sm = df[["station_id", "pct_full", "rideable", "datetimepulled"]]
Expand Down
Empty file.
78 changes: 78 additions & 0 deletions ingestor/chalicelib/speeds/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from datetime import date, timedelta
from decimal import Decimal
import json
import sys
import pandas as pd

from .segment_speed import StopPair, format_tt_df, get_aggregate_data_dates, pair_from_fullpair, get_stop_pairs

from ..parallel import make_parallel
from .. import dynamo
from .constants import COLORS

"""
To run when backfilling long periods of data

poetry run python -m ingestor.chalicelib.speeds.backfill [first_date]

Example:
poetry run python -m ingestor.chalicelib.speeds.backfill 2025-01-01

It will run until TODAY

If running from 2016, will take close to 1 hour
"""


def get_date_chunks(start: date, end: date, delta: int):
    """Yield (chunk_start, chunk_end) date pairs covering [start, end] inclusive.

    Each chunk spans at most ``delta`` days; the final chunk may be shorter.
    Yields nothing when ``end`` is before ``start``.

    Args:
        start: First date of the range (inclusive). (Annotations fixed: the
            body does date arithmetic, so these are ``date``, not ``str``.)
        end: Last date of the range (inclusive).
        delta: Maximum number of days per chunk (must be >= 1).
    """
    remaining = (end - start).days + 1
    cur = start
    # "> 0" (not "!= 0") so an inverted range yields nothing instead of one bogus chunk.
    while remaining > 0:
        inc = min(remaining, delta)
        yield (cur, cur + timedelta(days=inc - 1))
        remaining -= inc
        cur += timedelta(days=inc)


def _gather_tts(date_range, pair: StopPair):
    """Fetch raw aggregate travel-time data for one (start, stop) date range."""
    start_date, end_date = date_range
    return get_aggregate_data_dates(pair, start_date, end_date, verbose=True, raw=True)


# Parallel variant of _gather_tts: fans one call out per date-range chunk across 10 threads.
gather_tts = make_parallel(
    _gather_tts, THREAD_COUNT=10
)  # not as ambitious as it sounds, since we're mostly just waiting for the server.


def backfill_daily_median_travel_time(start: date, end: date):
    """Compute and store daily segment travel times for every day in [start, end].

    For each (line color, stop pair) combination, raw travel times are fetched
    in parallel over date chunks of up to 240 days, formatted into per-day rows,
    de-duplicated on (date_stop_pair, route), and batch-written to the
    SegmentTravelTimes DynamoDB table.

    Args:
        start: First date to backfill (inclusive).
        end: Last date to backfill (inclusive).
    """
    pairs = [(color, fullpair) for color in COLORS for fullpair in get_stop_pairs(color)]

    frames = []
    for color, fullpair in pairs:
        # Hoisted: the original computed pair_from_fullpair(o[1]) twice per pair.
        pair = pair_from_fullpair(fullpair)
        raw = gather_tts(get_date_chunks(start, end, delta=240), pair)
        frames.append(format_tt_df(pd.DataFrame.from_records(raw), color, pair))

    row_dicts = pd.concat(frames).to_dict(orient="records")

    # Keep one row per (date_stop_pair, route) key; later rows win on duplicates.
    unique_dict = {(item["date_stop_pair"], item["route"]): item for item in row_dicts}
    row_dicts = list(unique_dict.values())

    # JSON round-trip with parse_float=Decimal converts all floats to Decimal,
    # which DynamoDB requires (it rejects Python floats).
    dynamo.dynamo_batch_write(json.loads(json.dumps(row_dicts), parse_float=Decimal), "SegmentTravelTimes")


def main(args=sys.argv):
    """CLI entry point.

    Uses args[1] as the ISO-format start date if given, otherwise defaults to
    2016-01-15, and backfills through today.
    """
    start_date = date.fromisoformat(args[1]) if len(args) > 1 else date(2016, 1, 15)
    backfill_daily_median_travel_time(start_date, date.today())


if __name__ == "__main__":
    main()
Loading