Skip to content

Commit

Permalink
untested wip, move to two-step grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
Laurie Merrell committed May 12, 2023
1 parent 8ff38d0 commit 2084be3
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
'field': 'calculated_service_date_pacific',
'data_type': 'date',
'granularity': 'day',
},
cluster_by='base64_url',
)
}}

WITH stop_time_updates AS (
SELECT *
FROM {{ ref('fct_stop_time_updates') }}
WHERE {{ gtfs_rt_dt_where() }}
),

rt_feeds AS (
SELECT *
FROM {{ ref('fct_daily_rt_feed_files') }}
),

schedule_feeds AS (
SELECT *
FROM {{ ref('dim_schedule_feeds') }}
),

-- group by *both* the UTC date that data was scraped (dt) *and* calculated service date
-- so that in the mart we can get just service date-level data
-- this allows us to handle the dt/service_date mismatch by grouping in two stages
int_gtfs_rt__trip_updates_trip_day_map_grouping AS (
SELECT
-- try to figure out what the service date would be to join back with schedule: fall back from explicit to imputed
-- TODO: it's possible that this could lead to some weirdness around midnight Pacific / in feed timezone
-- if `trip_start_date` is not set we theoretically should be trying to grab the date of the first arrival time per trip
-- because trip updates may be generated hours before the beginning of the actual trip activity
-- however the fact that this would occur near date boundaries is precisely why it's a bit tricky to pick the right first arrival time if trip start date is not populated
dt,
COALESCE(
PARSE_DATE("%Y%m%d", trip_start_date),
DATE(header_timestamp, schedule_feeds.feed_timezone),
DATE(_extract_ts, schedule_feeds.feed_timezone)) AS calculated_service_date,
base64_url,
trip_id,
trip_route_id,
trip_direction_id,
trip_start_time,
trip_start_date,
trip_schedule_relationship,
schedule_feeds.feed_timezone,
ARRAY_AGG(DISTINCT id) AS message_ids_array,
ARRAY_AGG(DISTINCT header_timestamp) AS header_timestamps_array,
ARRAY_AGG(DISTINCT trip_update_timestamp) AS trip_update_timestamps_array,
MIN(_extract_ts) AS min_extract_ts,
MAX(_extract_ts) AS max_extract_ts,
MIN(header_timestamp) AS min_header_timestamp,
MAX(header_timestamp) AS max_header_timestamp,
MIN(trip_update_timestamp) AS min_trip_update_timestamp,
MAX(trip_update_timestamp) AS max_trip_update_timestamp,
MAX(trip_update_delay) AS max_delay,
ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'SKIPPED' THEN stop_id END) AS skipped_stops_array,
ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'SCHEDULED' THEN stop_id END) AS scheduled_stops_array,
ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'CANCELED' THEN stop_id END) AS canceled_stops_array,
ARRAY_AGG(DISTINCT CASE WHEN schedule_relationship = 'ADDED' THEN stop_id END) AS added_stops_array,
FROM stop_time_updates
LEFT JOIN rt_feeds
ON stop_time_updates.base64_url = rt_feeds.base64_url
AND stop_time_updates.dt = rt_feeds.date
LEFT JOIN schedule_feeds
ON rt_feeds.schedule_feed_key = schedule_feeds.key
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
)

SELECT * FROM int_gtfs_rt__trip_updates_trip_day_map_grouping
7 changes: 0 additions & 7 deletions warehouse/models/mart/gtfs/fct_trip_updates_messages.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@ fct_trip_updates_messages AS (
TIMESTAMP_DIFF(_extract_ts, header_timestamp, SECOND) AS _header_message_age,
TIMESTAMP_DIFF(_extract_ts, trip_update_timestamp, SECOND) AS _trip_update_message_age,
TIMESTAMP_DIFF(header_timestamp, trip_update_timestamp, SECOND) AS _trip_update_message_age_vs_header,
-- TODO: once #2457 merges, we should use the schedule feed timezone rather than just Pacific
-- we need to get individual trip instances that can be merged with schedule feed trip instances
COALESCE(
PARSE_DATE("%Y%m%d",trip_start_date),
DATE(trip_update_timestamp, "America/Los_Angeles"),
DATE(header_timestamp, "America/Los_Angeles"),
DATE(_extract_ts, "America/Los_Angeles")) AS calculated_service_date_pacific,

header_timestamp,
header_version,
Expand Down

0 comments on commit 2084be3

Please sign in to comment.