You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.auto_materialize_rule

import datetime
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from enum import Enum
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Dict,
    FrozenSet,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    cast,
)

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.freshness_based_auto_materialize import (
    freshness_evaluation_results_for_asset_key,
)
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._serdes.serdes import (
    NamedTupleSerializer,
    UnpackContext,
    UnpackedValue,
    WhitelistMap,
    whitelist_for_serdes,
)
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

from .asset_graph import AssetGraph, sort_key_for_asset_partition
from .partition import SerializedPartitionsSubset

if TYPE_CHECKING:
    from dagster._core.definitions.asset_daemon_context import AssetDaemonContext
    from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
    from dagster._core.instance import DynamicPartitionsStore


@whitelist_for_serdes
class AutoMaterializeDecisionType(Enum):
    """Represents the set of results of the auto-materialize logic.

    MATERIALIZE: The asset should be materialized by a run kicked off on this tick.
    SKIP: The asset should not be materialized by a run kicked off on this tick, because future
        ticks are expected to materialize it.
    DISCARD: The asset should not be materialized by a run kicked off on this tick, but future
        ticks are not expected to materialize it.
    """

    # NOTE: this enum is registered with serdes, so member names/values are part of the
    # serialized format; presumably they must stay stable for previously-stored evaluations.
    MATERIALIZE = "MATERIALIZE"
    SKIP = "SKIP"
    DISCARD = "DISCARD"


class AutoMaterializeRuleEvaluationData(ABC):
    """Abstract marker base class for rule-specific data that may accompany an
    AutoMaterializeRuleEvaluation. Concrete subclasses in this module are serializable
    NamedTuples.
    """

    pass


@whitelist_for_serdes
class TextRuleEvaluationData(
    AutoMaterializeRuleEvaluationData,
    NamedTuple("_TextRuleEvaluationData", [("text", str)]),
):
    """Rule evaluation data consisting of a single free-form text string."""

    pass


@whitelist_for_serdes
class ParentUpdatedRuleEvaluationData(
    AutoMaterializeRuleEvaluationData,
    NamedTuple(
        "_ParentUpdatedRuleEvaluationData",
        [
            # keys of parents that have been updated since the child was last updated
            ("updated_asset_keys", FrozenSet[AssetKey]),
            # keys of parents that will be materialized on the current tick
            ("will_update_asset_keys", FrozenSet[AssetKey]),
        ],
    ),
):
    """Evaluation data explaining why an asset partition should be materialized because its
    parents have been updated, or will be updated on the current tick.
    """

    pass


@whitelist_for_serdes
class WaitingOnAssetsRuleEvaluationData(
    AutoMaterializeRuleEvaluationData,
    # NOTE(review): the inner NamedTuple name ("_WaitingOnParentRuleEvaluationData") does not
    # match this class's name; presumably harmless since serdes registers the outer class,
    # but confirm before renaming it.
    NamedTuple(
        "_WaitingOnParentRuleEvaluationData",
        [("waiting_on_asset_keys", FrozenSet[AssetKey])],
    ),
):
    """Evaluation data for skip rules: the set of asset keys that an asset partition is waiting
    on before it can be materialized.
    """

    pass


@whitelist_for_serdes
class AutoMaterializeRuleSnapshot(NamedTuple):
    """Serializable record of an AutoMaterializeRule, kept alongside historical evaluations."""

    class_name: str
    description: str
    decision_type: AutoMaterializeDecisionType

    @staticmethod
    def from_rule(rule: "AutoMaterializeRule") -> "AutoMaterializeRuleSnapshot":
        """Capture the class name, description, and decision type of ``rule``."""
        name_of_rule_class = rule.__class__.__name__
        # positional order mirrors the field order declared above
        return AutoMaterializeRuleSnapshot(
            name_of_rule_class,
            rule.description,
            rule.decision_type,
        )


@whitelist_for_serdes
class AutoMaterializeRuleEvaluation(NamedTuple):
    """The serializable result of applying one rule: a snapshot of the rule, plus optional
    rule-specific data describing the result.
    """

    # snapshot of the rule that produced this evaluation
    rule_snapshot: AutoMaterializeRuleSnapshot
    # rule-specific detail (e.g. which parents were updated); None when the rule provides none
    evaluation_data: Optional[AutoMaterializeRuleEvaluationData]


class RuleEvaluationContext(NamedTuple):
    """Everything an AutoMaterializeRule needs in order to evaluate a single asset on a tick."""

    asset_key: AssetKey
    cursor: "AssetDaemonCursor"
    instance_queryer: CachingInstanceQueryer
    data_time_resolver: CachingDataTimeResolver
    will_materialize_mapping: Mapping[AssetKey, AbstractSet[AssetKeyPartitionKey]]
    expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]]
    candidates: AbstractSet[AssetKeyPartitionKey]
    daemon_context: "AssetDaemonContext"

    @property
    def asset_graph(self) -> AssetGraph:
        """The asset graph held by this context's instance queryer."""
        queryer = self.instance_queryer
        return queryer.asset_graph

    def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool:
        """Decide whether ``child_key`` may be materialized in the same run as ``parent_key``.

        Both assets must be materializable and share partitioning; a partitioned parent must use
        a time-window or identity partition mapping; and, for an ExternalAssetGraph, both assets
        must live in the same repository.
        """
        from dagster._core.definitions.external_asset_graph import ExternalAssetGraph

        graph = self.asset_graph

        # both assets must be materializable
        if child_key not in graph.materializable_asset_keys:
            return False
        if parent_key not in graph.materializable_asset_keys:
            return False

        # the parent must have the same partitioning
        if not graph.have_same_partitioning(child_key, parent_key):
            return False

        # a partitioned parent must have a simple partition mapping to the child
        if graph.is_partitioned(parent_key) and not isinstance(
            graph.get_partition_mapping(child_key, parent_key),
            (TimeWindowPartitionMapping, IdentityPartitionMapping),
        ):
            return False

        # repository co-location only needs checking for external asset graphs
        if not isinstance(graph, ExternalAssetGraph):
            return True
        return graph.get_repository_handle(child_key) == graph.get_repository_handle(parent_key)

    def get_parents_that_will_not_be_materialized_on_current_tick(
        self, *, asset_partition: AssetKeyPartitionKey
    ) -> AbstractSet[AssetKeyPartitionKey]:
        """Collect the parent asset partitions of ``asset_partition`` that would NOT be updated in
        the same run if a run of ``asset_partition`` were launched on this tick.

        A parent counts when it is not scheduled to materialize this tick, or when it cannot be
        materialized in the same run as the child.
        """
        parents_result = self.asset_graph.get_parents_partitions(
            dynamic_partitions_store=self.instance_queryer,
            current_time=self.instance_queryer.evaluation_time,
            asset_key=asset_partition.asset_key,
            partition_key=asset_partition.partition_key,
        )

        left_behind: Set[AssetKeyPartitionKey] = set()
        for parent in parents_result.parent_partitions:
            scheduled = parent in self.will_materialize_mapping.get(parent.asset_key, set())
            if not scheduled or not self.materializable_in_same_run(
                asset_partition.asset_key, parent.asset_key
            ):
                left_behind.add(parent)
        return left_behind

    def get_asset_partitions_by_asset_key(
        self,
        asset_partitions: AbstractSet[AssetKeyPartitionKey],
    ) -> Mapping[AssetKey, Set[AssetKeyPartitionKey]]:
        """Group ``asset_partitions`` by asset key; the result is a ``defaultdict(set)``."""
        grouped: Dict[AssetKey, Set[AssetKeyPartitionKey]] = defaultdict(set)
        for partition in asset_partitions:
            grouped[partition.asset_key].add(partition)
        return grouped


# The return type of AutoMaterializeRule.evaluate_for_asset: a sequence of pairs, where the first
# element is optional rule-specific evaluation data and the second is the set of asset
# partitions that the rule applied to for that data.
RuleEvaluationResults = Sequence[
    Tuple[Optional[AutoMaterializeRuleEvaluationData], AbstractSet[AssetKeyPartitionKey]]
]


class AutoMaterializeRule(ABC):
    """An AutoMaterializeRule defines a bit of logic which helps determine if a materialization
    should be kicked off for a given asset partition.

    Each rule has one of three decision types: `MATERIALIZE` (indicating that an asset partition
    should be materialized), `SKIP` (indicating that the asset partition should not be
    materialized on this tick, because future ticks are expected to materialize it), or
    `DISCARD` (indicating that the asset partition should not be materialized on this tick, and
    future ticks are not expected to materialize it).

    Materialize rules are evaluated first, and skip rules operate over the set of candidates that
    are produced by the materialize rules. Other than that, there is no ordering between rules.
    """

    @abstractproperty
    def decision_type(self) -> AutoMaterializeDecisionType:
        """The decision type of the rule (`MATERIALIZE`, `SKIP`, or `DISCARD`)."""
        ...

    @abstractproperty
    def description(self) -> str:
        """A human-readable description of this rule. As a basic guideline, this string should
        complete the sentence: 'Indicates an asset should be (materialized/skipped) when ____'.
        """
        ...

    @abstractmethod
    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        """The core evaluation function for the rule. This function takes in a context object and
        returns a sequence of (evaluation data, asset partitions) pairs, where the evaluation
        data is optional and the asset partitions are those that the rule applies to.
        """
        ...

    @public
    @staticmethod
    def materialize_on_required_for_freshness() -> "MaterializeOnRequiredForFreshnessRule":
        """Materialize an asset partition if it is required to satisfy a freshness policy of this
        asset or one of its downstream assets.

        Note: This rule has no effect on partitioned assets.
        """
        return MaterializeOnRequiredForFreshnessRule()

    @public
    @staticmethod
    def materialize_on_parent_updated() -> "MaterializeOnParentUpdatedRule":
        """Materialize an asset partition if one of its parents has been updated more recently
        than it has.

        Note: For time-partitioned or dynamic-partitioned assets downstream of an unpartitioned
        asset, this rule will only fire for the most recent partition of the downstream.
        """
        return MaterializeOnParentUpdatedRule()

    @public
    @staticmethod
    def materialize_on_missing() -> "MaterializeOnMissingRule":
        """Materialize an asset partition if it has never been materialized before. This rule will
        not fire for non-root assets unless that asset's parents have been updated.
        """
        return MaterializeOnMissingRule()

    @public
    @staticmethod
    def skip_on_parent_missing() -> "SkipOnParentMissingRule":
        """Skip materializing an asset partition if one of its parent asset partitions has never
        been materialized (for regular assets) or observed (for observable source assets).
        """
        return SkipOnParentMissingRule()

    @public
    @staticmethod
    def skip_on_parent_outdated() -> "SkipOnParentOutdatedRule":
        """Skip materializing an asset partition if any of its parents has not incorporated the
        latest data from its ancestors.
        """
        return SkipOnParentOutdatedRule()

    @public
    @staticmethod
    def skip_on_not_all_parents_updated(
        require_update_for_all_parent_partitions: bool = False,
    ) -> "SkipOnNotAllParentsUpdatedRule":
        """Skip materializing an asset partition if any of its parents have not been updated since
        the asset's last materialization.

        Attributes:
            require_update_for_all_parent_partitions (Optional[bool]): Applies only to an
                unpartitioned asset or an asset partition that depends on more than one partition
                in any upstream asset. If true, requires all upstream partitions in each upstream
                asset to be materialized since the downstream asset's last materialization in
                order to update it. If false, requires at least one upstream partition in each
                upstream asset to be materialized since the downstream asset's last
                materialization in order to update it. Defaults to false.
        """
        return SkipOnNotAllParentsUpdatedRule(require_update_for_all_parent_partitions)

    def to_snapshot(self) -> AutoMaterializeRuleSnapshot:
        """Returns a serializable snapshot of this rule for historical evaluations."""
        return AutoMaterializeRuleSnapshot.from_rule(self)

    def __eq__(self, other) -> bool:
        # override the default NamedTuple __eq__ method to factor in types: two rules with
        # identical (e.g. empty) fields but different classes must not compare equal
        return type(self) is type(other) and super().__eq__(other)

    def __hash__(self) -> int:
        # override the default NamedTuple __hash__ method to factor in types, keeping it
        # consistent with __eq__ above
        return hash(hash(type(self)) + super().__hash__())
@whitelist_for_serdes
class MaterializeOnRequiredForFreshnessRule(
    AutoMaterializeRule, NamedTuple("_MaterializeOnRequiredForFreshnessRule", [])
):
    """Materialize rule delegating to freshness_evaluation_results_for_asset_key."""

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.MATERIALIZE

    @property
    def description(self) -> str:
        return "required to meet this or downstream asset's freshness policy"

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        freshness_conditions = freshness_evaluation_results_for_asset_key(
            asset_key=context.asset_key,
            data_time_resolver=context.data_time_resolver,
            asset_graph=context.asset_graph,
            current_time=context.instance_queryer.evaluation_time,
            will_materialize_mapping=context.will_materialize_mapping,
            expected_data_time_mapping=context.expected_data_time_mapping,
        )
        return freshness_conditions


@whitelist_for_serdes
class MaterializeOnParentUpdatedRule(
    AutoMaterializeRule, NamedTuple("_MaterializeOnParentUpdatedRule", [])
):
    """Materialize rule that fires for asset partitions whose parents have been updated, or
    will be updated on this tick.
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.MATERIALIZE

    @property
    def description(self) -> str:
        return "upstream data has changed since latest materialization"

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        """Evaluates the set of asset partitions of this asset whose parents have been updated,
        or will update on this tick.
        """
        conditions = defaultdict(set)
        has_parents_that_will_update = set()

        # first, get the set of parents that will be materialized this tick, and see if we
        # can materialize this asset with those parents
        will_update_parents_by_asset_partition = defaultdict(set)
        for parent_key in context.asset_graph.get_parents(context.asset_key):
            if not context.materializable_in_same_run(context.asset_key, parent_key):
                continue
            for parent_partition in context.will_materialize_mapping.get(parent_key, set()):
                asset_partition = AssetKeyPartitionKey(
                    context.asset_key, parent_partition.partition_key
                )
                will_update_parents_by_asset_partition[asset_partition].add(parent_key)
                has_parents_that_will_update.add(asset_partition)

        # next, for each asset partition of this asset which has newly-updated parents, or
        # has a parent that will update, create a ParentUpdatedRuleEvaluationData
        has_or_will_update = (
            context.daemon_context.get_asset_partitions_with_newly_updated_parents_for_key(
                context.asset_key
            )
            | has_parents_that_will_update
        )
        for asset_partition in has_or_will_update:
            parent_asset_partitions = context.asset_graph.get_parents_partitions(
                dynamic_partitions_store=context.instance_queryer,
                current_time=context.instance_queryer.evaluation_time,
                asset_key=asset_partition.asset_key,
                partition_key=asset_partition.partition_key,
            ).parent_partitions

            updated_parent_asset_partitions = context.instance_queryer.get_parent_asset_partitions_updated_after_child(
                asset_partition,
                parent_asset_partitions,
                # do a precise check for updated parents, factoring in data versions, as long as
                # we're within reasonable limits on the number of partitions to check
                respect_materialization_data_versions=context.daemon_context.respect_materialization_data_versions
                and len(parent_asset_partitions | has_or_will_update) < 100,
                # ignore self-dependencies when checking for updated parents, to avoid historical
                # rematerializations from causing a chain of materializations to be kicked off
                ignored_parent_keys={context.asset_key},
            )
            updated_parents = {parent.asset_key for parent in updated_parent_asset_partitions}
            will_update_parents = will_update_parents_by_asset_partition[asset_partition]

            if updated_parents or will_update_parents:
                conditions[
                    ParentUpdatedRuleEvaluationData(
                        updated_asset_keys=frozenset(updated_parents),
                        will_update_asset_keys=frozenset(will_update_parents),
                    )
                ].add(asset_partition)
        if conditions:
            return [(k, v) for k, v in conditions.items()]
        return []


@whitelist_for_serdes
class MaterializeOnMissingRule(AutoMaterializeRule, NamedTuple("_MaterializeOnMissingRule", [])):
    """Materialize rule that fires for asset partitions with no materialization or observation."""

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.MATERIALIZE

    @property
    def description(self) -> str:
        return "materialization is missing"

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        """Evaluates the set of asset partitions for this asset which are missing and were not
        previously discarded. Currently only applies to root asset partitions and asset
        partitions with updated parents.
        """
        # copy into a fresh set so the additions below never write back into the set returned
        # by the daemon context (which may be cached and shared across calls — if it is a
        # mutable `set`, an in-place `|=` would otherwise mutate it)
        missing_asset_partitions = set(
            context.daemon_context.get_never_handled_root_asset_partitions_for_key(
                context.asset_key
            )
        )
        # in addition to missing root asset partitions, check any asset partitions with updated
        # parents to see if they're missing
        for (
            candidate
        ) in context.daemon_context.get_asset_partitions_with_newly_updated_parents_for_key(
            context.asset_key
        ):
            if not context.instance_queryer.asset_partition_has_materialization_or_observation(
                candidate
            ):
                missing_asset_partitions.add(candidate)
        if missing_asset_partitions:
            return [(None, missing_asset_partitions)]
        return []


@whitelist_for_serdes
class SkipOnParentOutdatedRule(AutoMaterializeRule, NamedTuple("_SkipOnParentOutdatedRule", [])):
    """Skip rule for candidates whose parents (among those that will not be materialized
    alongside them on this tick) have outdated ancestors.
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.SKIP

    @property
    def description(self) -> str:
        return "waiting on upstream data to be up to date"

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        asset_partitions_by_waiting_on_asset_keys = defaultdict(set)
        for candidate in context.candidates:
            outdated_ancestors = set()
            # find the root cause of why this asset partition's parents are outdated (if any)
            for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
                asset_partition=candidate
            ):
                outdated_ancestors.update(
                    context.instance_queryer.get_outdated_ancestors(asset_partition=parent)
                )
            if outdated_ancestors:
                asset_partitions_by_waiting_on_asset_keys[frozenset(outdated_ancestors)].add(
                    candidate
                )
        if asset_partitions_by_waiting_on_asset_keys:
            return [
                (WaitingOnAssetsRuleEvaluationData(waiting_on_asset_keys=k), v)
                for k, v in asset_partitions_by_waiting_on_asset_keys.items()
            ]
        return []


@whitelist_for_serdes
class SkipOnParentMissingRule(AutoMaterializeRule, NamedTuple("_SkipOnParentMissingRule", [])):
    """Skip rule for candidates with a parent partition that will not be materialized alongside
    them on this tick and has never been materialized or observed. Non-observable source parents
    are ignored.
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.SKIP

    @property
    def description(self) -> str:
        return "waiting on upstream data to be present"

    def evaluate_for_asset(
        self,
        context: RuleEvaluationContext,
    ) -> RuleEvaluationResults:
        asset_partitions_by_waiting_on_asset_keys = defaultdict(set)
        for candidate in context.candidates:
            missing_parent_asset_keys = set()
            for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
                asset_partition=candidate
            ):
                # ignore non-observable sources, which will never have a materialization or observation
                if context.asset_graph.is_source(
                    parent.asset_key
                ) and not context.asset_graph.is_observable(parent.asset_key):
                    continue
                if not context.instance_queryer.asset_partition_has_materialization_or_observation(
                    parent
                ):
                    missing_parent_asset_keys.add(parent.asset_key)
            if missing_parent_asset_keys:
                asset_partitions_by_waiting_on_asset_keys[frozenset(missing_parent_asset_keys)].add(
                    candidate
                )
        if asset_partitions_by_waiting_on_asset_keys:
            return [
                (WaitingOnAssetsRuleEvaluationData(waiting_on_asset_keys=k), v)
                for k, v in asset_partitions_by_waiting_on_asset_keys.items()
            ]
        return []


@whitelist_for_serdes
class SkipOnNotAllParentsUpdatedRule(
    AutoMaterializeRule,
    NamedTuple(
        "_SkipOnNotAllParentsUpdatedRule", [("require_update_for_all_parent_partitions", bool)]
    ),
):
    """An auto-materialize rule that enforces that an asset can only be materialized if all parents
    have been materialized since the asset's last materialization.

    Attributes:
        require_update_for_all_parent_partitions (Optional[bool]): Applies only to an unpartitioned
            asset or an asset partition that depends on more than one partition in any upstream
            asset. If true, requires all upstream partitions in each upstream asset to be
            materialized since the downstream asset's last materialization in order to update it.
            If false, requires at least one upstream partition in each upstream asset to be
            materialized since the downstream asset's last materialization in order to update it.
            Defaults to false.
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.SKIP

    @property
    def description(self) -> str:
        # use the same truthiness test as evaluate_for_asset, so a falsy non-bool value (e.g.
        # None) is described the same way it is evaluated
        if not self.require_update_for_all_parent_partitions:
            return "waiting on upstream data to be updated"
        else:
            return "waiting until all upstream partitions are updated"

    def evaluate_for_asset(
        self,
        context: RuleEvaluationContext,
    ) -> RuleEvaluationResults:
        asset_partitions_by_waiting_on_asset_keys = defaultdict(set)
        for candidate in context.candidates:
            parent_partitions = context.asset_graph.get_parents_partitions(
                context.instance_queryer,
                context.instance_queryer.evaluation_time,
                context.asset_key,
                candidate.partition_key,
            ).parent_partitions

            # parents updated since the candidate, plus everything any parent asset will
            # materialize on this tick
            updated_parent_partitions = (
                context.instance_queryer.get_parent_asset_partitions_updated_after_child(
                    candidate,
                    parent_partitions,
                    context.daemon_context.respect_materialization_data_versions,
                    ignored_parent_keys=set(),
                )
                | set().union(
                    *[
                        context.will_materialize_mapping.get(parent, set())
                        for parent in context.asset_graph.get_parents(context.asset_key)
                    ]
                )
            )

            if self.require_update_for_all_parent_partitions:
                # All upstream partitions must be updated in order for the candidate to be updated
                non_updated_parent_keys = {
                    parent.asset_key for parent in parent_partitions - updated_parent_partitions
                }
            else:
                # At least one upstream partition in each upstream asset must be updated in order
                # for the candidate to be updated
                parent_asset_keys = context.asset_graph.get_parents(context.asset_key)
                updated_parent_partitions_by_asset_key = context.get_asset_partitions_by_asset_key(
                    updated_parent_partitions
                )
                non_updated_parent_keys = {
                    parent
                    for parent in parent_asset_keys
                    if not updated_parent_partitions_by_asset_key.get(parent)
                }

            # do not require past partitions of this asset to be updated
            non_updated_parent_keys -= {context.asset_key}

            if non_updated_parent_keys:
                asset_partitions_by_waiting_on_asset_keys[frozenset(non_updated_parent_keys)].add(
                    candidate
                )

        if asset_partitions_by_waiting_on_asset_keys:
            return [
                (WaitingOnAssetsRuleEvaluationData(waiting_on_asset_keys=k), v)
                for k, v in asset_partitions_by_waiting_on_asset_keys.items()
            ]
        return []


@whitelist_for_serdes
class DiscardOnMaxMaterializationsExceededRule(
    AutoMaterializeRule, NamedTuple("_DiscardOnMaxMaterializationsExceededRule", [("limit", int)])
):
    """Discard rule that keeps the first `limit` candidates (in asset-partition sort order) and
    discards the rest.
    """

    @property
    def decision_type(self) -> AutoMaterializeDecisionType:
        return AutoMaterializeDecisionType.DISCARD

    @property
    def description(self) -> str:
        return f"exceeds {self.limit} materialization(s) per minute"

    def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
        # the set of asset partitions which exceed the limit
        rate_limited_asset_partitions = set(
            sorted(
                context.candidates,
                key=lambda x: sort_key_for_asset_partition(context.asset_graph, x),
            )[self.limit :]
        )
        if rate_limited_asset_partitions:
            return [(None, rate_limited_asset_partitions)]
        return []


@whitelist_for_serdes
class AutoMaterializeAssetEvaluation(NamedTuple):
    """Represents the results of the auto-materialize logic for a single asset.

    Properties:
        asset_key (AssetKey): The asset key that was evaluated.
        partition_subsets_by_condition: The rule evaluations that impact if the asset should be
            materialized, skipped, or discarded. If the asset is partitioned, this will be a list
            of tuples, where the first element is the condition and the second element is the
            serialized subset of partitions that the condition applies to. If it's not
            partitioned, the second element will be None.
        num_requested / num_skipped / num_discarded (int): Counts supplied by the caller.
        run_ids (Set[str]): Run ids associated with this evaluation.
        rule_snapshots: Snapshots of the rules of the asset's auto-materialize policy.
    """

    asset_key: AssetKey
    partition_subsets_by_condition: Sequence[
        Tuple["AutoMaterializeRuleEvaluation", Optional[SerializedPartitionsSubset]]
    ]
    num_requested: int
    num_skipped: int
    num_discarded: int
    # NOTE(review): this default is one set object shared by every instance that does not pass
    # run_ids explicitly — never mutate it in place (use `_replace` instead). Switching it to an
    # immutable type would presumably change the serialized form, so it is left as-is.
    run_ids: Set[str] = set()
    rule_snapshots: Optional[Sequence[AutoMaterializeRuleSnapshot]] = None

    @staticmethod
    def from_rule_evaluation_results(
        asset_graph: AssetGraph,
        asset_key: AssetKey,
        asset_partitions_by_rule_evaluation: Sequence[
            Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]
        ],
        num_requested: int,
        num_skipped: int,
        num_discarded: int,
        dynamic_partitions_store: "DynamicPartitionsStore",
    ) -> "AutoMaterializeAssetEvaluation":
        """Builds an evaluation for `asset_key`, serializing each rule's asset partitions into a
        partitions subset (or None for unpartitioned assets).

        Raises (via check.failed) if the asset has no auto-materialize policy.
        """
        auto_materialize_policy = asset_graph.auto_materialize_policies_by_key.get(asset_key)

        if not auto_materialize_policy:
            check.failed(f"Expected auto materialize policy on asset {asset_key}")

        partitions_def = asset_graph.get_partitions_def(asset_key)
        if partitions_def is None:
            return AutoMaterializeAssetEvaluation(
                asset_key=asset_key,
                partition_subsets_by_condition=[
                    (rule_evaluation, None)
                    for rule_evaluation, _ in asset_partitions_by_rule_evaluation
                ],
                num_requested=num_requested,
                num_skipped=num_skipped,
                num_discarded=num_discarded,
                rule_snapshots=auto_materialize_policy.rule_snapshots,
            )
        else:
            return AutoMaterializeAssetEvaluation(
                asset_key=asset_key,
                partition_subsets_by_condition=[
                    (
                        rule_evaluation,
                        SerializedPartitionsSubset.from_subset(
                            subset=partitions_def.empty_subset().with_partition_keys(
                                check.not_none(ap.partition_key) for ap in asset_partitions
                            ),
                            partitions_def=partitions_def,
                            dynamic_partitions_store=dynamic_partitions_store,
                        ),
                    )
                    for rule_evaluation, asset_partitions in asset_partitions_by_rule_evaluation
                ],
                num_requested=num_requested,
                num_skipped=num_skipped,
                num_discarded=num_discarded,
                rule_snapshots=auto_materialize_policy.rule_snapshots,
            )


# BACKCOMPAT GRAVEYARD


def _unpack_legacy_asset_key_set(value: object) -> FrozenSet[AssetKey]:
    """Coerces an already-unpacked legacy set-of-AssetKeys field into a frozenset.

    Both `set` and `frozenset` values are accepted (previously a `frozenset` was silently
    dropped); anything else, such as a missing field (which arrives as None), becomes empty.
    """
    if isinstance(value, (set, frozenset)):
        return cast(FrozenSet[AssetKey], frozenset(value))
    return frozenset()


class BackcompatAutoMaterializeConditionSerializer(NamedTupleSerializer):
    """This handles backcompat for the old AutoMaterializeCondition objects, turning them into the
    proper AutoMaterializeRuleEvaluation objects. This is necessary because old
    AutoMaterializeAssetEvaluation objects will have serialized AutoMaterializeCondition objects,
    and we need to be able to deserialize them.

    In theory, as these serialized objects happen to be purged periodically, we can remove this
    backcompat logic at some point in the future.
    """

    def unpack(
        self,
        unpacked_dict: Dict[str, UnpackedValue],
        whitelist_map: WhitelistMap,
        context: UnpackContext,
    ) -> AutoMaterializeRuleEvaluation:
        if self.klass in (
            FreshnessAutoMaterializeCondition,
            DownstreamFreshnessAutoMaterializeCondition,
        ):
            return AutoMaterializeRuleEvaluation(
                rule_snapshot=AutoMaterializeRule.materialize_on_required_for_freshness().to_snapshot(),
                evaluation_data=None,
            )
        elif self.klass == MissingAutoMaterializeCondition:
            return AutoMaterializeRuleEvaluation(
                rule_snapshot=AutoMaterializeRule.materialize_on_missing().to_snapshot(),
                evaluation_data=None,
            )
        elif self.klass == ParentMaterializedAutoMaterializeCondition:
            updated_asset_keys = _unpack_legacy_asset_key_set(
                unpacked_dict.get("updated_asset_keys")
            )
            will_update_asset_keys = _unpack_legacy_asset_key_set(
                unpacked_dict.get("will_update_asset_keys")
            )
            return AutoMaterializeRuleEvaluation(
                rule_snapshot=AutoMaterializeRule.materialize_on_parent_updated().to_snapshot(),
                evaluation_data=ParentUpdatedRuleEvaluationData(
                    updated_asset_keys=updated_asset_keys,
                    will_update_asset_keys=will_update_asset_keys,
                ),
            )
        elif self.klass == ParentOutdatedAutoMaterializeCondition:
            waiting_on_asset_keys = _unpack_legacy_asset_key_set(
                unpacked_dict.get("waiting_on_asset_keys")
            )
            return AutoMaterializeRuleEvaluation(
                rule_snapshot=AutoMaterializeRule.skip_on_parent_outdated().to_snapshot(),
                evaluation_data=WaitingOnAssetsRuleEvaluationData(
                    waiting_on_asset_keys=waiting_on_asset_keys
                ),
            )
        elif self.klass == MaxMaterializationsExceededAutoMaterializeCondition:
            return AutoMaterializeRuleEvaluation(
                rule_snapshot=DiscardOnMaxMaterializationsExceededRule(limit=1).to_snapshot(),
                evaluation_data=None,
            )
        check.failed(f"Unexpected class {self.klass}")


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class FreshnessAutoMaterializeCondition(NamedTuple):
    ...


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class DownstreamFreshnessAutoMaterializeCondition(NamedTuple):
    ...


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class ParentMaterializedAutoMaterializeCondition(NamedTuple):
    ...


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class MissingAutoMaterializeCondition(NamedTuple):
    ...


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class ParentOutdatedAutoMaterializeCondition(NamedTuple):
    ...


@whitelist_for_serdes(serializer=BackcompatAutoMaterializeConditionSerializer)
class MaxMaterializationsExceededAutoMaterializeCondition(NamedTuple):
    ...