You can configure Dagster to automatically materialize assets when specified criteria are met. This enables a declarative approach to asset scheduling: instead of defining imperative workflows to materialize your assets, you describe the conditions under which they should be materialized.
At a high level, the most common way for assets to be auto-materialized is "eagerly": immediately after upstream changes occur, a run is kicked off to incorporate those changes into a given asset. However, the precise rules that govern when runs are kicked off can be customized on an asset-by-asset basis.
To enable assets to be automatically materialized, you need to first flip a toggle in the Dagster UI.
If you're using an open source Dagster deployment, you can get to this toggle by clicking "Deployment" in the top navigation pane and then clicking on the "Daemons" tab.
If you're using Dagster Cloud, you can get to this toggle by clicking "Deployment" in the top navigation pane, then clicking on the "Agents" tab, then looking under "Cloud service statuses".
You can set up an asset to be auto-materialized by assigning it an AutoMaterializePolicy. Each policy consists of a set of AutoMaterializeRules, each representing individual reasons that an asset should be materialized or not at a given point in time. If there's at least one rule determining that the asset should be materialized, and no rules determining that it should be skipped, a run will be launched to materialize that asset.
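The decision described above can be sketched in plain Python. This is an illustrative model only, not Dagster's implementation: `Decision`, `RuleResult`, and `should_launch_run` are hypothetical names introduced for this sketch.

```python
from dataclasses import dataclass
from enum import Enum


class Decision(Enum):
    MATERIALIZE = "materialize"
    SKIP = "skip"


@dataclass(frozen=True)
class RuleResult:
    """Hypothetical stand-in for the outcome of evaluating one rule."""

    rule_name: str
    decision: Decision  # what the rule asks for when it fires
    fired: bool  # whether the rule's condition currently holds


def should_launch_run(results: list[RuleResult]) -> bool:
    """Launch only if some materialize rule fired and no skip rule fired."""
    wants_materialize = any(
        r.fired and r.decision is Decision.MATERIALIZE for r in results
    )
    wants_skip = any(r.fired and r.decision is Decision.SKIP for r in results)
    return wants_materialize and not wants_skip
```

Under this model, a single firing skip rule vetoes the run regardless of how many materialize rules fired.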
It is recommended to start with the built-in AutoMaterializePolicy.eager and further customize from there if necessary. This policy consists of all of the supported rules, other than skip_on_not_all_parents_updated. The supported rules are currently:
AutoMaterializeRule.skip_on_parent_missing(): Skip materializing an asset partition if one of its parent asset partitions has never been materialized (for regular assets) or observed (for observable source assets).
AutoMaterializeRule.skip_on_not_all_parents_updated(): Skip materializing an asset partition if any of its parents have not been updated since the asset's last materialization.
In this example, we use AutoMaterializePolicy.eager to indicate that, any time that asset1 is materialized, asset2 should be automatically materialized right after:
from dagster import AutoMaterializePolicy, asset


@asset
def asset1(): ...


@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1])
def asset2(): ...
This example assumes that asset1 will be materialized in some other way, e.g. manually, via a sensor, or via a schedule.
Adding an auto-materialize policy to multiple assets at once
Auto-materialize policies can be customized by adding or removing rules. These changes will be reflected in the UI for individual assets.
Auto-materialize only once all parents have been updated
By default, the eager policy will materialize an asset whenever any of its parents have been updated. In cases where an asset has many parents, this may cause more materializations than desired, as each parent update will result in an additional downstream materialization. To avoid this, the skip_on_not_all_parents_updated rule can be applied to a given policy to force it to wait until all of an asset's parents have been updated before materializing it.
Auto-materialize even if some parents are missing
By default, the eager policy won't materialize an asset if any of its parents are missing. In some cases, it's desirable to allow the downstream asset to be materialized, even if some of its parent assets/partitions are missing. To enable this, the skip_on_parent_missing rule can be removed from a given policy to prevent this from blocking the materialization of an asset.
Each AutoMaterializeRule generally applies individually to each partition of a partitioned asset. Consider a pipeline with two daily-partitioned assets, asset1 and asset2, that both have eager auto-materialize policies. At the end of each day, a partition for that day is added to the set of partitions for each of the assets. Dagster will notice that the new partitions exist but have no materializations, and will then auto-materialize them.
If the last partition of asset1 is re-materialized, e.g. manually from the UI, then the corresponding partition of asset2 will be auto-materialized afterward.
By default, a given AutoMaterializePolicy will not allow more than one partition of an asset to be materialized per minute. Any partitions exceeding this threshold will be discarded. Manual intervention will be required to materialize the discarded partitions.
This threshold may be increased as follows:
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(
        max_materializations_per_minute=7,
    ),
)
def asset1(): ...
For time-partitioned assets, the N most recent partitions will be selected from the set of candidates to be materialized. For other types of partitioned assets, the selection will be random.
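The selection behavior described above can be modeled in plain Python. This is an illustrative sketch, not Dagster's implementation: `select_partitions` and its parameters are hypothetical, and partition keys are assumed to be ISO-formatted date strings so that string ordering matches chronological ordering.

```python
import random


def select_partitions(candidates, limit, time_partitioned, rng=None):
    """Return (selected, discarded) partition keys under a per-minute limit."""
    candidates = list(candidates)
    if len(candidates) <= limit:
        # Everything fits under the threshold, so nothing is discarded.
        return candidates, []
    if time_partitioned:
        # ISO date strings sort chronologically; keep the most recent ones.
        selected = sorted(candidates, reverse=True)[:limit]
    else:
        # Other partition types: pick a random subset of the candidates.
        selected = (rng or random).sample(candidates, limit)
    selected_set = set(selected)
    discarded = [key for key in candidates if key not in selected_set]
    return selected, discarded
```

For example, with the candidates 2020-10-10, 2020-10-11, and 2020-10-12 and a limit of two, this model keeps the two latest dates and discards 2020-10-10.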
Observable source assets are assets that your code doesn't materialize, but for which you provide a function that can tell when they've changed. The AutoMaterializeRule.materialize_on_parent_updated rule incorporates the observed data versions of these assets when determining whether it should fire for a downstream asset.
In this example, we check every minute to see whether source_file was modified. If it was, then the AutoMaterializePolicy on asset1 will cause it to be materialized.
import os

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


@observable_source_asset(auto_observe_interval_minutes=1)
def source_file():
    return DataVersion(str(os.path.getmtime("source_file.csv")))


@asset(
    deps=[source_file],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...