Skip to content

Commit

Permalink
add service alert trip summaries and fix typos
Browse files Browse the repository at this point in the history
  • Loading branch information
atvaccaro committed Jan 23, 2023
1 parent 114bd66 commit 6879c31
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{#
    Summarizes service alert informed entities per day (dt), feed (base64_url)
    and trip descriptor: one output row per distinct combination, carrying the
    number of distinct message ids and the distinct service alert message keys
    that referenced that trip.

    Built incrementally with insert_overwrite on daily dt partitions.
#}
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'dt',
            'data_type': 'date',
            'granularity': 'day',
        },
        cluster_by='base64_url',
    )
}}

{# On incremental runs, look up the newest dt already present in this table. #}
{% set max_ts = none %}
{% if is_incremental() %}
    {% set timestamps = dbt_utils.get_column_values(table=this, column='dt', order_by = 'dt DESC', max_records = 1) %}
    {# An existing but empty target yields no values; leave max_ts unset so the
       lookback-window filter is used instead of rendering TIMESTAMP(''). #}
    {% if timestamps %}
        {% set max_ts = timestamps[0] %}
    {% endif %}
{% endif %}

WITH service_alerts AS (
    SELECT * FROM {{ ref('fct_service_alert_informed_entities') }}
    {% if max_ts %}
    -- Incremental run: reprocess from the newest existing partition onward.
    WHERE dt >= EXTRACT(DATE FROM TIMESTAMP('{{ max_ts }}'))
    {% else %}
    -- Full refresh, first build, or empty target: bounded lookback window.
    -- NOTE(review): this reuses the trip-updates lookback variable; confirm
    -- that is the intended window for service alerts.
    WHERE dt >= DATE_SUB(CURRENT_DATE(), INTERVAL {{ var('TRIP_UPDATES_LOOKBACK_DAYS') }} DAY)
    {% endif %}
),

int_gtfs_rt__service_alerts_trip_summaries AS (
    SELECT
        -- https://gtfs.org/realtime/reference/#message-tripdescriptor
        -- Surrogate key over the partition date, the feed and the trip
        -- descriptor fields, i.e. exactly the grouping columns below.
        {{ dbt_utils.surrogate_key([
            'dt',
            'base64_url',
            'trip_id',
            'trip_route_id',
            'trip_direction_id',
            'trip_start_time',
            'trip_start_date',
        ]) }} as key,
        dt,
        base64_url,
        trip_id,
        trip_route_id,
        trip_direction_id,
        trip_start_time,
        trip_start_date,
        COUNT(DISTINCT id) AS num_distinct_message_ids,
        ARRAY_AGG(DISTINCT service_alert_message_key) AS service_alert_message_keys,
    FROM service_alerts
    -- Positions 1-8 are key plus the seven descriptor columns selected above.
    GROUP BY 1, 2, 3, 4, 5, 6, 7, 8
)

SELECT * FROM int_gtfs_rt__service_alerts_trip_summaries
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
}}

{% if is_incremental() %}
{% set timestamps = dbt_utils.get_column_values(table=this, column='dt', order_by = 'dt, DESC', max_records = 1) %}
{% set timestamps = dbt_utils.get_column_values(table=this, column='dt', order_by = 'dt DESC', max_records = 1) %}
{% set max_ts = timestamps[0] %}
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
}}

{% if is_incremental() %}
{% set timestamps = dbt_utils.get_column_values(table=this, column='dt', order_by = 'dt, DESC', max_records = 1) %}
{% set timestamps = dbt_utils.get_column_values(table=this, column='dt', order_by = 'dt DESC', max_records = 1) %}
{% set max_ts = timestamps[0] %}
{% endif %}

Expand Down
8 changes: 8 additions & 0 deletions warehouse/models/mart/gtfs/fct_observed_trips.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ vehicle_positions AS (
SELECT * FROM {{ ref('int_gtfs_rt__vehicle_positions_trip_summaries') }}
),

-- Import CTE: rows of the int_gtfs_rt__service_alerts_trip_summaries model,
-- joined below on dt, base64_url and the trip descriptor columns.
service_alerts AS (
SELECT * FROM {{ ref('int_gtfs_rt__service_alerts_trip_summaries') }}
),

fct_observed_trips AS (
SELECT
{{ dbt_utils.surrogate_key([
Expand All @@ -40,9 +44,13 @@ fct_observed_trips AS (
vp.num_distinct_message_ids AS vp_num_distinct_message_ids,
vp.min_trip_update_timestamp AS vp_min_trip_update_timestamp,
vp.max_trip_update_timestamp AS vp_max_trip_update_timestamp,
sa.num_distinct_message_ids AS sa_num_distinct_message_ids,
sa.service_alert_message_keys AS sa_service_alert_message_key,
FROM trip_updates AS tu
FULL OUTER JOIN vehicle_positions AS vp
USING (dt, base64_url, trip_id, trip_route_id, trip_direction_id, trip_start_time, trip_start_date)
FULL OUTER JOIN service_alerts AS sa
USING (dt, base64_url, trip_id, trip_route_id, trip_direction_id, trip_start_time, trip_start_date)
)

SELECT * FROM fct_observed_trips

0 comments on commit 6879c31

Please sign in to comment.