diff --git a/README.md b/README.md index 418e975b5..0381119be 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ For those curious about what data we manage is released on OpenData, see the [Op - [Bluetooth Detectors](#bluetooth-detectors) - [Collisions](#collisions) - [Cycling App (inactive)](#cycling-app-inactive) -- [Events (inactive)](#events-inactive) +- [Events](#events) - [GIS - Geographic Data](#gis---geographic-data) - [Assets](#assets) - [Red Light Cameras](#red-light-cameras) @@ -26,13 +26,12 @@ For those curious about what data we manage is released on OpenData, see the [Op - [Incidents (inactive)](#incidents-inactive) - [INRIX (inactive)](#inrix-inactive) - [Parking (inactive)](#parking-inactive) -- [Road Closure (inactive)](#road-closure-inactive) - [TTC (inactive)](#ttc-inactive) - [Volume Data](#volume-data) - [Miovision - Multi-modal Permanent Video Counters](#miovision---multi-modal-permanent-video-counters) - [RESCU - Loop Detectors (inactive)](#rescu---loop-detectors-inactive) - - [Short-term Counting Program](#short-term-counting-program) - - [VDS](#vehicle-detector-station-vds) + - [Short-term Counting Program](#short-term-counting-program) + - [Vehicle Detector Station (VDS)](#vehicle-detector-station-vds) - [Watch Your Speed signs](#watch-your-speed-signs) - [Weather](#weather) - [Open Data Releases](#open-data-releases) @@ -60,11 +59,13 @@ The collisions dataset consists of data on individuals involved in traffic colli The Cycling App collected OD and trip data until 2016. -## Events (inactive) +## Events [`events/`](events/) -How do special events impact traffic in the city? Data sources include the City's Open Data and TicketMaster. +How does construction and special events impact traffic in the city? +- City road permitting data (RoDARs) +- (oudated) Special events from City's Open Data and TicketMaster ## GIS - Geographic Data @@ -117,11 +118,6 @@ Data collected from a variety of traffic probes from 2007 to 2016 for major stre This contains R and SQL files for pulling parking lots and parking tickets from Open Data. They might be useful but haven't been documented or automated. -## Road Closure (inactive) -[`road_closure/`](road_closure/) - -This directory contains a Python file to pull and parse the XML feed of road closures. This process hasn't been automated (and more recent versions of the API use JSON). - ## TTC (inactive) [`ttc/`](ttc/) diff --git a/dags/readme.md b/dags/readme.md index cdfe45b87..e1286ad8e 100644 --- a/dags/readme.md +++ b/dags/readme.md @@ -1,6 +1,7 @@ - [Airflow](#airflow) - [DAG Naming](#dag-naming) - [Contents of DAGs Folder](#contents-of-dags-folder) + - [Bluetooth](#bluetooth) - [Weather](#weather) - [Misc](#misc) - [Here](#here) @@ -12,6 +13,7 @@ - [**common\_tasks.py**](#common_taskspy) - [**dag\_functions.py**](#dag_functionspy) - [Miovision](#miovision) + - [RoDARS](#rodars) - [VDS (Formerly RESCU)](#vds-formerly-rescu) - [WYS](#wys) - [Common Tasks](#common-tasks) @@ -33,6 +35,9 @@ You may notice that many older DAGs have not been renamed to this standard: it i ## Contents of DAGs Folder **Only put DAGs for data intake in this folder,** DAGs for data processing related to projects should be in their respective project repositories. +### Bluetooth +- [bluetooth_check_readers_temp.py](bluetooth_check_readers_temp.py): undocumented. + ### Weather - **[weather_pull.py](weather_pull.py)**: [readme](../weather/README.md#data-pipeline---weather_pull-dag). - Deprecated: [pull_weather.py](pull_weather.py) @@ -44,7 +49,8 @@ You may notice that many older DAGs have not been renamed to this standard: it i ### Here - **[pull_here.py](pull_here.py)**: [readme](../here/traffic/README.md#probe_path). - **[pull_here_path.py](pull_here_path.py)**: [readme](../here/traffic/README.md#path). -- HERE Aggregations: [citywide_tti_aggregate.py](citywide_tti_aggregate.py). +- HERE Aggregations: [tti_aggregate.py](tti_aggregate.py). +- Deprecated: [citywide_tti_aggregate.py](citywide_tti_aggregate.py) ### Replication - [**replicators.py**](replicators.py): creates collisions and counts replicator DAGs as part of the MOVE -> bigdata replication process. @@ -57,7 +63,7 @@ You may notice that many older DAGs have not been renamed to this standard: it i ### GIS - [**assets_pull.py**](assets_pull.py): [readme](../gis/assets/README.md#assets). -- [**gcc_layers_pull.py**](gcc_layers_pull.py): [readme](../gis/gccview/README.md#gccview-pipeline). +- [**gcc_layers_pull.py**](gcc_layers_pull.py): [readme](../gis/gccview/README.md#gccview-pipeline). Runs on Morbius, Bancroft. - [**vz_google_sheets.py**](vz_google_sheets.py): [readme](../gis/school_safety_zones/README.md#2-the-automated-data-pipeline). - Deprecated: [**pull_interventions_dag.py**](pull_interventions_dag.py). @@ -90,16 +96,20 @@ Contains helper functions to be used in multiple DAGs. - **[miovision_check.py](miovision_check.py)**: [readme](../volumes/miovision/api/readme.md#miovision_check). - Deprecated: [pull_miovision.py](pull_miovision.py), [check_miovision.py](check_miovision.py). +### RoDARS +- **[rodars_pull.py](rodars_pull.py)**: [readme](../events/road_permits/readme.md#rodars-dag). Runs on Morbius. + ### VDS (Formerly RESCU) -- **[vds_check.py](vds_check.py)**: [readme](../volumes/vds/readme.md#vds_check-dag). -- **[vds_pull_vdsdata.py](vds_pull_vdsdata.py)**: [readme](../volumes/vds/readme.md#vds_pull_vdsdata-dag). -- **[vds_pull_vdsvehicledata.py](vds_pull_vdsvehicledata.py)**: [readme](../volumes/vds/readme.md#vds_pull_vdsvehicledata-dag). +- **[vds_check.py](vds_check.py)**: [readme](../volumes/vds/readme.md#vds_check-dag). Runs on Morbius. +- **[vds_pull_vdsdata.py](vds_pull_vdsdata.py)**: [readme](../volumes/vds/readme.md#vds_pull_vdsdata-dag). Runs on Morbius. +- **[vds_pull_vdsvehicledata.py](vds_pull_vdsvehicledata.py)**: [readme](../volumes/vds/readme.md#vds_pull_vdsvehicledata-dag). Runs on Morbius. - Deprecated: [check_rescu.py](check_rescu.py). ### WYS -- [**refresh_wys_monthly.py**](refresh_wys_monthly.py): [readme](../wys/api/readme.md#wys_monthly_summary). -- [**pull_wys.py**](pull_wys.py): [readme](../wys/api/readme.md#pull_wys). +- [**wys_pull.py**](wys_pull.py): [readme](../wys/api/README.md#dag). +- [**refresh_wys_monthly.py**](refresh_wys_monthly.py): [readme](../wys/api/README.md#wys_monthly_summary). - [**wys_check.py**](wys_check.py): Contains additional data quality checks for `pull_wys`. +- Deprecated [**pull_wys.py**](pull_wys.py). ## Common Tasks diff --git a/dags/rodars_pull.py b/dags/rodars_pull.py new file mode 100644 index 000000000..2875c9860 --- /dev/null +++ b/dags/rodars_pull.py @@ -0,0 +1,68 @@ +import os +import sys +from functools import partial +from datetime import datetime, timedelta + +from airflow.decorators import dag, task +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.models import Variable + +DAG_NAME = 'rodars_pull' +DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ['Unknown']) + +repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) +sys.path.insert(0, repo_path) + +from events.road_permits.rodars_functions import ( + fetch_and_insert_issue_data, fetch_and_insert_location_data +) +from dags.dag_functions import task_fail_slack_alert, get_readme_docmd + +README_PATH = os.path.join(repo_path, 'events/road_permits/readme.md') +DOC_MD = get_readme_docmd(README_PATH, DAG_NAME) + +default_args = { + 'owner': ','.join(DAG_OWNERS), + 'depends_on_past': False, + 'start_date': datetime(2024, 11, 27), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + 'retry_exponential_backoff': True, #Allow for progressive longer waits between retries + 'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True), + 'catchup': True, +} + +@dag( + dag_id=DAG_NAME, + default_args=default_args, + max_active_runs=1, + template_searchpath=[ + os.path.join(repo_path,'events/road_permits/sql') + ], + doc_md=DOC_MD, + tags=['bdit_data-sources', 'rodars', 'pull', 'itsc_central'], + schedule='0 4 * * *' #daily at 4am +) + +def rodars_dag(): + @task + def pull_rodars_issues(ds = None): + "Get RODARS data from ITSC and insert into bigdata `congestion_events.itsc_issues`" + itsc_bot = PostgresHook('itsc_postgres') + events_bot = PostgresHook('events_bot') + fetch_and_insert_issue_data(select_conn=itsc_bot, insert_conn=events_bot, start_date=ds) + + @task + def pull_rodar_locations(ds = None): + "Get RODARS data from ITSC and insert into bigdata `congestion_events.itsc_issue_locations`" + itsc_bot = PostgresHook('itsc_postgres') + events_bot = PostgresHook('events_bot') + fetch_and_insert_location_data(select_conn=itsc_bot, insert_conn=events_bot, start_date=ds) + #add a delete task to remove outdated revisions? + + #these tasks are not dependent, but this helps so only one fails at a time + pull_rodars_issues() >> pull_rodar_locations() + +rodars_dag() diff --git a/events/readme.md b/events/readme.md new file mode 100644 index 000000000..918653004 --- /dev/null +++ b/events/readme.md @@ -0,0 +1,10 @@ +# Introduction + +This folder contains information on data sources related to road closures and special events. + +## [road_permits](./road_permits/) +This folder contains [Road Disruption Activity Reporting System (RoDARS)](https://www.toronto.ca/services-payments/streets-parking-transportation/road-restrictions-closures/road-disruption-activity-reporting-system-rodars/) which is pulled daily from ITS Central by the [`rodars_pull` DAG](../dags/rodars_pull.py). +The describes permitted road construction and event related road closures with data dating back to ~2012, with more accurate data starting in mid-2024 (RoDARS "New"). + +## [special_events](./special_events/) +This folder contains an outdated (2017) attempt at archiving special events happening around the City of Toronto for traffic impact analysis from two data sources: City of Toronto Open Data and Ticketmaster. \ No newline at end of file diff --git a/events/road_permits/readme.md b/events/road_permits/readme.md new file mode 100644 index 000000000..b2a91246f --- /dev/null +++ b/events/road_permits/readme.md @@ -0,0 +1,237 @@ +- [Introduction](#introduction) + - [RODARS vs RODARS New ("rodars\_new\_approved")](#rodars-vs-rodars-new-rodars_new_approved) + - [What's included?](#whats-included) +- [Querying RODARs Data](#querying-rodars-data) + - [Data Dictionary](#data-dictionary) + - [`congestion_events.rodars_locations`](#congestion_eventsrodars_locations) + - [`rodars_issue_locations.lanesaffectedpattern`](#rodars_issue_locationslanesaffectedpattern) +- [Data Ops](#data-ops) + - [RODARS DAG](#rodars-dag) + - [`lanesaffected`](#lanesaffected) + + +# Introduction + +> [!IMPORTANT] +> The city website gives a good overview of RoDARS (here/below): [Road Disruption Activity Reporting System (RoDARS)](https://www.toronto.ca/services-payments/streets-parking-transportation/road-restrictions-closures/road-disruption-activity-reporting-system-rodars/) + +> RoDARS is a system that informs the public of planned roadway closures throughout the City. The submission procedure follows the acquisition of an approved [Street Occupation Permit](https://www.toronto.ca/?page_id=80501) (construction) or [Street Closure Permit](https://www.toronto.ca/?page_id=84975) (event). +> +> When occupying any portion of the City’s public right of way that is not an expressway, the applicant must submit a [**RoDARS Notification Form**](https://www.toronto.ca/wp-content/uploads/2019/03/8de1-TS_Fillable-RoDARS-Form.pdf) to TMC Dispatch at least two business days before the start of occupation. +> The RoDARS Notification Form must be approved by the appropriate Work Zone Traffic Coordinator (WZTC) before being submitted to TMC Dispatch. +> +> When occupying any portion of a City expressway (F.G.G., DVP or Allen Rd between Eglinton Ave W and Transit Rd), the applicant must submit a RoDARS Notification Form to TMC Dispatch at least seven business days before the start of occupation. The RoDARS Notification Form must be approved by the appropriate City project manager/engineer before submittal to TMC Dispatch. Once attained from TMC Dispatch, TMC’s RESCU Unit will then notify the applicant of the approval verdict. +> +> A separate RoDARS Notification Form is required for each occupied roadway. If the daily schedule varies, separate RoDARS Notification Forms are required for each day. Once the RoDARS form has been submitted and approved, the information then appears on the [Traffic Restrictions Map](https://www.toronto.ca/?page_id=63656). Please refer to the [City Expressway Closure Guidelines](https://www.toronto.ca/wp-content/uploads/2017/11/9184-0_RoDARS-City-Expressway-Closure-Guidelines-a.pdf) for allowable roadway occupancy times. +> +> The applicant must notify the City if either of the following situations arise: +> +> 1. the work schedule and/or work zone plan has been revised or postponed. The applicant must submit a revised and approved RoDARS Notification Form at least one business day before changes occur +> 2. the work has been cancelled or completed early. The applicant must contact TMC Dispatch + +> [!TIP] +> The RoDARS form is public here: https://rodars.transnomis.com/Permit/PermitApplicationCreate/a9180443-b97f-548e-ae1c-fc70cae18a7a?previewMode=Applicants + +Here is a screenshot of the extremely detailed geographic/lane management plan UI (which you can access at the link above): +![Rodars Form](rodars_form.png) + +## RODARS vs RODARS New ("rodars_new_approved") + +> [!IMPORTANT] +> In 2024 a new version of RODARS debuted which should result in a more reliable data source. + +**RODARS (New)** +- RODARs New has only been around since 2024-03 (already has more than 28,000 issues!) +- An online form which contractors fill out directly. Approvals are done by work zone coordinators. +- QR codes will start appearing at sites in 2024/2025, which should help enforceability (citizen reporting/bylaw officers). + - There will be penalties. +- Most records have `centreline_id`! +- Contains detailed description of lane closure pattern (`lanesaffectedpattern`). + +**RODARS (Old)** +- Apparently fax was involved and not all forms were processed = completeness is a concern. +- `centreline_id` was introduced later in the lifespan of original RODARS (Only about 1/3 of those records have a centreline_id, starting from 2021-09). + +Here is a small comparison of the data of the new and old RODARS (differentiated by `divisionid` / `divisionname` as seen below): + +| "divisionid" | "divisionname" | "avg_actual_duration" | "avg_proposed_duration" | "min_starttimestamp" | "max_starttimestamp" | "count" | "has_centreline_id" | "start_centreline" | +|--------------|-----------------------|---------------------------|---------------------------|----------------------|----------------------|---------|---------------------|------------------------------| +| 8014 | "RODARS" | "15 days 28:43:05.992087" | "15 days 09:49:54.340779" | "1930-08-31" | "2024-12-19" | 366100 | 99119 | "2021-09-27 20:55:57.855961" | +| 8048 | "rodars_new_approved" | "20 days 24:26:34.079984" | "18 days 12:11:21.306625" | "2024-03-06" | "2024-12-19" | 28418 | 27837 | "2024-03-06 09:48:30.392945" | + +```sql +SELECT + divisionid, + divisionname, + AVG(actual_duration) AS avg_actual_duration, + AVG(proposed_duration) AS avg_proposed_duration, + MIN(starttimestamp::date) AS min_starttimestamp, + MAX(starttimestamp::date) AS max_starttimestamp, + COUNT(*), + COUNT(*) FILTER (WHERE centreline_id IS NOT NULL) AS has_centreline_id, + MIN(starttimestamp) FILTER (WHERE centreline_id IS NOT NULL) AS start_centreline +FROM congestion_events.rodars_locations +GROUP BY 1, 2 ORDER BY 1, 2; +``` + +## What's included? +- As noted in [the intro](#introduction), both construction and events (eg. parades, marathons) are included. +- Emergency utilities - maybe included. +- Notably, CafeTO is not included (As at EOY 2024). + +# Querying RODARs Data + +> [!WARNING] +> Use of this data is largely untested. If you have an example of use, please help others by proposing a change to this readme! + +You could query construction along specific centreline_ids and during specific dates using this sample query. +```sql +WITH target_geom AS ( + SELECT unnest(links)::integer AS centreline_id + FROM gis_core.get_centreline_btwn_intersections(13466931, 13463747) +) + +--visualizing results in dbeaver is preferable to pgadmin due to better handling of overlapping geoms. +SELECT rl.* +FROM congestion_events.rodars_locations AS rl +JOIN target_geom USING (centreline_id) +WHERE tsrange(starttimestamp, endtimestamp, '[)') @> tsrange('2024-12-01', '2025-01-01', '[)') +``` + +Or you could cast a wider net by using `st_intersects` to also include intersecting cross streets: +```sql +WITH target_geom AS ( + SELECT * FROM gis_core.get_centreline_btwn_intersections(13466931, 13463747) +) + +SELECT rl.* +FROM congestion_events.rodars_locations AS rl, target_geom +WHERE + tsrange(starttimestamp, endtimestamp, '[)') @> tsrange('2024-12-01', '2025-01-01', '[)') + AND st_intersects(rl.centreline_geom, target_geom.geom) +``` + +> [!CAUTION] +> For older time periods you may need to use `issue_geom` instead as `centreline_id` is not consistently populated. + +## Data Dictionary + +### `congestion_events.rodars_locations` +This view joins together issue metadata from `congestion_events.rodars_issues` and location descriptions from `congestion_events.rodars_issue_locations`. + + +| Column | Sample | Description | +|------------------------------|---------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| divisionid | 8048 | 8014 for RODARS, 8048 for "rodars_new_approved" | +| divisionname | rodars_new_approved | Division name | +| issueid | 9535420 | An issueid may have more than one location in this table. | +| sourceid | Tor-RDAR2024-3658 | This ID is helpful for searching on ITS Central | +| description | City of Toronto Contract 24ECS-LU-08SU | This contract ID will yield helpful results on Google. | +| priority | High | Critical/High/Medium/Low/None | +| status | In Progress | Future/In Progress/Ended/Cancelled/Overdue | +| starttimestamp | 10/02/2024 17:43 | | +| endtimestamp | | null if on-going. | +| actual_duration | | endtimestamp - starttimestamp | +| proposedstarttimestamp | 10/01/2024 7:00 | | +| proposedendtimestamp | 10/31/2025 7:00 | | +| proposed_duration | 395 days | proposedendtimestamp - proposedstarttimestamp | +| proposedstart_tod | 7:00:00 | The time of day, extracted from the proposedstarttimestamp (and in conjuction with "recurrence_schedule" field) seems to indicate the working hours in some cases, but is applied inconsistently. | +| proposedend_tod | 7:00:00 | The time of day, extracted from the proposedendtimestamp (and in conjuction with "recurrence_schedule" field) seems to indicate the working hours in some cases, but is applied inconsistently. | +| recurrence_schedule | Weekdays | Continuous/Daily/Weekdays/Weekends/Activity Schedule | +| location_description | Harbord St at James Hales Lane | A high level location | +| mainroadname | Harbord St | | +| fromroadname | James Hales Lane | | +| toroadname | | | +| streetnumber | | | +| locationblocklevel | Full Closure with Emergency Access | This field is the highest level "closure" description and seems to frequently contradict lower level descriptions, as in this case. | +| roadclosuretype_desc | No Closure | Another "closure" description. Contradicts above in this case. | +| locationdescription_toplevel | Harbord St at James Hales Lane | Another high level description | +| direction | West | Each direction on the affected street will have it's own entry. | +| roadname | Harbord St | lowest level description; roadname corresponding to centreline. | +| centreline_id | 14021560 | | +| centreline_geom | 0102000020E61000000200000… | centreline_latest geom, or the most recent geom ordered by version_date DESC in gis_core.centreline. Note this is not always populated, especially for older entries. | +| issue_geom | 0101000020E610000035B56CA… | | +| linear_name_id | 3605 | Also from the centreline. | +| lanesaffectedpattern | TOLOWO | `lanesaffectedpattern` describes the road closures in most detail and seem to be the most accurate. This code is deciphered in the next 9 columns. | +| lap_descriptions | {"Sidewalk Open","Normal Open","Lane Open Traffic Opposite Side"} | The description of each 2 letter code from `lanesaffectedpattern`. | +| lane_open_auto | 2 | These next 8 columns are a numeric description of the road closure from `lanesaffectedpattern`. Any partial closures are coded as 0.5 open/0.5 closed. See notes [here](#lanesaffectedpattern). | +| lane_closed_auto | 0 | | +| lane_open_bike | 0 | | +| lane_closed_bike | 0 | | +| lane_open_ped | 1 | | +| lane_closed_ped | 0 | | +| lane_open_bus | 0 | | +| lane_closed_bus | 0 | | + +#### `rodars_issue_locations.lanesaffectedpattern` + +This column offers the most detailed look at the lane closures. However is has been under development since 2022 and some codes did not enter use until later. For example; Sidewalk closures were not noted until 2023-01-18 and the first bike lane closure on 2023-03-03. They may have not entered regular use until later. + +```sql +--query to identify first occurence of each lanesaffectedpattern code. +SELECT lap.code, lanesaffectedpattern.lane_status, MIN(timestamputc) AS "min(timestamputc)" +FROM congestion_events.rodars_issue_locations, +UNNEST(regexp_split_to_array(lanesaffectedpattern, E'(?=(..)+$)')) AS lap(code) +JOIN itsc_factors.lanesaffectedpattern ON lap.code = lanesaffectedpattern.code +WHERE lanesaffectedpattern <> '' +GROUP BY 1, 2 +ORDER BY 3 +``` + +|code|lane_status |min(timestamputc) | +|---|-----------------------------------------|-----------------------| +LO |Normal Open |2022-05-03 12:16:07.072| +FO |Left Turn Open |2022-05-03 12:16:07.072| +LC |Normal Closed |2022-11-16 23:25:27.637| +SC |Shoulder Closed |2022-12-09 05:24:24.713| +UC |Buffer Closed |2022-12-14 00:00:34.325| +TO |Lane Open Traffic Opposite Side |2022-12-14 00:00:34.325| +TA |Lane Open Traffic Alternating |2023-01-05 14:54:09.856| +RC |Right Turn Closed |2023-01-18 21:00:17.778| +WC |Sidewalk Closed |2023-01-18 21:00:17.778| +FC |Left Turn Closed |2023-01-31 21:00:18.192| +KC |Bicycle Closed |2023-03-03 05:01:08.396| +RO |Right Turn Open |2023-05-23 19:00:12.517| +UO |Buffer Open |2023-07-18 20:00:53.965| +WO |Sidewalk Open |2023-10-30 18:00:10.039| +SO |Shoulder Open |2023-11-04 12:00:14.511| +WP |Sidewalk Closed – Protected Path Provided|2023-11-09 19:23:56.367| +KM |Bicycle Closed – Merge with other traffic|2023-11-24 21:00:43.557| +KD |Bicycle Closed – Detour Path Provided |2023-12-08 00:00:40.657| +WD |Sidewalk Closed – Detour Path Provided |2023-12-08 00:00:40.657| +KP |Bicycle Closed – Protected Path Provided |2023-12-21 00:00:42.532| +KO |Bicycle Open |2024-04-04 19:30:37.703| +HC |HOV Closed |2024-05-10 21:00:13.375| +HO |HOV Open |2024-06-25 20:00:14.220| + +# Data Ops + +- Note there is a data dictionary from the vendor saved here: `K:\tra\GM Office\Big Data Group\Data Sources\RODARS\Municipal 511 Custom Datafeed API Documentation - 2.4 (Toronto).pdf` + - Resources taken from this data dictionary are saved in the `itsc_factors` schema. +- Other codes were deciphered manually using ITS Central. + +## RODARS DAG +`rodars_pull` DAG runs on Morbius in order to access ITS Central database. See code here: [rodars_pull.py](../../dags/rodars_pull.py). + + + +- `pull_rodars_issues`: pulls RODARS issue data from ITSC and inserts into RDS. +- `pull_rodars_locations`: pulls RODARS issue location data from ITSC and inserts into RDS. + + + +## `lanesaffected` + +`lanesaffected` is a loosely formatted json column in the ITSC issuelocationnew table. + +Notes: +- This field is unpacked with `process_lanesaffected` function in [rodars_functions.py](./rodars_functions.py) and converted to tabular format. There are lots of near duplicate records that get unpacked from this column, which prevents any meaningful unique constraints on the `congestion_events.rodars_issue_locations` table. +- Some of the same fields names are used in the top level and the nested json, `LaneApproaches`, eg. `RoadClosureType`. The `_toplevel` suffix is used in `congestion_events.rodars_issue_locations` for the top level fields. + - It is assumed the lower level details are more descriptive when available. +- FeatureId = centreline_id! +- `LanesAffectedPattern` is a code describing lane closures. In `congestion_events.rodars_issue_locations` it is converted to numeric columns: `lane_open_auto, lane_closed_auto, lane_open_bike, lane_closed_bike, lane_open_ped, lane_closed_ped, lane_open_bus, lane_closed_bus` + +Sample: +``` +`lanesaffected`: "{""LocationDescription"":""Huron St from Harbord St to Classic Ave"",""EncodedCoordinates"":""{_oiGpyrcNrDoA"",""LaneApproaches"":[{""Direction"":3,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20},{""Direction"":2,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20}],""LocationBlockLevel"":3,""RoadClosureType"":20}" +``` \ No newline at end of file diff --git a/events/road_permits/rodars_form.png b/events/road_permits/rodars_form.png new file mode 100644 index 000000000..f9a07432d Binary files /dev/null and b/events/road_permits/rodars_form.png differ diff --git a/events/road_permits/rodars_functions.py b/events/road_permits/rodars_functions.py new file mode 100644 index 000000000..b9b406111 --- /dev/null +++ b/events/road_permits/rodars_functions.py @@ -0,0 +1,200 @@ +import os +import logging +import pandas as pd +from numpy import nan +import struct +import json +from io import BytesIO +from psycopg2 import sql, Error +from psycopg2.extras import execute_values + +from airflow.providers.postgres.hooks.postgres import PostgresHook + +SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql') + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +def coordinates_from_binary(br): + 'Read longitude and latitude as doubles (8 bytes each)' + longitude, latitude = struct.unpack('dd', br.read(16)) + return (longitude, latitude) + +def coordinates_to_geomfromtext(l): + 'Formats points and line geoms to be ingested by postgres `st_geomfromtext`' + geom_type = 'POINT' if len(l) == 1 else 'LINESTRING' + coords = ', '.join([f"{x[0]} {x[1]}" for x in l]) + return f"{geom_type}({coords})" + +def geometry_from_bytes(geo_bytes): + 'Initialize a stream to read binary data from the byte array' + coordinates_list = [] + with BytesIO(geo_bytes) as ms: + # Read the first 4 bytes = length + try: + len_val = struct.unpack('i', ms.read(4))[0] + except struct.error: + #struct.error: unpack requires a buffer of 4 bytes + return None + # Iterate and unpack each pair of doubles as coordinates + for _ in range(len_val): + coordinates = coordinates_from_binary(ms) + coordinates_list.append(coordinates) + return coordinates_list + +def process_lanesaffected(json_str): + '''Converts a json variable to pandas dataframe. + + Top level json attributes are given _toplevel suffix, + while contents of LaneApproaches nested json keeps original keys, + with exception of FeatureId (centreline_id) and RoadId (linear_name_id).''' + + if (json_str == 'Unknown') | (json_str is None): + return None + try: + lanesaffected = json.loads(json_str) + except json.decoder.JSONDecodeError as e: + LOGGER.debug("Json str not parsed: %s", json_str) + LOGGER.debug(e) + return None + #expand laneapproach nested json + try: + lanes = pd.json_normalize(lanesaffected, 'LaneApproaches') + except TypeError as e: + LOGGER.debug("Json str not parsed: %s", lanesaffected) + LOGGER.debug(e) + return None + lanes = lanes.rename(columns={ + 'FeatureId': 'centreline_id', + 'RoadId': 'linear_name_id' + }) + #add extra, non-nested variables + keys = list(lanesaffected.keys()) + keys.remove('LaneApproaches') + if keys is not None: + for key in keys: + lanes.insert(0, f"{key}_toplevel", lanesaffected[key]) + return lanes + +def fetch_and_insert_issue_data( + select_conn = PostgresHook('rodars_postgres'), + insert_conn = PostgresHook('events_bot'), + start_date = None +): + '''Fetch, process and insert data from ITS Central issuedata table.''' + select_fpath = os.path.join(SQL_DIR, 'select-rodars_issues.sql') + with open(select_fpath, 'r', encoding="utf-8") as file: + select_query = sql.SQL(file.read()).format( + start = sql.Literal(start_date) + ) + try: + with select_conn.get_conn() as con, con.cursor() as cur: + LOGGER.info("Fetching RODARS data.") + cur.execute(select_query) + data = cur.fetchall() + df = pd.DataFrame(data) + df.columns=[x.name for x in cur.description] + except Error as exc: + LOGGER.critical("Error fetching RODARS data.") + LOGGER.critical(exc) + raise Exception() + + #transform values for inserting + df_final = df.replace({pd.NaT: None, nan: None}) + df_final = [tuple(x) for x in df_final.to_numpy()] + + insert_fpath = os.path.join(SQL_DIR, 'insert-rodars_issues.sql') + with open(insert_fpath, 'r', encoding="utf-8") as file: + insert_query = sql.SQL(file.read()) + + with insert_conn.get_conn() as con, con.cursor() as cur: + execute_values(cur, insert_query, df_final) + +def fetch_and_insert_location_data( + select_conn = PostgresHook('rodars_postgres'), + insert_conn = PostgresHook('events_bot'), + start_date = None +): + '''Fetch, process and insert data from ITS Central issuelocationnew table. + + - Fetches data from ITS Central + - Processes geometry data stored in binary (accounts for both points/lines). + - Unnests mutli layered lanesaffected json column into tabular form. + - Performs some checks on columns unnested from json. + - Inserts into RDS `congestion_events.rodars_issue_locations` table. + ''' + + select_fpath = os.path.join(SQL_DIR, 'select-rodars_issue_locations.sql') + with open(select_fpath, 'r', encoding="utf-8") as file: + select_query = sql.SQL(file.read()).format( + start = sql.Literal(start_date) + ) + try: + with select_conn.get_conn() as con, con.cursor() as cur: + LOGGER.info("Fetching RODARS data.") + cur.execute(select_query) + data = cur.fetchall() + df = pd.DataFrame(data) + df.columns=[x.name for x in cur.description] + except Error as exc: + LOGGER.critical("Error fetching RODARS data.") + LOGGER.critical(exc) + raise Exception() + + pkeys = ['divisionid', 'issueid', 'timestamputc', 'locationindex'] + + geom_data = df['geometry'].map(geometry_from_bytes) + valid_geoms = [not(x is None) for x in geom_data] + geoms_df = df[pkeys][valid_geoms] + geoms_df.insert(3, 'geom_text', geom_data[valid_geoms].map(coordinates_to_geomfromtext)) + df_no_geom = pd.merge(df.drop('geometry', axis = 1), geoms_df, on = pkeys, how='left') + + expanded_list = [] + for row in df_no_geom.iterrows(): + expanded = process_lanesaffected(row[1]['lanesaffected']) + # Add primary key columns to the expanded data + if expanded is None: + continue + for col in pkeys: + expanded[col] = row[1][col] + expanded_list.append(expanded) + df_expanded = pd.concat(expanded_list, ignore_index=True) + df_no_geom = pd.merge(df_no_geom, df_expanded, on = pkeys, how='left') + + cols_to_insert = [ + 'divisionid', 'issueid', 'timestamputc', 'locationindex', 'mainroadname', 'fromroadname', + 'toroadname', 'direction_toplevel', 'lanesaffected', 'streetnumber', 'locationtype', 'groupid', + 'groupdescription', 'locationblocklevel_toplevel', 'roadclosuretype_toplevel', + 'encodedcoordinates_toplevel', 'locationdescription_toplevel', 'direction', 'roadname', + 'centreline_id', 'linear_name_id', 'lanesaffectedpattern', 'laneblocklevel', + 'roadclosuretype', 'geom_text' + ] + df_no_geom.columns = map(str.lower, df_no_geom.columns) + #check for extra columns unpacked from json. + extra_cols = [col for col in df_no_geom.columns if col not in cols_to_insert] + if extra_cols != []: + LOGGER.warning('There are extra columns unpacked from json not being inserted: %s', extra_cols) + #add missing columns (inconsistent jsons) + missing_cols = [col for col in cols_to_insert if col not in df_no_geom.columns] + if missing_cols != []: + for col in missing_cols: + df_no_geom.insert(0, col, None) + + #convert some datatypes to int + cols_to_convert = ["locationblocklevel_toplevel", "roadclosuretype_toplevel", "direction", "centreline_id", "linear_name_id", "laneblocklevel", "roadclosuretype", "groupid"] + df_no_geom[cols_to_convert] = df_no_geom[cols_to_convert].replace({pd.NaT: 0, nan: 0}) + df_no_geom[cols_to_convert] = df_no_geom[cols_to_convert].astype('int32') + + #transform values for inserting + df_no_geom = df_no_geom.replace({pd.NaT: None, nan: None, '': None}) + + #arrange columns for inserting + df_no_geom = df_no_geom[cols_to_insert] + df_no_geom = [tuple(x) for x in df_no_geom.to_numpy()] + + insert_fpath = os.path.join(SQL_DIR, 'insert-rodars_issue_locations.sql') + with open(insert_fpath, 'r', encoding="utf-8") as file: + insert_query = sql.SQL(file.read()) + + with insert_conn.get_conn() as con, con.cursor() as cur: + execute_values(cur, insert_query, df_no_geom, page_size = 1000) diff --git a/events/road_permits/sql/create-function-delete_old_rodars_issue_locations.sql b/events/road_permits/sql/create-function-delete_old_rodars_issue_locations.sql new file mode 100644 index 000000000..b1f40c6d1 --- /dev/null +++ b/events/road_permits/sql/create-function-delete_old_rodars_issue_locations.sql @@ -0,0 +1,31 @@ +CREATE OR REPLACE FUNCTION congestion_events.delete_old_rodars_issue_locations() +RETURNS trigger AS $$ +BEGIN + + WITH latest AS ( + SELECT + divisionid, + issueid, + MAX(timestamputc) AS max_timestamputc + FROM congestion_events.rodars_issue_locations + GROUP BY + divisionid, + issueid + ) + + -- Delete records older than the current one for the same primary keys + DELETE FROM congestion_events.rodars_issue_locations AS iil + USING latest + WHERE + iil.divisionid = latest.divisionid + AND iil.issueid = latest.issueid + AND iil.timestamputc < latest.max_timestamputc; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +GRANT EXECUTE ON FUNCTION congestion_events.delete_old_rodars_issue_locations TO congestion_admins; +GRANT EXECUTE ON FUNCTION congestion_events.delete_old_rodars_issue_locations TO events_bot; + +COMMENT ON FUNCTION congestion_events.delete_old_rodars_issue_locations IS +'Deletes old records from congestion_events.rodars_issue_locations on insert (trigger).'; \ No newline at end of file diff --git a/events/road_permits/sql/create-function-get_lanesaffected_sum.sql b/events/road_permits/sql/create-function-get_lanesaffected_sum.sql new file mode 100644 index 000000000..3b6250de4 --- /dev/null +++ b/events/road_permits/sql/create-function-get_lanesaffected_sum.sql @@ -0,0 +1,51 @@ +-- FUNCTION: itsc_factors.get_lanesaffected_sums(text) + +-- DROP FUNCTION IF EXISTS itsc_factors.get_lanesaffected_sums(text); + +CREATE OR REPLACE FUNCTION itsc_factors.get_lanesaffected_sums(input_string text) +RETURNS TABLE ( + lap_descriptions text [], + lane_open_auto integer, + lane_closed_auto integer, + lane_open_bike integer, + lane_closed_bike integer, + lane_open_ped integer, + lane_closed_ped integer, + lane_open_bus integer, + lane_closed_bus integer +) +LANGUAGE plpgsql +COST 100 +STABLE PARALLEL SAFE +ROWS 1 + +AS $BODY$ +DECLARE + code_list TEXT[]; +BEGIN + + -- Iterate over the list and aggregate sums for each code + RETURN QUERY + SELECT + ARRAY_AGG(lane_status) AS lap_descriptions, + COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Car'), 0)::int AS lane_open_auto, + COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Car'), 0)::int AS lane_closed_auto, + COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Bike'), 0)::int AS lane_open_bike, + COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Bike'), 0)::int AS lane_closed_bike, + COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Pedestrian'), 0)::int AS lane_open_ped, + COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Pedestrian'), 0)::int AS lane_closed_ped, + COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Bus'), 0)::int AS lane_open_bus, + COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Bus'), 0)::int AS lane_closed_bus + FROM UNNEST(regexp_split_to_array(input_string, E'(?=(..)+$)')) AS c + JOIN itsc_factors.lanesaffectedpattern AS lap ON lap.code = c; + +END; +$BODY$; + +ALTER FUNCTION itsc_factors.get_lanesaffected_sums(text) OWNER TO congestion_admins; + +GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO bdit_humans; + +GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO congestion_admins; + +GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO events_bot; diff --git a/events/road_permits/sql/create-table-itsc_factors-direction.sql b/events/road_permits/sql/create-table-itsc_factors-direction.sql new file mode 100644 index 000000000..31e7a2a3a --- /dev/null +++ b/events/road_permits/sql/create-table-itsc_factors-direction.sql @@ -0,0 +1,17 @@ +-- Table: itsc_factors.direction + +-- DROP TABLE IF EXISTS itsc_factors.direction; + +CREATE TABLE IF NOT EXISTS itsc_factors.direction +( + code integer NOT NULL, + direction text COLLATE pg_catalog."default" NOT NULL, + CONSTRAINT itsc_factors_direction_pkey PRIMARY KEY (code) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS itsc_factors.direction +OWNER TO congestion_admins; + +GRANT SELECT ON TABLE itsc_factors.direction TO events_bot; \ No newline at end of file diff --git a/events/road_permits/sql/create-table-itsc_factors_lanesaffectedpattern.sql b/events/road_permits/sql/create-table-itsc_factors_lanesaffectedpattern.sql new file mode 100644 index 000000000..fadfc0a2d --- /dev/null +++ b/events/road_permits/sql/create-table-itsc_factors_lanesaffectedpattern.sql @@ -0,0 +1,24 @@ +-- Table: itsc_factors.lanesaffectedpattern + +-- DROP TABLE IF EXISTS itsc_factors.lanesaffectedpattern; + +CREATE TABLE IF NOT EXISTS itsc_factors.lanesaffectedpattern +( + lane_status text COLLATE pg_catalog."default", + code text COLLATE pg_catalog."default" NOT NULL, + mode text COLLATE pg_catalog."default", + lane_open numeric, + lane_closed numeric, + CONSTRAINT lanesaffected_pattern_pkey PRIMARY KEY (code) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS itsc_factors.lanesaffectedpattern +OWNER TO congestion_admins; + +REVOKE ALL ON TABLE itsc_factors.lanesaffectedpattern FROM bdit_humans; + +GRANT SELECT ON TABLE itsc_factors.lanesaffectedpattern TO bdit_humans; + +GRANT SELECT ON TABLE itsc_factors.lanesaffectedpattern TO events_bot; diff --git a/events/road_permits/sql/create-table-itsc_factors_locationblocklevel.sql b/events/road_permits/sql/create-table-itsc_factors_locationblocklevel.sql new file mode 100644 index 000000000..d83c55143 --- /dev/null +++ b/events/road_permits/sql/create-table-itsc_factors_locationblocklevel.sql @@ -0,0 +1,17 @@ +-- Table: itsc_factors.locationblocklevel + +-- DROP TABLE IF EXISTS itsc_factors.locationblocklevel; + +CREATE TABLE IF NOT EXISTS itsc_factors.locationblocklevel +( + code integer NOT NULL, + locationblocklevel text COLLATE pg_catalog."default" NOT NULL, + CONSTRAINT locationblocklevel_pkey PRIMARY KEY (code) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS itsc_factors.locationblocklevel +OWNER TO congestion_admins; + +GRANT SELECT ON TABLE itsc_factors.locationblocklevel TO events_bot; \ No newline at end of file diff --git a/events/road_permits/sql/create-table-itsc_factors_roadclosuretype_old.sql b/events/road_permits/sql/create-table-itsc_factors_roadclosuretype_old.sql new file mode 100644 index 000000000..6d6920b88 --- /dev/null +++ b/events/road_permits/sql/create-table-itsc_factors_roadclosuretype_old.sql @@ -0,0 +1,17 @@ +-- Table: itsc_factors.roadclosuretype_old + +-- DROP TABLE IF EXISTS itsc_factors.roadclosuretype_old; + +CREATE TABLE IF NOT EXISTS itsc_factors.roadclosuretype_old +( + code integer NOT NULL, + roadclosuretype text COLLATE pg_catalog."default" NOT NULL, + CONSTRAINT roadclosuretype_old_pkey PRIMARY KEY (code) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS itsc_factors.roadclosuretype_old +OWNER TO congestion_admins; + +GRANT SELECT ON TABLE itsc_factors.roadclosuretype_old TO events_bot; diff --git a/events/road_permits/sql/create-table-rodars_issues.sql b/events/road_permits/sql/create-table-rodars_issues.sql new file mode 100644 index 000000000..7e649367c --- /dev/null +++ b/events/road_permits/sql/create-table-rodars_issues.sql @@ -0,0 +1,46 @@ +-- Table: congestion_events.rodars_issues + +-- DROP TABLE IF EXISTS congestion_events.rodars_issues; + +CREATE TABLE IF NOT EXISTS congestion_events.rodars_issues +( + divisionid smallint NOT NULL, + divisionname text, + issueid integer NOT NULL, + timestamputc timestamp without time zone, + issuetype smallint, + description text, + priority smallint, + proposedstarttimestamp timestamp without time zone, + proposedendtimestamp timestamp without time zone, + earlyendtimestamp timestamp without time zone, + status integer, + timeoption smallint, + sourceid text, + starttimestamp timestamp without time zone, + endtimestamp timestamp without time zone, + kmpost double precision, + managementurl text, + cancellationstatus integer, + closeissueonplannedendtime boolean, + plannedstartadvancenoticeseconds integer, + plannedendadvancenoticeseconds integer, + locationdescriptionoverwrite text, + startissueonplannedstarttime boolean, + startstatus integer, + updateremindernoticeseconds integer, + CONSTRAINT rodars_issues_pkey PRIMARY KEY (divisionid, issueid) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS congestion_events.rodars_issues OWNER TO congestion_admins; + +REVOKE ALL ON TABLE congestion_events.rodars_issues FROM bdit_humans; + +GRANT ALL ON TABLE congestion_events.rodars_issues TO dbadmin; + +GRANT ALL ON TABLE congestion_events.rodars_issues TO events_bot; + +COMMENT ON TABLE congestion_events.rodars_issues IS +'Raw RODARs data. See instead VIEW `congestion_events.rodars_locations`.'; diff --git a/events/road_permits/sql/create-table-rodars_issues_locations.sql b/events/road_permits/sql/create-table-rodars_issues_locations.sql new file mode 100644 index 000000000..809718edd --- /dev/null +++ b/events/road_permits/sql/create-table-rodars_issues_locations.sql @@ -0,0 +1,46 @@ +-- Table: congestion_events.rodars_issue_locations + +-- DROP TABLE IF EXISTS congestion_events.rodars_issue_locations; + +CREATE TABLE IF NOT EXISTS congestion_events.rodars_issue_locations +( + divisionid smallint NOT NULL, + issueid integer NOT NULL, + timestamputc timestamp without time zone, + locationindex integer, + mainroadname text, + fromroadname text, + toroadname text, + direction smallint, + lanesaffected text, + geometry geometry, + streetnumber text, + locationtype integer, + groupid integer, + groupdescription text, + lanesaffected_direction integer, + roadname text, + centreline_id integer, + linear_name_id integer, + lanesaffectedpattern text, + laneblocklevel integer, + roadclosuretype integer +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS congestion_events.rodars_issue_locations OWNER TO congestion_admins; + +REVOKE ALL ON TABLE congestion_events.rodars_issue_locations FROM bdit_humans; + +GRANT ALL ON TABLE congestion_events.rodars_issue_locations TO dbadmin; + +GRANT ALL ON TABLE congestion_events.rodars_issue_locations TO events_bot; + +CREATE TRIGGER on_insert_delete_old +AFTER INSERT ON congestion_events.rodars_issue_locations +FOR EACH STATEMENT +EXECUTE FUNCTION congestion_events.delete_old_rodars_issue_locations(); + +COMMENT ON TABLE congestion_events.rodars_issue_locations IS +'Raw RODARs data. See instead VIEW `congestion_events.rodars_locations`.'; diff --git a/events/road_permits/sql/create-view-rodars_locations.sql b/events/road_permits/sql/create-view-rodars_locations.sql new file mode 100644 index 000000000..7e20139e3 --- /dev/null +++ b/events/road_permits/sql/create-view-rodars_locations.sql @@ -0,0 +1,120 @@ +-- View: congestion_events.rodars_locations + +-- DROP VIEW congestion_events.rodars_locations; + +CREATE OR REPLACE VIEW congestion_events.rodars_locations +AS +SELECT + ii.divisionid, + ii.divisionname, + ii.issueid, + ii.sourceid, + ii.description, + CASE ii.priority + WHEN 5 THEN 'Critical' + WHEN 4 THEN 'High' + WHEN 3 THEN 'Medium' + WHEN 2 THEN 'Low' + WHEN 1 THEN 'None' + END AS priority, + CASE ii.status + WHEN 1 THEN 'Future' + WHEN 2 THEN 'In Progress' + WHEN 3 THEN 'Ended' + WHEN 4 THEN 'Cancelled' + WHEN 8 THEN 'Overdue' + WHEN 12 THEN 'In Progress' + END AS status, + ii.starttimestamp, + ii.endtimestamp, + ii.endtimestamp - ii.starttimestamp AS actual_duration, + ii.proposedstarttimestamp, + ii.proposedendtimestamp, + ii.proposedendtimestamp - ii.proposedstarttimestamp AS proposed_duration, + ii.proposedstarttimestamp::time without time zone AS proposedstart_tod, + ii.proposedendtimestamp::time without time zone AS proposedend_tod, + CASE ii.timeoption + WHEN 0 THEN 'Continuous'::text + WHEN 1 THEN 'Daily'::text + WHEN 2 THEN 'Weekdays'::text + WHEN 3 THEN 'Weekends'::text + WHEN 4 THEN 'Activity Schedule'::text + ELSE NULL::text + END AS recurrence_schedule, + iil.groupdescription AS location_description, + iil.mainroadname, + iil.fromroadname, + iil.toroadname, + iil.streetnumber, + itsc_factors.locationblocklevel.locationblocklevel, + itsc_factors.roadclosuretype_old.roadclosuretype AS roadclosuretype_desc, + iil.locationdescription_toplevel, + d2.direction, + iil.roadname, + CASE iil.centreline_id WHEN 0 THEN NULL ELSE iil.centreline_id END AS centreline_id, + COALESCE( + centreline_latest.geom, + --find geoms for centreline that do not appear in centreline_latest + --wrapping inside case statement coalesce/case prevents unnecessary execution + CASE + WHEN iil.centreline_id > 0 THEN ( + SELECT centreline.geom + FROM gis_core.centreline + WHERE centreline.centreline_id = iil.centreline_id + ORDER BY centreline.version_date DESC + LIMIT 1 + ) + END + ) AS centreline_geom, + iil.geom AS issue_geom, + CASE iil.linear_name_id WHEN 0 THEN NULL ELSE iil.linear_name_id END AS linear_name_id, + iil.lanesaffectedpattern, + lap.lap_descriptions, + lap.lane_open_auto, + lap.lane_closed_auto, + lap.lane_open_bike, + lap.lane_closed_bike, + lap.lane_open_ped, + lap.lane_closed_ped, + lap.lane_open_bus, + lap.lane_closed_bus +FROM congestion_events.rodars_issues AS ii +JOIN congestion_events.rodars_issue_locations AS iil + ON iil.issueid = ii.issueid + AND iil.divisionid = ii.divisionid + AND iil.timestamputc = ii.timestamputc +LEFT JOIN gis_core.centreline_latest USING (centreline_id) +LEFT JOIN itsc_factors.direction AS d1 + ON d1.code = iil.direction_toplevel +LEFT JOIN itsc_factors.direction AS d2 + ON d2.code = iil.direction::numeric::integer +LEFT JOIN itsc_factors.locationblocklevel + ON iil.laneblocklevel::numeric::integer = itsc_factors.locationblocklevel.code +LEFT JOIN itsc_factors.roadclosuretype_old + ON iil.roadclosuretype::numeric::integer = itsc_factors.roadclosuretype_old.code, + LATERAL ( + SELECT + get_lanesaffected_sums.lap_descriptions, + get_lanesaffected_sums.lane_open_auto, + get_lanesaffected_sums.lane_closed_auto, + get_lanesaffected_sums.lane_open_bike, + get_lanesaffected_sums.lane_closed_bike, + get_lanesaffected_sums.lane_open_ped, + get_lanesaffected_sums.lane_closed_ped, + get_lanesaffected_sums.lane_open_bus, + get_lanesaffected_sums.lane_closed_bus + --expand lanesaffectedpattern column. + FROM + itsc_factors.get_lanesaffected_sums(iil.lanesaffectedpattern) + AS get_lanesaffected_sums ( + lap_descriptions, lane_open_auto, lane_closed_auto, lane_open_bike, + lane_closed_bike, lane_open_ped, lane_closed_ped, lane_open_bus, lane_closed_bus + ) + ) AS lap; + +ALTER TABLE congestion_events.rodars_locations +OWNER TO congestion_admins; + +GRANT SELECT ON TABLE congestion_events.rodars_locations TO bdit_humans; +GRANT ALL ON TABLE congestion_events.rodars_locations TO congestion_admins; +GRANT SELECT ON TABLE congestion_events.rodars_locations TO events_bot; diff --git a/events/road_permits/sql/insert-rodars_issue_locations.sql b/events/road_permits/sql/insert-rodars_issue_locations.sql new file mode 100644 index 000000000..6f9757e73 --- /dev/null +++ b/events/road_permits/sql/insert-rodars_issue_locations.sql @@ -0,0 +1,47 @@ +--this redundant CTE is just to apply ST_GeomFromText to geom_text. +WITH locations ( + divisionid, issueid, timestamputc, locationindex, mainroadname, fromroadname, + toroadname, direction_toplevel, lanesaffected, streetnumber, locationtype, groupid, + groupdescription, locationblocklevel_toplevel, roadclosuretype_toplevel, + encodedcoordinates_toplevel, locationdescription_toplevel, direction, roadname, + centreline_id, linear_name_id, lanesaffectedpattern, laneblocklevel, + roadclosuretype, geom_text +) AS ( + VALUES %s --noqa: PRS +) + +INSERT INTO congestion_events.rodars_issue_locations ( + divisionid, issueid, timestamputc, locationindex, mainroadname, fromroadname, toroadname, + direction_toplevel, streetnumber, locationtype, groupid, groupdescription, lanesaffected, + locationblocklevel_toplevel, roadclosuretype_toplevel, encodedcoordinates_toplevel, + locationdescription_toplevel, direction, roadname, centreline_id, linear_name_id, + lanesaffectedpattern, laneblocklevel, roadclosuretype, geom +) +--rarely, you can get duplicate values from the unnested lanesaffected json. +SELECT DISTINCT + divisionid, + issueid, + timestamputc, + locationindex, + mainroadname, + fromroadname, + toroadname, + direction_toplevel, + streetnumber, + locationtype, + groupid, + groupdescription, + lanesaffected, + locationblocklevel_toplevel, + roadclosuretype_toplevel, + encodedcoordinates_toplevel, + locationdescription_toplevel, + direction, + roadname, + centreline_id, + linear_name_id, + lanesaffectedpattern, + laneblocklevel, + roadclosuretype, + st_geomfromtext(geom_text, 4326) AS geom +FROM locations; diff --git a/events/road_permits/sql/insert-rodars_issues.sql b/events/road_permits/sql/insert-rodars_issues.sql new file mode 100644 index 000000000..f1086eb1f --- /dev/null +++ b/events/road_permits/sql/insert-rodars_issues.sql @@ -0,0 +1,35 @@ +INSERT INTO congestion_events.rodars_issues ( + divisionid, divisionname, issueid, timestamputc, issuetype, description, priority, + proposedstarttimestamp, proposedendtimestamp, earlyendtimestamp, status, timeoption, + sourceid, starttimestamp, + endtimestamp, kmpost, managementurl, cancellationstatus, closeissueonplannedendtime, + plannedstartadvancenoticeseconds, plannedendadvancenoticeseconds, + locationdescriptionoverwrite, startissueonplannedstarttime, startstatus, + updateremindernoticeseconds +) +VALUES %s +ON CONFLICT (divisionid, issueid) +DO UPDATE SET +divisionname = excluded.divisionname, +timestamputc = excluded.timestamputc, +issuetype = excluded.issuetype, +description = excluded.description, +priority = excluded.priority, +proposedstarttimestamp = excluded.proposedstarttimestamp, +proposedendtimestamp = excluded.proposedendtimestamp, +earlyendtimestamp = excluded.earlyendtimestamp, +status = excluded.status, +timeoption = excluded.timeoption, +sourceid = excluded.sourceid, +starttimestamp = excluded.starttimestamp, +endtimestamp = excluded.endtimestamp, +kmpost = excluded.kmpost, +managementurl = excluded.managementurl, +cancellationstatus = excluded.cancellationstatus, +closeissueonplannedendtime = excluded.closeissueonplannedendtime, +plannedstartadvancenoticeseconds = excluded.plannedstartadvancenoticeseconds, +plannedendadvancenoticeseconds = excluded.plannedendadvancenoticeseconds, +locationdescriptionoverwrite = excluded.locationdescriptionoverwrite, +startissueonplannedstarttime = excluded.startissueonplannedstarttime, +startstatus = excluded.startstatus, +updateremindernoticeseconds = excluded.updateremindernoticeseconds; diff --git a/events/road_permits/sql/select-rodars_issue_locations.sql b/events/road_permits/sql/select-rodars_issue_locations.sql new file mode 100644 index 000000000..5c60cbff8 --- /dev/null +++ b/events/road_permits/sql/select-rodars_issue_locations.sql @@ -0,0 +1,41 @@ +--this select query is used to select issue locations from ITSC database in rodars_pull pipeline. + +WITH issues AS ( + --select the most recent version of each issue + SELECT + divisionid, + issueid, + MAX(timestamputc) AS timestamputc + FROM public.issuedata + WHERE + divisionid IN ( + 8048, --rodars new + 8014 --rodars (old) + ) + GROUP BY + divisionid, + issueid + HAVING + MAX(timestamputc) >= {start}::date -- noqa: PRS, LT02 + AND MAX(timestamputc) < {start}::date + interval '1 day' -- noqa: PRS +) + +SELECT + i.divisionid, + i.issueid, + i.timestamputc, + --Old rodars data doesn't have this value + COALESCE(iln.locationindex, 0) AS locationindex, + iln.mainroadname, + iln.fromroadname, + iln.toroadname, + iln.direction AS direction_toplevel, + iln.lanesaffected, + iln.geometry, + iln.streetnumber, + iln.locationtype, + iln.groupid::integer, + iln.groupdescription +--Note there are multiple locations for each issue (unique locationindex) +FROM public.issuelocationnew AS iln +JOIN issues AS i USING (divisionid, issueid, timestamputc) diff --git a/events/road_permits/sql/select-rodars_issues.sql b/events/road_permits/sql/select-rodars_issues.sql new file mode 100644 index 000000000..4de07775d --- /dev/null +++ b/events/road_permits/sql/select-rodars_issues.sql @@ -0,0 +1,65 @@ +--this select query is used to select issue metadata from ITSC database in rodars_pull pipeline. + +WITH issues AS ( + --select the most recent version of each issue + SELECT DISTINCT ON (divisionid, issueid) + divisionid, + issueid, + timestamputc, + issuetype, + description, + priority, + proposedstarttimestamputc, + proposedendtimestamputc, + earlyendtimestamputc, + status, + timeoption + FROM public.issuedata + WHERE + divisionid IN ( + 8048, --rodars new + 8014 --rodars (old) + ) + AND timestamputc >= {start}::date -- noqa: PRS, LT02 + AND timestamputc < {start}::date + interval '1 day' -- noqa: PRS + ORDER BY + divisionid ASC, + issueid ASC, + timestamputc DESC +) + +SELECT + issues.divisionid, + datadivision.shortname AS divisionname, + issues.issueid, + issues.timestamputc, + issues.issuetype, + issues.description, + issues.priority, + TIMEZONE('UTC', issues.proposedstarttimestamputc) AT TIME ZONE 'America/Toronto' + AS proposedstarttimestamp, + TIMEZONE('UTC', issues.proposedendtimestamputc) AT TIME ZONE 'America/Toronto' + AS proposedendtimestamp, + TIMEZONE('UTC', issues.earlyendtimestamputc) AT TIME ZONE 'America/Toronto' + AS earlyendtimestamp, + issues.status, + issues.timeoption, + issueconfig.sourceid, + TIMEZONE('UTC', issueconfig.starttimestamputc) AT TIME ZONE 'America/Toronto' + AS starttimestamp, + TIMEZONE('UTC', issueconfig.endtimestamputc) AT TIME ZONE 'America/Toronto' + AS endtimestamp, + issueconfig.kmpost, + issueconfig.managementurl, + issueconfig.cancellationstatus, + issueconfig.closeissueonplannedendtime, + issueconfig.plannedstartadvancenoticeseconds, + issueconfig.plannedendadvancenoticeseconds, + issueconfig.locationdescriptionoverwrite, + issueconfig.startissueonplannedstarttime, + issueconfig.startstatus, + issueconfig.updateremindernoticeseconds +FROM issues +LEFT JOIN public.issueconfig USING (divisionid, issueid) +LEFT JOIN public.datadivision USING (divisionid) +ORDER BY issues.issueid DESC diff --git a/events/address_functions.py b/events/special_events/address_functions.py similarity index 100% rename from events/address_functions.py rename to events/special_events/address_functions.py diff --git a/events/gen_event_list.py b/events/special_events/gen_event_list.py similarity index 100% rename from events/gen_event_list.py rename to events/special_events/gen_event_list.py diff --git a/events/img/schema.png b/events/special_events/img/schema.png similarity index 100% rename from events/img/schema.png rename to events/special_events/img/schema.png diff --git a/events/parse_event_json.py b/events/special_events/parse_event_json.py similarity index 100% rename from events/parse_event_json.py rename to events/special_events/parse_event_json.py diff --git a/events/parse_event_xml.py b/events/special_events/parse_event_xml.py similarity index 100% rename from events/parse_event_xml.py rename to events/special_events/parse_event_xml.py diff --git a/events/README.md b/events/special_events/readme.md similarity index 100% rename from events/README.md rename to events/special_events/readme.md diff --git a/events/tickermaster_api_call.py b/events/special_events/tickermaster_api_call.py similarity index 100% rename from events/tickermaster_api_call.py rename to events/special_events/tickermaster_api_call.py diff --git a/road_closure/parse_road_closure_xml.py b/road_closure/parse_road_closure_xml.py deleted file mode 100644 index eb5757c89..000000000 --- a/road_closure/parse_road_closure_xml.py +++ /dev/null @@ -1,61 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on Thu Oct 27 10:12:48 2016 - -@author: qwang2 -""" - -import requests -import xml.etree.ElementTree as ET -from pg import DB -import time -import configparser - -CONFIG = configparser.ConfigParser() -CONFIG.read('db.cfg') -dbset = CONFIG['DBSETTINGS'] - -proxies = {'http':'http://137.15.73.132:8080'} -db = DB(dbname=dbset['database'],host=dbset['host'],user=dbset['user'],passwd=dbset['password']) - -r = requests.get('http://www1.toronto.ca/transportation/roadrestrictions/RoadRestrictions.xml', proxies = proxies) - -tree = ET.fromstring(r.content) - -# if writing to csv -''' -f = open('RoadClosures_open_data.csv', 'w') -f.write("id,road,name,district,latitude,longitude,roadclass,planned,severityoverride,"\ - "source,lastupdated,starttime,endtime,workperiod,expired,signing,notification,workeventtype,"\ - "contractor,permittype,description\n") -''' - -# upload directly to database -data = [] -for a in zip(tree.findall(".//Id"), tree.findall(".//Road"), tree.findall(".//Name"),tree.findall(".//District"), - tree.findall(".//Latitude"), tree.findall(".//Longitude"), tree.findall(".//RoadClass"), - tree.findall(".//Planned"), tree.findall(".//SeverityOverride"), tree.findall(".//Source"), - tree.findall(".//LastUpdated"), tree.findall(".//StartTime"), tree.findall(".//EndTime"), - tree.findall(".//WorkPeriod"), tree.findall(".//Expired"), tree.findall(".//Signing"), - tree.findall(".//Notification"), tree.findall(".//WorkEventType"), tree.findall(".//Contractor"), - tree.findall(".//PermitType"), tree.findall(".//Description")): - rowsql = [] - for x in a: - if x.tag in ('LastUpdated', 'StartTime', 'EndTime') and x.text is not None: - y = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(x.text)/1000)) - else: - y = x.text - if x.tag == 'Name': - rowsql.append(None) - rowsql.append(None) - rowsql.append(None) - rowsql.append(y) - rowsql.append(None) - data.append(rowsql) - -db.truncate('city.restrictions_import') -db.inserttable('city.restrictions_import', data) - -sql = db.query("DELETE FROM city.restrictions USING city.restrictions_import WHERE city.restrictions.id = city.restrictions_import.id") -sql= db.query("INSERT INTO city.restrictions SELECT * FROM city.restrictions_import") -db.close()