from typing import Callable, Dict, Mapping, NamedTuple, Optional, Set, cast
import pendulum
import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidInvocationError,
    FreshnessPolicySensorExecutionError,
    user_code_error_boundary,
)
from dagster._core.instance import DagsterInstance
from dagster._serdes import (
    serialize_value,
    whitelist_for_serdes,
)
from dagster._serdes.errors import DeserializationError
from dagster._serdes.serdes import deserialize_value
from dagster._seven import JSONDecodeError
from .sensor_definition import (
    DefaultSensorStatus,
    RawSensorEvaluationFunctionReturn,
    SensorDefinition,
    SensorEvaluationContext,
    SensorType,
    SkipReason,
    get_context_param_name,
    get_sensor_context_from_args_or_kwargs,
    validate_and_get_resource_dict,
)
@whitelist_for_serdes
class FreshnessPolicySensorCursor(
    NamedTuple(
        "_FreshnessPolicySensorCursor",
        [("minutes_late_by_key_str", Mapping[str, Optional[float]])],
    )
):
    """Serializable cursor holding one optional minutes-late value per asset.

    The single field, ``minutes_late_by_key_str``, is keyed by the user-string form of each
    ``AssetKey`` so that the mapping only contains string keys.
    """

    def __new__(cls, minutes_late_by_key_str: Mapping[str, Optional[float]]):
        # Validate that the argument is a mapping with string keys before storing it.
        validated_mapping = check.mapping_param(
            minutes_late_by_key_str, "minutes_late_by_key_str", key_type=str
        )
        return super().__new__(cls, minutes_late_by_key_str=validated_mapping)

    @staticmethod
    def is_valid(json_str: str) -> bool:
        """Return True if ``json_str`` deserializes as a ``FreshnessPolicySensorCursor``."""
        try:
            deserialize_value(json_str, FreshnessPolicySensorCursor)
        except (JSONDecodeError, DeserializationError):
            return False
        return True

    @staticmethod
    def from_dict(
        minutes_late_by_key: Mapping[AssetKey, Optional[float]],
    ) -> "FreshnessPolicySensorCursor":
        """Build a cursor from a mapping keyed by ``AssetKey`` objects."""
        by_key_str = {}
        for asset_key, minutes_late in minutes_late_by_key.items():
            by_key_str[asset_key.to_user_string()] = minutes_late
        return FreshnessPolicySensorCursor(minutes_late_by_key_str=by_key_str)

    @property
    def minutes_late_by_key(self) -> Mapping[AssetKey, Optional[float]]:
        """The stored mapping with its string keys converted back into ``AssetKey`` objects."""
        by_asset_key = {}
        for key_str, minutes_late in self.minutes_late_by_key_str.items():
            by_asset_key[AssetKey.from_user_string(key_str)] = minutes_late
        return by_asset_key

    def to_json(self) -> str:
        """Serialize this cursor to a string via ``serialize_value``."""
        as_named_tuple = cast(NamedTuple, self)
        return serialize_value(as_named_tuple)

    @staticmethod
    def from_json(json_str: str) -> "FreshnessPolicySensorCursor":
        """Deserialize a cursor from a string produced by ``to_json``."""
        return deserialize_value(json_str, FreshnessPolicySensorCursor)
class FreshnessPolicySensorContext(
    NamedTuple(
        "_FreshnessPolicySensorContext",
        [
            ("sensor_name", PublicAttr[str]),
            ("asset_key", PublicAttr[AssetKey]),
            ("freshness_policy", PublicAttr[FreshnessPolicy]),
            ("minutes_overdue", PublicAttr[Optional[float]]),
            ("previous_minutes_overdue", PublicAttr[Optional[float]]),
            ("instance", PublicAttr[DagsterInstance]),
            ("resources", Resources),
        ],
    )
):
    """The ``context`` object available to a decorated function of ``freshness_policy_sensor``.

    Attributes:
        sensor_name (str): the name of the sensor.
        asset_key (AssetKey): the key of the asset being monitored
        freshness_policy (FreshnessPolicy): the freshness policy of the asset being monitored
        minutes_overdue (Optional[float]): the minutes_overdue value for this asset on the
            current sensor tick, or None if no value is available.
        previous_minutes_overdue (Optional[float]): the minutes_overdue value for this asset on the
            previous sensor tick.
        instance (DagsterInstance): the current instance.
        resources (Resources): the resources made available to the sensor function. Defaults to
            an empty resources object when none are supplied.
    """

    def __new__(
        cls,
        sensor_name: str,
        asset_key: AssetKey,
        freshness_policy: FreshnessPolicy,
        minutes_overdue: Optional[float],
        previous_minutes_overdue: Optional[float],
        instance: DagsterInstance,
        resources: Optional[Resources] = None,
    ):
        # Accept any numeric value (or None) for the overdue fields, then normalize to float below.
        minutes_overdue = check.opt_numeric_param(minutes_overdue, "minutes_overdue")
        previous_minutes_overdue = check.opt_numeric_param(
            previous_minutes_overdue, "previous_minutes_overdue"
        )
        return super(FreshnessPolicySensorContext, cls).__new__(
            cls,
            sensor_name=check.str_param(sensor_name, "sensor_name"),
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            # The second argument is the parameter name reported in the check failure message.
            freshness_policy=check.inst_param(
                freshness_policy, "freshness_policy", FreshnessPolicy
            ),
            minutes_overdue=float(minutes_overdue) if minutes_overdue is not None else None,
            previous_minutes_overdue=(
                float(previous_minutes_overdue) if previous_minutes_overdue is not None else None
            ),
            instance=check.inst_param(instance, "instance", DagsterInstance),
            resources=resources or ScopedResourcesBuilder.build_empty(),
        )
@experimental
def build_freshness_policy_sensor_context(
    sensor_name: str,
    asset_key: AssetKey,
    freshness_policy: FreshnessPolicy,
    minutes_overdue: Optional[float],
    previous_minutes_overdue: Optional[float] = None,
    instance: Optional[DagsterInstance] = None,
    resources: Optional[Resources] = None,
) -> FreshnessPolicySensorContext:
    """Builds freshness policy sensor context from provided parameters.

    This function can be used to provide the context argument when directly invoking a function
    decorated with `@freshness_policy_sensor`, such as when writing unit tests.

    Args:
        sensor_name (str): The name of the sensor the context is being constructed for.
        asset_key (AssetKey): The AssetKey for the monitored asset
        freshness_policy (FreshnessPolicy): The FreshnessPolicy for the monitored asset
        minutes_overdue (Optional[float]): How overdue the monitored asset currently is
        previous_minutes_overdue (Optional[float]): How overdue the monitored asset was on the
            previous tick.
        instance (DagsterInstance): The dagster instance configured for the context. An
            ephemeral instance is created when none is provided.
        resources (Optional[Resources]): The resources to attach to the context.

    Returns:
        FreshnessPolicySensorContext: The context built from the given values.

    Examples:
        .. code-block:: python

            context = build_freshness_policy_sensor_context(
                sensor_name="freshness_policy_sensor_to_invoke",
                asset_key=AssetKey("some_asset"),
                freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
                minutes_overdue=10.0,
            )
            freshness_policy_sensor_to_invoke(context)
    """
    return FreshnessPolicySensorContext(
        sensor_name=sensor_name,
        asset_key=asset_key,
        freshness_policy=freshness_policy,
        minutes_overdue=minutes_overdue,
        previous_minutes_overdue=previous_minutes_overdue,
        instance=instance or DagsterInstance.ephemeral(),
        resources=resources,
    )
class FreshnessPolicySensorDefinition(SensorDefinition):
    """Define a sensor that reacts to the status of a given set of asset freshness policies,
    where the decorated function will be evaluated on every sensor tick.

    Args:
        name (str): The name of the sensor. Defaults to the name of the decorated function.
        freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]): The core
            evaluation function for the sensor. Takes a :py:class:`~dagster.FreshnessPolicySensorContext`.
        asset_selection (AssetSelection): The asset selection monitored by the sensor.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
            status can be overridden from the Dagster UI or via the GraphQL API.
        required_resource_keys (Optional[Set[str]]): Resource keys required by the sensor. These
            are combined with the names of any resource arguments declared on
            ``freshness_policy_sensor_fn``.
    """

    def __init__(
        self,
        name: str,
        asset_selection: AssetSelection,
        freshness_policy_sensor_fn: Callable[..., RawSensorEvaluationFunctionReturn],
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
        required_resource_keys: Optional[Set[str]] = None,
    ):
        check.str_param(name, "name")
        check.inst_param(asset_selection, "asset_selection", AssetSelection)
        check.opt_int_param(minimum_interval_seconds, "minimum_interval_seconds")
        check.opt_str_param(description, "description")
        check.inst_param(default_status, "default_status", DefaultSensorStatus)

        self._freshness_policy_sensor_fn = check.callable_param(
            freshness_policy_sensor_fn, "freshness_policy_sensor_fn"
        )

        # Names of the user function's arguments that are resource arguments; only these are
        # passed to the function as keyword arguments during tick evaluation.
        resource_arg_names: Set[str] = {
            arg.name for arg in get_resource_args(freshness_policy_sensor_fn)
        }

        combined_required_resource_keys = (
            check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
            | resource_arg_names
        )

        def _wrapped_fn(context: SensorEvaluationContext):
            """Evaluate the user function once per monitored asset that has a freshness policy."""
            from dagster._utils.caching_instance_queryer import (
                CachingInstanceQueryer,  # expensive import
            )

            if context.repository_def is None:
                raise DagsterInvalidInvocationError(
                    "The `repository_def` property on the `SensorEvaluationContext` passed into a "
                    "`FreshnessPolicySensorDefinition` must not be None."
                )

            # A missing or unparseable cursor resets the sensor: store an empty cursor and skip
            # this tick without calling the user function.
            if context.cursor is None or not FreshnessPolicySensorCursor.is_valid(context.cursor):
                new_cursor = FreshnessPolicySensorCursor({})
                context.update_cursor(new_cursor.to_json())
                yield SkipReason(f"Initializing {name}.")
                return

            evaluation_time = pendulum.now("UTC")
            asset_graph = context.repository_def.asset_graph
            instance_queryer = CachingInstanceQueryer(
                context.instance, asset_graph, evaluation_time
            )
            data_time_resolver = CachingDataTimeResolver(instance_queryer=instance_queryer)
            monitored_keys = asset_selection.resolve(asset_graph)

            # get the previous status from the cursor
            previous_minutes_late_by_key = FreshnessPolicySensorCursor.from_json(
                context.cursor
            ).minutes_late_by_key

            # The context parameter name depends only on the user function, so resolve it once
            # rather than once per asset.
            context_param_name = get_context_param_name(freshness_policy_sensor_fn)

            minutes_late_by_key: Dict[AssetKey, Optional[float]] = {}
            for asset_key in monitored_keys:
                freshness_policy = asset_graph.get(asset_key).freshness_policy
                if freshness_policy is None:
                    continue

                # get the current minutes_overdue value for this asset
                overdue_result = data_time_resolver.get_minutes_overdue(
                    evaluation_time=evaluation_time,
                    asset_key=asset_key,
                )
                minutes_late_by_key[asset_key] = (
                    overdue_result.overdue_minutes if overdue_result else None
                )

                resource_args_populated = validate_and_get_resource_dict(
                    context.resources, name, resource_arg_names
                )
                freshness_context = FreshnessPolicySensorContext(
                    sensor_name=name,
                    asset_key=asset_key,
                    freshness_policy=freshness_policy,
                    minutes_overdue=minutes_late_by_key[asset_key],
                    previous_minutes_overdue=previous_minutes_late_by_key.get(asset_key),
                    instance=context.instance,
                    resources=context.resources,
                )
                # Only pass the context if the user function declares a context parameter.
                context_param = (
                    {context_param_name: freshness_context} if context_param_name else {}
                )

                with user_code_error_boundary(
                    FreshnessPolicySensorExecutionError,
                    lambda: f'Error occurred during the execution of sensor "{name}".',
                ):
                    fn_result = freshness_policy_sensor_fn(
                        **context_param,
                        **resource_args_populated,
                    )

                if fn_result is not None:
                    raise DagsterInvalidDefinitionError(
                        "Functions decorated by `@freshness_policy_sensor` may not return or yield"
                        " a value."
                    )

            # Persist this tick's values so the next tick can report them as the previous ones.
            context.update_cursor(
                FreshnessPolicySensorCursor.from_dict(minutes_late_by_key).to_json()
            )

        super(FreshnessPolicySensorDefinition, self).__init__(
            name=name,
            evaluation_fn=_wrapped_fn,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
            required_resource_keys=combined_required_resource_keys,
        )

    def __call__(self, *args, **kwargs) -> RawSensorEvaluationFunctionReturn:
        """Directly invoke the underlying sensor function.

        The ``FreshnessPolicySensorContext`` (if any) is extracted from the given arguments and
        passed to the function under its context parameter name, together with the validated
        resources.
        """
        context_param_name = get_context_param_name(self._freshness_policy_sensor_fn)

        sensor_context = get_sensor_context_from_args_or_kwargs(
            self._freshness_policy_sensor_fn,
            args,
            kwargs,
            context_type=FreshnessPolicySensorContext,
        )
        context_param = (
            {context_param_name: sensor_context} if context_param_name and sensor_context else {}
        )

        # NOTE(review): every key in self._required_resource_keys (presumably set by
        # SensorDefinition.__init__ - confirm) is forwarded as a keyword argument here, whereas
        # tick evaluation forwards only the function's resource arguments. Confirm that keys
        # supplied solely via `required_resource_keys` are meant to be passed on direct invocation.
        resources = validate_and_get_resource_dict(
            sensor_context.resources if sensor_context else ScopedResourcesBuilder.build_empty(),
            self._name,
            self._required_resource_keys,
        )

        return self._freshness_policy_sensor_fn(**context_param, **resources)

    @property
    def sensor_type(self) -> SensorType:
        """The sensor type of this definition: ``SensorType.FRESHNESS_POLICY``."""
        return SensorType.FRESHNESS_POLICY
@experimental
def freshness_policy_sensor(
    asset_selection: AssetSelection,
    *,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[
    [Callable[..., RawSensorEvaluationFunctionReturn]],
    FreshnessPolicySensorDefinition,
]:
    """Define a sensor that reacts to the status of a given set of asset freshness policies, where the
    decorated function will be evaluated on every tick for each asset in the selection that has a
    FreshnessPolicy defined.

    Note: returning or yielding a value from the annotated function will result in an error.

    The decorated function is the core evaluation function for the sensor. Takes a
    :py:class:`~dagster.FreshnessPolicySensorContext`.

    Args:
        asset_selection (AssetSelection): The asset selection monitored by the sensor.
        name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
            status can be overridden from the Dagster UI or via the GraphQL API.

    Returns:
        A decorator that wraps the evaluation function in a
        :py:class:`FreshnessPolicySensorDefinition`.
    """

    def inner(
        fn: Callable[..., RawSensorEvaluationFunctionReturn],
    ) -> FreshnessPolicySensorDefinition:
        check.callable_param(fn, "fn")
        # Fall back to the decorated function's own name when no explicit name was given.
        sensor_name = name or fn.__name__

        return FreshnessPolicySensorDefinition(
            name=sensor_name,
            freshness_policy_sensor_fn=fn,
            asset_selection=asset_selection,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
        )

    return inner