How can I unit test my run status sensors to verify they trigger the correct jobs in sequence? #25934
Answered
by
garethbrickman
garethbrickman
asked this question in
Q&A
Replies: 1 comment
-
from dagster import (
run_status_sensor,
job,
RunRequest,
DagsterRunStatus,
DagsterInstance,
build_sensor_context,
)
# Op-less placeholder job; sensor_1 and sensor_2 list it in monitored_jobs.
@job
def certain_job():
    pass
# Op-less placeholder job; sensor_3 lists it in monitored_jobs.
@job
def other_job():
    pass
# Op-less placeholder job; it is the request_job of sensor_1.
@job
def job_1():
    pass
# Op-less placeholder job; it is the request_job of sensor_2.
@job
def job_2():
    pass
# Op-less placeholder job; it is the request_job of sensor_3.
@job
def job_3():
    pass
# Run status sensor: monitors certain_job for SUCCESS and responds by
# requesting a run of job_1.
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[certain_job],
    request_job=job_1,
)
def sensor_1():
    run_request = RunRequest(job_name="job_1")
    return run_request
# Run status sensor: monitors certain_job for SUCCESS and responds by
# requesting a run of job_2.
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[certain_job],
    request_job=job_2,
)
def sensor_2():
    run_request = RunRequest(job_name="job_2")
    return run_request
# Run status sensor: monitors other_job (not certain_job) for SUCCESS and
# responds by requesting a run of job_3.
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[other_job],
    request_job=job_3,
)
def sensor_3():
    run_request = RunRequest(job_name="job_3")
    return run_request
def test_sensors() -> None:
    """Check which jobs the sensors request after certain_job succeeds.

    Every sensor is ticked once to record a starting cursor, certain_job is
    executed, and every sensor is ticked again from its saved cursor. The
    job names requested on the second pass must be exactly job_1 and job_2.
    """
    all_sensors = [sensor_1, sensor_2, sensor_3]
    instance = DagsterInstance.ephemeral()

    # A job_1 run is executed before any sensor is ticked.
    # NOTE(review): presumably this seeds the instance with a run so the
    # baseline ticks have something to anchor on - confirm the intent.
    seed_run = job_1.execute_in_process(instance=instance)
    assert seed_run.success

    # The first tick of a status sensor only starts tracking from that
    # point, so tick each one once and remember the cursor it hands back.
    saved_cursors = {}
    for status_sensor in all_sensors:
        baseline_tick = status_sensor.evaluate_tick(
            build_sensor_context(instance=instance)
        )
        assert len(baseline_tick.run_requests) == 0
        saved_cursors[status_sensor] = baseline_tick.cursor

    # Run the job under observation.
    target_run = certain_job.execute_in_process(instance=instance)
    assert target_run.success

    # Tick every sensor again, resuming from its own saved cursor, and
    # gather the names of all jobs they ask for.
    triggered = set()
    for status_sensor in all_sensors:
        follow_up_tick = status_sensor.evaluate_tick(
            build_sensor_context(
                instance=instance,
                cursor=saved_cursors[status_sensor],
            )
        )
        triggered.update(req.job_name for req in follow_up_tick.run_requests)

    # Only the two sensors monitoring certain_job should have fired.
    assert triggered == {"job_1", "job_2"}
# Allow running the check directly as a script, without a test runner.
if __name__ == "__main__":
    test_sensors()
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
garethbrickman
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I want to unit test that when a certain job runs, my sensor ecosystem triggers job2 and job3 but not job4.
Beta Was this translation helpful? Give feedback.
All reactions