You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.asset_check_result

from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_check_evaluation import (
    AssetCheckEvaluation,
    AssetCheckEvaluationTargetMaterializationData,
)
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.events import (
    AssetKey,
    CoercibleToAssetKey,
    MetadataValue,
    RawMetadataValue,
    normalize_metadata,
)
from dagster._core.errors import DagsterInvariantViolationError

if TYPE_CHECKING:
    from dagster._core.execution.context.compute import StepExecutionContext


[docs]@experimental class AssetCheckResult( NamedTuple( "_AssetCheckResult", [ ("passed", PublicAttr[bool]), ("asset_key", PublicAttr[Optional[AssetKey]]), ("check_name", PublicAttr[Optional[str]]), ("metadata", PublicAttr[Mapping[str, MetadataValue]]), ("severity", PublicAttr[AssetCheckSeverity]), ], ) ): """The result of an asset check. Attributes: asset_key (Optional[AssetKey]): The asset key that was checked. check_name (Optional[str]): The name of the check. passed (bool): The pass/fail result of the check. metadata (Optional[Dict[str, RawMetadataValue]]): Arbitrary metadata about the asset. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method. severity (AssetCheckSeverity): Severity of the check. Defaults to ERROR. """ def __new__( cls, *, passed: bool, asset_key: Optional[CoercibleToAssetKey] = None, check_name: Optional[str] = None, metadata: Optional[Mapping[str, RawMetadataValue]] = None, severity: AssetCheckSeverity = AssetCheckSeverity.ERROR, ): normalized_metadata = normalize_metadata( check.opt_mapping_param(metadata, "metadata", key_type=str), ) return super().__new__( cls, asset_key=AssetKey.from_coercible(asset_key) if asset_key is not None else None, check_name=check.opt_str_param(check_name, "check_name"), passed=check.bool_param(passed, "passed"), metadata=normalized_metadata, severity=check.inst_param(severity, "severity", AssetCheckSeverity), ) def to_asset_check_evaluation( self, step_context: "StepExecutionContext" ) -> AssetCheckEvaluation: spec_check_names_by_asset_key = ( step_context.job_def.asset_layer.get_check_names_by_asset_key_for_node_handle( step_context.node_handle.root ) ) asset_keys_with_specs = spec_check_names_by_asset_key.keys() if self.asset_key is not None: if self.asset_key not in asset_keys_with_specs: raise DagsterInvariantViolationError( "Received unexpected AssetCheckResult. It targets asset" f" '{self.asset_key.to_user_string()}' which is not targeted by any of the" " checks currently being evaluated. Targeted assets:" f" {[asset_key.to_user_string() for asset_key in asset_keys_with_specs]}." ) resolved_asset_key = self.asset_key else: if len(spec_check_names_by_asset_key) > 1: raise DagsterInvariantViolationError( "AssetCheckResult didn't specify an asset key, but there are multiple assets" " to choose from:" f" {[asset_key.to_user_string() for asset_key in spec_check_names_by_asset_key.keys()]}" ) resolved_asset_key = next(iter(asset_keys_with_specs)) check_names_with_specs = spec_check_names_by_asset_key[resolved_asset_key] if self.check_name is not None: if self.check_name not in check_names_with_specs: raise DagsterInvariantViolationError( "Received unexpected AssetCheckResult. No checks currently being evaluated" f" target asset '{resolved_asset_key.to_user_string()}' and have name" f" '{self.check_name}'. Checks being evaluated for this asset:" f" {check_names_with_specs}" ) resolved_check_name = self.check_name else: if len(check_names_with_specs) > 1: raise DagsterInvariantViolationError( "AssetCheckResult result didn't specify a check name, but there are multiple" " checks to choose from for the this asset key:" f" {check_names_with_specs}" ) resolved_check_name = next(iter(check_names_with_specs)) input_asset_info = step_context.get_input_asset_version_info(resolved_asset_key) if input_asset_info is not None: target_materialization_data = AssetCheckEvaluationTargetMaterializationData( run_id=input_asset_info.run_id, storage_id=input_asset_info.storage_id, timestamp=input_asset_info.timestamp, ) else: target_materialization_data = None return AssetCheckEvaluation( check_name=resolved_check_name, asset_key=resolved_asset_key, passed=self.passed, metadata=self.metadata, target_materialization_data=target_materialization_data, severity=self.severity, ) def get_spec_python_identifier( self, *, asset_key: Optional[AssetKey] = None, check_name: Optional[str] = None ) -> str: """Returns a string uniquely identifying the asset check spec associated with this result. This is used for the output name associated with an `AssetCheckResult`. """ asset_key = asset_key or self.asset_key check_name = check_name or self.check_name assert asset_key is not None, "Asset key must be provided if not set on spec" assert asset_key is not None, "Asset key must be provided if not set on spec" return f"{asset_key.to_python_identifier()}_{self.check_name}"