Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.asset_checks

from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
)

from dagster import _check as check
from dagster._annotations import experimental, public
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec
from dagster._core.definitions.events import (
    AssetKey,
    CoercibleToAssetKeyPrefix,
)
from dagster._core.definitions.node_definition import NodeDefinition
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.resource_requirement import (
    RequiresResources,
    ResourceAddable,
    ResourceRequirement,
    merge_resource_defs,
)
from dagster._core.errors import DagsterAssetCheckFailedError
from dagster._core.types.dagster_type import Nothing

if TYPE_CHECKING:
    from dagster._core.definitions.assets import AssetsDefinition


@experimental
class AssetChecksDefinitionInputOutputProps(NamedTuple):
    """Immutable pair of name-to-key mappings for an asset checks definition."""

    # Asset check key associated with each output name.
    asset_check_keys_by_output_name: Mapping[str, AssetCheckKey]
    # Asset key associated with each input name.
    asset_keys_by_input_name: Mapping[str, AssetKey]

    def with_asset_key_prefix(
        self, prefix: CoercibleToAssetKeyPrefix
    ) -> "AssetChecksDefinitionInputOutputProps":
        """Return a copy of these props with ``prefix`` applied to every key.

        Check keys are rewritten with ``AssetCheckKey.with_asset_key_prefix`` and
        asset keys with ``AssetKey.with_prefix``; the input/output names that
        index the mappings are left unchanged.
        """
        prefixed_check_keys = {}
        for out_name, key_for_output in self.asset_check_keys_by_output_name.items():
            prefixed_check_keys[out_name] = key_for_output.with_asset_key_prefix(prefix)

        prefixed_asset_keys = {}
        for in_name, key_for_input in self.asset_keys_by_input_name.items():
            prefixed_asset_keys[in_name] = key_for_input.with_prefix(prefix)

        return self._replace(
            asset_check_keys_by_output_name=prefixed_check_keys,
            asset_keys_by_input_name=prefixed_asset_keys,
        )


@experimental
class AssetChecksDefinition(ResourceAddable, RequiresResources):
    """Defines a set of checks that are produced by the same op or op graph.

    AssetChecksDefinition are typically not instantiated directly, but rather produced using a
    decorator like :py:func:`@asset_check <asset>`.
    """

    def __init__(
        self,
        *,
        node_def: NodeDefinition,
        resource_defs: Mapping[str, ResourceDefinition],
        specs: Sequence[AssetCheckSpec],
        input_output_props: AssetChecksDefinitionInputOutputProps,
        # if adding new fields, make sure to handle them in the get_attributes_dict method
    ):
        """Store the node, resources, specs and input/output key mappings.

        Args:
            node_def: The op or op graph that executes the checks.
            resource_defs: Resource definitions attached to this definition, by resource key.
            specs: The check specs; must be a sequence of ``AssetCheckSpec``.
            input_output_props: Mappings from output name to check key and from input name
                to asset key. Every check key it references must be the key of one of ``specs``.

        Raises:
            KeyError: If ``input_output_props`` references a check key that no spec in
                ``specs`` has.
        """
        self._node_def = node_def
        self._resource_defs = resource_defs
        self._specs = check.sequence_param(specs, "specs", of_type=AssetCheckSpec)
        self._input_output_props = check.inst_param(
            input_output_props, "input_output_props", AssetChecksDefinitionInputOutputProps
        )
        # Index the specs by check key, then re-index them by the output that produces them.
        self._specs_by_handle = {spec.key: spec for spec in specs}
        self._specs_by_output_name = {
            output_name: self._specs_by_handle[check_key]
            for output_name, check_key in input_output_props.asset_check_keys_by_output_name.items()
        }

    @public
    @property
    def node_def(self) -> NodeDefinition:
        """The op or op graph that can be executed to check the assets."""
        return self._node_def

    @public
    @property
    def name(self) -> str:
        """The name of the single check spec (see ``spec`` for the multi-check failure)."""
        return self.spec.name

    @public
    @property
    def description(self) -> Optional[str]:
        """The description of the single check spec (see ``spec`` for the multi-check failure)."""
        return self.spec.description

    @public
    @property
    def asset_key(self) -> AssetKey:
        """The asset key of the single check spec (see ``spec`` for the multi-check failure)."""
        return self.spec.asset_key

    @public
    @property
    def spec(self) -> AssetCheckSpec:
        """The only check spec of this definition.

        Fails via ``check.failed`` when more than one output-bound spec exists, because the
        single-check properties are ambiguous in that case.
        """
        if len(self._specs_by_output_name) > 1:
            check.failed(
                "Tried to retrieve single-check property from a checks definition with multiple"
                " checks: "
                + ", ".join(spec.name for spec in self._specs_by_output_name.values()),
            )

        return next(iter(self.specs))

    @public
    @property
    def specs(self) -> Iterable[AssetCheckSpec]:
        """The check specs that are bound to an output of the node."""
        return self._specs_by_output_name.values()

    @property
    def keys(self) -> Iterable[AssetCheckKey]:
        """The check keys of all specs passed at construction."""
        return self._specs_by_handle.keys()

    @property
    def specs_by_output_name(self) -> Mapping[str, AssetCheckSpec]:
        """Mapping from output name to the check spec bound to that output."""
        return self._specs_by_output_name

    @property
    def asset_keys_by_input_name(self) -> Mapping[str, AssetKey]:
        """Mapping from input name to asset key, as given in the input/output props."""
        return self._input_output_props.asset_keys_by_input_name

    def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
        """Yield the node's resource requirements, then those of each attached resource def."""
        yield from self.node_def.get_resource_requirements()  # type: ignore[attr-defined]
        for source_key, resource_def in self._resource_defs.items():
            yield from resource_def.get_resource_requirements(outer_context=source_key)

    def get_spec_for_check_key(self, asset_check_key: AssetCheckKey) -> AssetCheckSpec:
        """Return the spec whose key is ``asset_check_key``; raises ``KeyError`` if unknown."""
        return self._specs_by_handle[asset_check_key]

    @public
    @property
    def required_resource_keys(self) -> Set[str]:
        """Set[str]: The set of keys for resources that must be provided to this AssetChecksDefinition."""
        return {requirement.key for requirement in self.get_resource_requirements()}

    @property
    def resource_defs(self) -> Mapping[str, ResourceDefinition]:
        """The resource definitions attached to this definition, by resource key."""
        return self._resource_defs

    def with_resources(
        self, resource_defs: Mapping[str, ResourceDefinition]
    ) -> "AssetChecksDefinition":
        """Return a copy whose resource defs are merged with ``resource_defs``.

        The merge is delegated to ``merge_resource_defs``; the copy is built with the same
        class as ``self``.
        """
        attributes_dict = self.get_attributes_dict()
        attributes_dict["resource_defs"] = merge_resource_defs(
            old_resource_defs=self._resource_defs,
            resource_defs_to_merge_in=resource_defs,
            requires_resources=self,
        )
        return self.__class__(**attributes_dict)

    def get_attributes_dict(self) -> Dict[str, Any]:
        """Return the constructor keyword arguments that reproduce this definition."""
        return dict(
            node_def=self._node_def,
            resource_defs=self._resource_defs,
            specs=self._specs,
            input_output_props=self._input_output_props,
        )

    def with_attributes(
        self,
        asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
    ) -> "AssetChecksDefinition":
        """Return a copy with the given attribute overrides applied.

        Args:
            asset_key_prefix: When not ``None``, applied to every spec and to the
                input/output key mappings via their ``with_asset_key_prefix`` methods.
        """
        attributes_dict = self.get_attributes_dict()
        if asset_key_prefix is not None:
            attributes_dict["specs"] = [
                spec.with_asset_key_prefix(asset_key_prefix) for spec in self._specs
            ]
            attributes_dict["input_output_props"] = self._input_output_props.with_asset_key_prefix(
                asset_key_prefix
            )
        # Build the copy with the instance's own class, as with_resources does, so that a
        # subclass instance is not downgraded to the base class.
        return self.__class__(**attributes_dict)
@experimental
def build_asset_with_blocking_check(
    asset_def: "AssetsDefinition",
    checks: Sequence[AssetChecksDefinition],
) -> "AssetsDefinition":
    """Combine an asset and its checks into one graph-backed asset.

    The returned graph asset runs a renamed copy of ``asset_def.op``, feeds its return value
    to the node of every entry in ``checks``, and then runs a fan-in op. The fan-in op
    re-emits the asset value and raises ``DagsterAssetCheckFailedError`` if the latest
    recorded execution of any check did not succeed.

    Args:
        asset_def: The asset to wrap. Its op must have exactly one output definition.
        checks: The checks definitions whose nodes are invoked with the asset's return value.

    Returns:
        The value returned by ``graph_asset_no_defaults`` for the composed graph, keyed as
        ``asset_def.key`` and carrying the specs of all ``checks``.

    Raises:
        A ``check.invariant`` failure at build time if ``asset_def.op`` does not have exactly
        one output definition.
    """
    from dagster import AssetIn, In, OpExecutionContext, Output, op
    from dagster._core.definitions.decorators.asset_decorator import graph_asset_no_defaults
    from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus

    check_specs = []
    for c in checks:
        check_specs.extend(c.specs)

    check_output_names = [c.get_python_identifier() for c in check_specs]

    check.invariant(len(asset_def.op.output_defs) == 1)
    asset_out_type = asset_def.op.output_defs[0].dagster_type

    @op(
        name=f"{asset_def.op.name}_asset_and_checks",
        ins={"asset_return_value": In(asset_out_type), "check_evaluations": In(Nothing)},
    )
    def fan_in_checks_and_asset_return_value(context: OpExecutionContext, asset_return_value: Any):
        # we pass the asset_return_value through and store it again so that downstream assets can load it.
        # This is a little silly- we only do this because this op has the asset key in its StepOutputProperties
        # so the output is written to the right path. We could probably get the asset_def.op to write to the
        # asset path (and make sure we don't override it here) to avoid the double write.
        yield Output(asset_return_value)

        for check_spec in check_specs:
            # Only the most recent recorded execution of each check is inspected.
            executions = context.instance.event_log_storage.get_asset_check_execution_history(
                check_key=check_spec.key, limit=1
            )
            check.invariant(
                len(executions) == 1, f"Expected asset check {check_spec.name} to execute"
            )
            execution = executions[0]
            check.invariant(
                execution.run_id == context.run_id,
                f"Expected asset check {check_spec.name} to execute in the current run",
            )
            if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED:
                raise DagsterAssetCheckFailedError()

    # kwargs are the inputs to the asset_def.op that we are wrapping
    def blocking_asset(**kwargs):
        asset_return_value = asset_def.op.with_replaced_properties(
            name=f"{asset_def.op.name}_graph_asset_op"
        )(**kwargs)
        # The loop variable is deliberately not named `check`, to avoid shadowing the
        # imported `check` module.
        check_evaluations = [checks_def.node_def(asset_return_value) for checks_def in checks]

        # NOTE(review): check_output_names has one entry per check spec, whereas
        # check_evaluations has one entry per checks definition; zip pairs them by position,
        # which presumably assumes one spec per definition - confirm.
        return {
            "result": fan_in_checks_and_asset_return_value(asset_return_value, check_evaluations),
            **{
                check_output_name: check_result
                for check_output_name, check_result in zip(check_output_names, check_evaluations)
            },
        }

    return graph_asset_no_defaults(
        compose_fn=blocking_asset,
        name=None,
        key_prefix=None,
        key=asset_def.key,
        group_name=asset_def.group_names_by_key.get(asset_def.key),
        partitions_def=asset_def.partitions_def,
        check_specs=check_specs,
        description=asset_def.descriptions_by_key.get(asset_def.key),
        ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()},
        resource_defs=asset_def.resource_defs,
        metadata=asset_def.metadata_by_key.get(asset_def.key),
        freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key),
        auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
        backfill_policy=asset_def.backfill_policy,
        config=None,  # gets config from asset_def.op
    )