Source code for dagster._utils.dagster_type
from typing import Any
from dagster._core.definitions.events import Failure, TypeCheck
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.job_base import InMemoryJob
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.context_creation_job import scoped_job_context
from dagster._core.instance import DagsterInstance
from dagster._core.types.dagster_type import resolve_dagster_type
from .typing_api import is_typing_type
[docs]def check_dagster_type(dagster_type: Any, value: Any) -> TypeCheck:
    """Test a custom Dagster type.
    Args:
        dagster_type (Any): The Dagster type to test. Should be one of the
            :ref:`built-in types <builtin>`, a dagster type explicitly constructed with
            :py:func:`as_dagster_type`, :py:func:`@usable_as_dagster_type <dagster_type>`, or
            :py:func:`PythonObjectDagsterType`, or a Python type.
        value (Any): The runtime value to test.
    Returns:
        TypeCheck: The result of the type check.
    Examples:
        .. code-block:: python
            assert check_dagster_type(Dict[Any, Any], {'foo': 'bar'}).success
    """
    if is_typing_type(dagster_type):
        raise DagsterInvariantViolationError(
            f"Must pass in a type from dagster module. You passed {dagster_type} "
            "which is part of python's typing module."
        )
    dagster_type = resolve_dagster_type(dagster_type)
    job = InMemoryJob(GraphDefinition(node_defs=[], name="empty").to_job())
    job_def = job.get_definition()
    instance = DagsterInstance.ephemeral()
    execution_plan = create_execution_plan(job)
    dagster_run = instance.create_run_for_job(job_def)
    with scoped_job_context(execution_plan, job, {}, dagster_run, instance) as context:
        type_check_context = context.for_type(dagster_type)
        try:
            type_check = dagster_type.type_check(type_check_context, value)
        except Failure as failure:
            return TypeCheck(success=False, description=failure.description)
        if not isinstance(type_check, TypeCheck):
            raise DagsterInvariantViolationError(
                "Type checks can only return TypeCheck. Type {type_name} returned {value}.".format(
                    type_name=dagster_type.display_name, value=repr(type_check)
                )
            )
        return type_check