
Customizing on_cron

Ignoring dependencies

By default, AutomationCondition.on_cron() waits for all upstream dependencies to be updated before executing the asset it is attached to. In some cases, it can be useful to exclude some upstream dependencies from this calculation. To do so, pass an AssetSelection of the dependencies to ignore to .ignore():

```python
import dagster as dg

# Execute hourly, but don't wait for the upstream asset "foo" to be updated
condition = dg.AutomationCondition.on_cron("@hourly").ignore(
    dg.AssetSelection.assets("foo")
)
```

Alternatively, you can pass an AssetSelection to .allow(), in which case only the upstream dependencies within that selection are considered:

```python
import dagster as dg

# Execute hourly, waiting only on upstream dependencies in the "abc" group
condition = dg.AutomationCondition.on_cron("@hourly").allow(
    dg.AssetSelection.groups("abc")
)
```
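To make the effect of .ignore() and .allow() concrete, the following plain-Python sketch models which upstream dependencies gate execution. It is a conceptual model, not Dagster's implementation: the helpers `deps_considered` and `deps_ready`, the asset names, and their group assignments are all hypothetical, and AssetSelection objects are modeled as plain sets of asset keys.

```python
# Conceptual model only: NOT Dagster's implementation.
# Hypothetical helpers that mimic how .ignore() and .allow() narrow the set
# of upstream dependencies that on_cron() waits for.
from typing import Dict, Optional, Set


def deps_considered(
    deps: Set[str],
    ignore: Optional[Set[str]] = None,
    allow: Optional[Set[str]] = None,
) -> Set[str]:
    """Return the upstream dependencies that still gate execution."""
    considered = set(deps)
    if allow is not None:
        # .allow(): only dependencies inside the selection are considered.
        considered &= allow
    if ignore is not None:
        # .ignore(): dependencies inside the selection are dropped.
        considered -= ignore
    return considered


def deps_ready(considered: Set[str], updated_since_tick: Set[str]) -> bool:
    """True when every considered dependency was updated since the cron tick."""
    return considered <= updated_since_tick


# Hypothetical upstream assets, mapped to their (hypothetical) groups.
groups: Dict[str, str] = {"foo": "xyz", "bar": "abc", "baz": "abc"}
updated = {"bar", "baz"}  # "foo" has not been updated since the last tick

all_deps = set(groups)
ignored = deps_considered(all_deps, ignore={"foo"})
allowed = deps_considered(
    all_deps, allow={name for name, group in groups.items() if group == "abc"}
)

print(deps_ready(all_deps, updated))  # False: the default also waits on "foo"
print(deps_ready(ignored, updated))   # True
print(deps_ready(allowed, updated))   # True
```

In this model both variants unblock the asset, but for different reasons: .ignore() removes a named dependency, while .allow() keeps only the dependencies that match the selection.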

Waiting for all blocking asset checks to complete before executing

The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.

This can be combined with AutomationCondition.on_cron() to ensure that your asset does not execute if upstream data is failing data quality checks:

```python
import dagster as dg

# Execute hourly, but only once all upstream blocking checks have passed
condition = (
    dg.AutomationCondition.on_cron("@hourly")
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)
```
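The `&` above means both sides must be true at once. The plain-Python sketch below models that logic, assuming each upstream check is summarized by whether its latest execution passed and whether it was declared blocking. The `CheckResult` type, the helper functions, and the asset names `orders` and `customers` are hypothetical; this is not Dagster's implementation.

```python
# Conceptual model only: NOT Dagster's implementation.
from dataclasses import dataclass
from typing import Iterable, Set


@dataclass(frozen=True)
class CheckResult:
    asset: str      # the upstream asset the check targets
    passed: bool    # outcome of the check's most recent execution
    blocking: bool  # whether the check was declared as blocking


def all_deps_blocking_checks_passed(
    deps: Set[str], checks: Iterable[CheckResult]
) -> bool:
    """True when every blocking check on an upstream dependency passed."""
    return all(c.passed for c in checks if c.asset in deps and c.blocking)


def should_execute(
    cron_ready: bool, deps: Set[str], checks: Iterable[CheckResult]
) -> bool:
    # Mirrors the `&` in the example: both conditions must hold.
    return cron_ready and all_deps_blocking_checks_passed(deps, checks)


checks = [
    CheckResult("orders", passed=True, blocking=True),
    CheckResult("orders", passed=False, blocking=False),    # non-blocking failure
    CheckResult("customers", passed=False, blocking=True),  # blocking failure
]

print(should_execute(True, {"orders"}, checks))               # True
print(should_execute(True, {"orders", "customers"}, checks))  # False
```

In this model, a failing non-blocking check does not hold the asset back; only failures of checks marked as blocking do.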

Executing later than upstream assets

By default, a single cron schedule determines both the point in time at which an asset starts looking for upstream updates and the earliest point at which it is valid to execute that asset. Sometimes, it can be useful to start looking for upstream updates earlier than the cron schedule on which you want the asset to execute.

You can achieve this by replacing the AutomationCondition.all_deps_updated_since_cron() sub-condition. In this example, the asset materializes at 9:00 AM each day, but starts looking for upstream updates as soon as the midnight boundary passes:

```python
import dagster as dg

NINE_AM_CRON = "0 9 * * *"

# Execute at 9:00 AM, but count upstream updates made since midnight
condition = dg.AutomationCondition.on_cron(NINE_AM_CRON).replace(
    old=dg.AutomationCondition.all_deps_updated_since_cron(NINE_AM_CRON),
    new=dg.AutomationCondition.all_deps_updated_since_cron("0 0 * * *"),
)
```
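To see why the replacement matters, the following sketch models the "updated since cron" check for a single upstream asset that was updated at 3:00 AM, evaluated at the 9:00 AM tick. It is a simplified model, not Dagster's implementation: the helpers `latest_daily_tick` and `deps_updated_since` are hypothetical, handle only daily crons of the form "0 <hour> * * *", and ignore time zones.

```python
# Conceptual model only: NOT Dagster's implementation.
# Handles only daily crons of the form "0 <hour> * * *", timezone-naive.
from datetime import datetime, timedelta
from typing import Dict


def latest_daily_tick(now: datetime, hour: int) -> datetime:
    """Most recent time at or before `now` matching "0 <hour> * * *"."""
    tick = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick > now:
        tick -= timedelta(days=1)
    return tick


def deps_updated_since(
    now: datetime, last_updates: Dict[str, datetime], hour: int
) -> bool:
    """True when every dependency was updated at or after the latest tick."""
    since = latest_daily_tick(now, hour)
    return all(ts >= since for ts in last_updates.values())


now = datetime(2024, 1, 2, 9, 0)  # evaluation at the 9:00 AM tick
last_updates = {"upstream": datetime(2024, 1, 2, 3, 0)}  # hypothetical 3:00 AM update

print(deps_updated_since(now, last_updates, hour=9))  # False: 3:00 AM precedes 9:00 AM
print(deps_updated_since(now, last_updates, hour=0))  # True: 3:00 AM follows midnight
```

With the default 9:00 AM sub-condition, the 3:00 AM update does not count, so the asset would wait for a fresh upstream update after 9:00 AM. With the midnight sub-condition, the update counts and the asset can run as soon as the 9:00 AM tick passes.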

Updating older time partitions

By default, AutomationCondition.on_cron() will target the latest time partition of an asset.

If you instead want to update partitions on a delay, you can replace the in_latest_time_window sub-condition with one that targets the partition at a specific lag from the latest time window. The following example targets the partition from five days ago:

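```python
from datetime import timedelta

from dagster import AutomationCondition

# In the last 5 days, but not in the last 4 days: the partition from 5 days ago
five_days_ago_condition = AutomationCondition.in_latest_time_window(
    timedelta(days=5)
) & ~AutomationCondition.in_latest_time_window(timedelta(days=4))

# Swap out the default "latest time window" sub-condition, referenced by name
condition = AutomationCondition.on_cron("@daily").replace(
    "in_latest_time_window", five_days_ago_condition
)
```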
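The `&` and `~` combination above behaves like a set difference over partitions. The sketch below models it for daily partitions keyed by their start date, assuming that a lookback of N whole days selects the N most recent partitions ending at the end of the latest complete time window. The helper `partitions_within_lookback` and the example dates are hypothetical; this is not Dagster's implementation.

```python
# Conceptual model only: NOT Dagster's implementation.
# Assumes daily partitions keyed by start date, and that a lookback of
# N whole days selects the N most recent partitions.
from datetime import date, timedelta
from typing import Set


def partitions_within_lookback(latest_end: date, lookback: timedelta) -> Set[date]:
    """Start dates of the daily partitions within `lookback` of `latest_end`."""
    return {latest_end - timedelta(days=i) for i in range(1, lookback.days + 1)}


latest_end = date(2024, 1, 10)  # end of the latest complete daily window

# "in the last 5 days AND NOT in the last 4 days" == set difference
five_days_ago = partitions_within_lookback(
    latest_end, timedelta(days=5)
) - partitions_within_lookback(latest_end, timedelta(days=4))

print(five_days_ago)  # {datetime.date(2024, 1, 5)}
```

Widening both lookbacks by the same amount shifts the targeted partition further into the past while still selecting exactly one partition.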