Skip to content

Commit

Permalink
use subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
amishas157 committed Oct 2, 2024
1 parent 9e357ff commit 334c73f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
4 changes: 2 additions & 2 deletions dags/dbt_source_data_freshness_test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
# DBT tests to run
source_freshness_tests = dbt_task(
dag,
model_name="all_sources",
command_type="source freshness",
command_type="source",
sub_command="freshness",
flag=None,
tag=None,
resource_cfg="dbt",
Expand Down
11 changes: 9 additions & 2 deletions dags/stellar_etl_airflow/build_dbt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def create_dbt_profile(project="prod"):
def dbt_task(
dag,
model_name=None,
sub_command=None,
tag=None,
flag="select",
operator="",
Expand All @@ -87,7 +88,12 @@ def dbt_task(

dbt_image = "{{ var.value.dbt_image_name }}"

args = [command_type, f"--{flag}"] if flag else [command_type]
args = [command_type]
if sub_command:
args.append(sub_command)

if flag:
args.append(f"--{flag}")

models = []
if tag:
Expand All @@ -96,6 +102,8 @@ def dbt_task(
if model_name:
task_name = model_name
models.append(f"{operator}{model_name}")
if sub_command:
task_name = sub_command
if len(models) > 1:
task_name = "multiple_models"
args.append(",".join(models))
Expand All @@ -117,7 +125,6 @@ def dbt_task(

logging.info(f"sh commands to run in pod: {args}")

command_type = command_type.replace(" ", "_")
return KubernetesPodOperator(
task_id=f"dbt_{command_type}_{task_name}",
name=f"dbt_{command_type}_{task_name}",
Expand Down

0 comments on commit 334c73f

Please sign in to comment.