diff --git a/warehouse/models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_day_map_grouping.sql b/warehouse/models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_day_map_grouping.sql
new file mode 100644
index 0000000000..a99496f506
--- /dev/null
+++ b/warehouse/models/intermediate/gtfs/int_gtfs_rt__trip_updates_trip_day_map_grouping.sql
@@ -0,0 +1,88 @@
+{{
+    config(
+        materialized='incremental',
+        incremental_strategy='insert_overwrite',
+        partition_by={
+            'field': 'dt',
+            'data_type': 'date',
+            'granularity': 'day',
+        },
+        cluster_by='base64_url',
+    )
+}}
+
+-- The partition field must be a column this model outputs: the service date below is exposed as
+-- calculated_service_date, so there is no calculated_service_date_pacific column to partition on.
+-- NOTE(review): gtfs_rt_dt_where() presumably limits incremental runs by dt, which would make dt the
+-- partition that insert_overwrite has to replace -- confirm against the macro.
+
+WITH stop_time_updates AS (
+    SELECT *
+    FROM {{ ref('fct_stop_time_updates') }}
+    WHERE {{ gtfs_rt_dt_where() }}
+),
+
+-- joined on base64_url + date to look up the schedule feed associated with each RT feed
+rt_feeds AS (
+    SELECT *
+    FROM {{ ref('fct_daily_rt_feed_files') }}
+),
+
+-- supplies feed_timezone, used to turn timestamps into a feed-local date
+schedule_feeds AS (
+    SELECT *
+    FROM {{ ref('dim_schedule_feeds') }}
+),
+
+-- group by *both* the UTC date that data was scraped (dt) *and* calculated service date
+-- so that in the mart we can get just service date-level data
+-- this allows us to handle the dt/service_date mismatch by grouping in two stages
+int_gtfs_rt__trip_updates_trip_day_map_grouping AS (
+    SELECT
+        -- try to figure out what the service date would be to join back with schedule: fall back from explicit to imputed
+        -- TODO: it's possible that this could lead to some weirdness around midnight Pacific / in feed timezone
+        -- if `trip_start_date` is not set we theoretically should be trying to grab the date of the first arrival time per trip
+        -- because trip updates may be generated hours before the beginning of the actual trip activity
+        -- however the fact that this would occur near date boundaries is precisely why it's a bit tricky to pick the right first arrival time if trip start date is not populated
+        stop_time_updates.dt,
+        COALESCE(
+            PARSE_DATE("%Y%m%d", trip_start_date),
+            DATE(header_timestamp, schedule_feeds.feed_timezone),
+            DATE(_extract_ts, schedule_feeds.feed_timezone)) AS calculated_service_date,
+        -- qualified: rt_feeds also has a base64_url column (see the join below), so a bare reference is ambiguous
+        stop_time_updates.base64_url,
+        trip_id,
+        trip_route_id,
+        trip_direction_id,
+        trip_start_time,
+        trip_start_date,
+        trip_schedule_relationship,
+        schedule_feeds.feed_timezone,
+        -- IGNORE NULLS: BigQuery raises an error when an array in the query result contains a NULL element
+        ARRAY_AGG(DISTINCT id IGNORE NULLS) AS message_ids_array,
+        ARRAY_AGG(DISTINCT header_timestamp IGNORE NULLS) AS header_timestamps_array,
+        ARRAY_AGG(DISTINCT trip_update_timestamp IGNORE NULLS) AS trip_update_timestamps_array,
+        MIN(_extract_ts) AS min_extract_ts,
+        MAX(_extract_ts) AS max_extract_ts,
+        MIN(header_timestamp) AS min_header_timestamp,
+        MAX(header_timestamp) AS max_header_timestamp,
+        MIN(trip_update_timestamp) AS min_trip_update_timestamp,
+        MAX(trip_update_timestamp) AS max_trip_update_timestamp,
+        MAX(trip_update_delay) AS max_delay,
+        -- one array of distinct stop_ids per stop-level schedule_relationship; the CASE yields NULL for
+        -- stops in any other state, and IGNORE NULLS drops those
+        ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'SKIPPED' THEN stop_id END IGNORE NULLS) AS skipped_stops_array,
+        ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'SCHEDULED' THEN stop_id END IGNORE NULLS) AS scheduled_stops_array,
+        ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'CANCELED' THEN stop_id END IGNORE NULLS) AS canceled_stops_array,
+        ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'ADDED' THEN stop_id END IGNORE NULLS) AS added_stops_array
+    FROM stop_time_updates
+    LEFT JOIN rt_feeds
+        ON stop_time_updates.base64_url = rt_feeds.base64_url
+        AND stop_time_updates.dt = rt_feeds.date
+    LEFT JOIN schedule_feeds
+        ON rt_feeds.schedule_feed_key = schedule_feeds.key
+    -- positions 1-10 are the ten non-aggregated columns at the top of the SELECT list
+    GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+)
+
+SELECT * FROM int_gtfs_rt__trip_updates_trip_day_map_grouping
diff --git a/warehouse/models/mart/gtfs/fct_trip_updates_messages.sql b/warehouse/models/mart/gtfs/fct_trip_updates_messages.sql
index bac2301554..9a6e42da08 100644
--- a/warehouse/models/mart/gtfs/fct_trip_updates_messages.sql
+++ b/warehouse/models/mart/gtfs/fct_trip_updates_messages.sql
@@ -40,13 +40,6 @@ fct_trip_updates_messages AS (
         TIMESTAMP_DIFF(_extract_ts, header_timestamp, SECOND) AS _header_message_age,
         TIMESTAMP_DIFF(_extract_ts, trip_update_timestamp, SECOND) AS _trip_update_message_age,
         TIMESTAMP_DIFF(header_timestamp, trip_update_timestamp, SECOND) AS _trip_update_message_age_vs_header,
-        -- TODO: once #2457 merges, we should use the schedule feed timezone rather than just Pacific
-        -- we need to get individual trip instances that can be merged with schedule feed trip instances
-        COALESCE(
-            PARSE_DATE("%Y%m%d",trip_start_date),
-            DATE(trip_update_timestamp, "America/Los_Angeles"),
-            DATE(header_timestamp, "America/Los_Angeles"),
-            DATE(_extract_ts, "America/Los_Angeles")) AS calculated_service_date_pacific,
         header_timestamp,
         header_version,