import time
from typing import Any, Dict, List, Optional
import kubernetes.config
import kubernetes.watch
from dagster import (
    Enum as DagsterEnum,
    Field,
    In,
    Noneable,
    Nothing,
    OpExecutionContext,
    Permissive,
    StringSource,
    op,
)
from dagster._annotations import experimental
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._utils.merger import merge_dicts
from ..client import DEFAULT_JOB_POD_COUNT, DagsterKubernetesClient
from ..container_context import K8sContainerContext
from ..job import (
    DagsterK8sJobConfig,
    K8sConfigMergeBehavior,
    UserDefinedDagsterK8sConfig,
    construct_dagster_k8s_job,
    get_k8s_job_name,
)
from ..launcher import K8sRunLauncher
def _raw_k8s_config_field(target: str, reference_url: str) -> Field:
    """Return an optional ``Permissive`` config field holding raw k8s config for ``target``.

    Args:
        target: Human-readable name of the k8s object the raw config applies to, used in the
            field description (e.g. ``"pod's pod spec"``).
        reference_url: Link to the Kubernetes API reference for that object.
    """
    return Field(
        Permissive(),
        is_required=False,
        description=(
            f"Raw k8s config for the k8s {target} ({reference_url})."
            " Keys can be either snake_case or camelCase."
        ),
    )


# Config schema for ``k8s_job_op``: the shared k8s job settings from
# ``DagsterK8sJobConfig.config_type_container()`` plus op-specific fields. ``k8s_job_op`` pops
# ``merge_behavior`` and passes every remaining key as a keyword argument to
# ``execute_k8s_job``, so each key here must be a parameter of that function.
K8S_JOB_OP_CONFIG = merge_dicts(
    DagsterK8sJobConfig.config_type_container(),
    {
        "image": Field(
            StringSource,
            is_required=True,
            description="The image in which to launch the k8s job.",
        ),
        "command": Field(
            [str],
            is_required=False,
            description="The command to run in the container within the launched k8s job.",
        ),
        "args": Field(
            [str],
            is_required=False,
            description="The args for the command for the container.",
        ),
        "namespace": Field(StringSource, is_required=False),
        "load_incluster_config": Field(
            bool,
            is_required=False,
            default_value=True,
            description="""Set this value if you are running the launcher
            within a k8s cluster. If ``True``, we assume the launcher is running within the target
            cluster and load config using ``kubernetes.config.load_incluster_config``. Otherwise,
            we will use the k8s config specified in ``kubeconfig_file`` (using
            ``kubernetes.config.load_kube_config``) or fall back to the default kubeconfig.""",
        ),
        "kubeconfig_file": Field(
            Noneable(str),
            is_required=False,
            default_value=None,
            description=(
                "The kubeconfig file from which to load config. Defaults to using the default"
                " kubeconfig."
            ),
        ),
        "timeout": Field(
            int,
            is_required=False,
            description=(
                "How long to wait, in seconds, for the job to succeed before raising an exception."
            ),
        ),
        "container_config": _raw_k8s_config_field(
            "pod's main container",
            "https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#container-v1-core",
        ),
        "pod_template_spec_metadata": _raw_k8s_config_field(
            "pod's metadata",
            "https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta",
        ),
        "pod_spec_config": _raw_k8s_config_field(
            "pod's pod spec",
            "https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#PodSpec",
        ),
        "job_metadata": _raw_k8s_config_field(
            "job's metadata",
            "https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta",
        ),
        "job_spec_config": _raw_k8s_config_field(
            "job's job spec",
            "https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#jobspec-v1-batch",
        ),
        "merge_behavior": Field(
            DagsterEnum.from_python_enum(K8sConfigMergeBehavior),
            is_required=False,
            default_value=K8sConfigMergeBehavior.SHALLOW.value,
            description=(
                "How raw k8s config set on this op should be merged with any raw k8s config set on"
                " the code location that launched the op. By default, the value is SHALLOW, meaning"
                " that the two dictionaries are shallowly merged - any shared values in the"
                " dictionaries will be replaced by the values set on this op. Setting it to DEEP"
                " will recursively merge the two dictionaries, appending list fields together and"
                " merging dictionary fields."
            ),
        ),
    },
)
@experimental
def execute_k8s_job(
    context: OpExecutionContext,
    image: str,
    command: Optional[List[str]] = None,
    args: Optional[List[str]] = None,
    namespace: Optional[str] = None,
    image_pull_policy: Optional[str] = None,
    image_pull_secrets: Optional[List[Dict[str, str]]] = None,
    service_account_name: Optional[str] = None,
    env_config_maps: Optional[List[str]] = None,
    env_secrets: Optional[List[str]] = None,
    env_vars: Optional[List[str]] = None,
    volume_mounts: Optional[List[Dict[str, Any]]] = None,
    volumes: Optional[List[Dict[str, Any]]] = None,
    labels: Optional[Dict[str, str]] = None,
    resources: Optional[Dict[str, Any]] = None,
    scheduler_name: Optional[str] = None,
    load_incluster_config: bool = True,
    kubeconfig_file: Optional[str] = None,
    timeout: Optional[int] = None,
    container_config: Optional[Dict[str, Any]] = None,
    pod_template_spec_metadata: Optional[Dict[str, Any]] = None,
    pod_spec_config: Optional[Dict[str, Any]] = None,
    job_metadata: Optional[Dict[str, Any]] = None,
    job_spec_config: Optional[Dict[str, Any]] = None,
    k8s_job_name: Optional[str] = None,
    merge_behavior: K8sConfigMergeBehavior = K8sConfigMergeBehavior.SHALLOW,
):
    """Execute a Kubernetes job from within a Dagster op and wait for it to finish.

    The job's k8s settings are built by merging the current run's container context (taken
    from the instance's run launcher when it is a ``K8sRunLauncher``) with the settings passed
    to this function. When the pod restart policy is ``Never``, the main container's logs are
    streamed to stdout. If anything raises while waiting on the job (including an execution
    interrupt), the job is deleted and the exception is re-raised.

    Args:
        image (str): The image in which to launch the k8s job.
        command (Optional[List[str]]): The command to run in the container within the launched
            k8s job. Default: None.
        args (Optional[List[str]]): The args for the command for the container. Default: None.
        namespace (Optional[str]): Override the kubernetes namespace in which to run the k8s job.
            Default: None.
        image_pull_policy (Optional[str]): Allows the image pull policy to be overridden, e.g. to
            facilitate local testing with `kind <https://kind.sigs.k8s.io/>`_. Default:
            ``"Always"``. See:
            https://kubernetes.io/docs/concepts/containers/images/#updating-images.
        image_pull_secrets (Optional[List[Dict[str, str]]]): Optionally, a list of dicts, each of
            which corresponds to a Kubernetes ``LocalObjectReference`` (e.g.,
            ``{'name': 'myRegistryName'}``). This allows you to specify the ``imagePullSecrets`` on
            a pod basis. Typically, these will be provided through the service account, when needed,
            and you will not need to pass this argument. See:
            https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
            and https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core
        service_account_name (Optional[str]): The name of the Kubernetes service account under which
            to run the Job. Defaults to "default".
        env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to
            draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See:
            https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container
        env_secrets (Optional[List[str]]): A list of custom Secret names from which to
            draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See:
            https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables
        env_vars (Optional[List[str]]): A list of environment variables to inject into the Job.
            Default: ``[]``. See:
            https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
        volume_mounts (Optional[List[Permissive]]): A list of volume mounts to include in the job's
            container. Default: ``[]``. See:
            https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volumemount-v1-core
        volumes (Optional[List[Permissive]]): A list of volumes to include in the Job's Pod.
            Default: ``[]``. See:
            https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core
        labels (Optional[Dict[str, str]]): Additional labels that should be included in the Job's
            Pod. See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels
        resources (Optional[Dict[str, Any]]): Compute resource requirements for the container. See:
            https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
        scheduler_name (Optional[str]): Use a custom Kubernetes scheduler for launched Pods. See:
            https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/
        load_incluster_config (bool): Whether the op is running within a k8s cluster. If ``True``,
            we assume the launcher is running within the target cluster and load config using
            ``kubernetes.config.load_incluster_config``. Otherwise, we will use the k8s config
            specified in ``kubeconfig_file`` (using ``kubernetes.config.load_kube_config``) or fall
            back to the default kubeconfig. Default: True.
        kubeconfig_file (Optional[str]): The kubeconfig file from which to load config. Defaults to
            using the default kubeconfig. Default: None.
        timeout (Optional[int]): Raise an exception if the op takes longer than this timeout in
            seconds to execute. Default: None.
        container_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's main container
            (https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#container-v1-core).
            Keys can be either snake_case or camelCase. Default: None.
        pod_template_spec_metadata (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's
            metadata (https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta).
            Keys can be either snake_case or camelCase. Default: None.
        pod_spec_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s pod's pod spec
            (https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#PodSpec).
            Keys can be either snake_case or camelCase. Default: None.
        job_metadata (Optional[Dict[str, Any]]): Raw k8s config for the k8s job's metadata
            (https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta).
            Keys can be either snake_case or camelCase. Default: None.
        job_spec_config (Optional[Dict[str, Any]]): Raw k8s config for the k8s job's job spec
            (https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#jobspec-v1-batch).
            Keys can be either snake_case or camelCase. Default: None.
        k8s_job_name (Optional[str]): Overrides the name of the k8s job. If not set, will be set
            to a unique name based on the current run ID and the name of the calling op. If set,
            make sure that the passed in name is a valid Kubernetes job name that does not
            already exist in the cluster. On retries, ``-<retry_number>`` is appended.
        merge_behavior (Optional[K8sConfigMergeBehavior]): How raw k8s config set on this op should
            be merged with any raw k8s config set on the code location that launched the op. By
            default, the value is K8sConfigMergeBehavior.SHALLOW, meaning that the two dictionaries
            are shallowly merged - any shared values in the dictionaries will be replaced by the
            values set on this op. Setting it to DEEP will recursively merge the two dictionaries,
            appending list fields together and merging dictionary fields.
    """
    run_container_context = K8sContainerContext.create_for_run(
        context.dagster_run,
        (
            context.instance.run_launcher
            if isinstance(context.instance.run_launcher, K8sRunLauncher)
            else None
        ),
        include_run_tags=False,
    )

    # Copy so injecting ``command`` does not mutate the caller's dict.
    container_config = container_config.copy() if container_config else {}
    if command:
        container_config["command"] = command

    op_container_context = K8sContainerContext(
        image_pull_policy=image_pull_policy,
        image_pull_secrets=image_pull_secrets,
        service_account_name=service_account_name,
        env_config_maps=env_config_maps,
        env_secrets=env_secrets,
        env_vars=env_vars,
        volume_mounts=volume_mounts,
        volumes=volumes,
        labels=labels,
        namespace=namespace,
        resources=resources,
        scheduler_name=scheduler_name,
        run_k8s_config=UserDefinedDagsterK8sConfig.from_dict(
            {
                "container_config": container_config,
                "pod_template_spec_metadata": pod_template_spec_metadata,
                "pod_spec_config": pod_spec_config,
                "job_metadata": job_metadata,
                "job_spec_config": job_spec_config,
                "merge_behavior": merge_behavior.value,
            }
        ),
    )

    container_context = run_container_context.merge(op_container_context)

    namespace = container_context.namespace
    # The merged raw k8s config; this (not the op-level arguments alone) is what the job is
    # built from below, so later lookups (container name, parallelism) read from it too.
    user_defined_k8s_config = container_context.run_k8s_config

    # NOTE(review): ``scheduler_name`` is merged into ``container_context`` above but is not
    # forwarded here. Confirm whether DagsterK8sJobConfig accepts ``scheduler_name`` and, if so,
    # pass ``container_context.scheduler_name``; otherwise the argument may have no effect.
    k8s_job_config = DagsterK8sJobConfig(
        job_image=image,
        dagster_home=None,
        image_pull_policy=container_context.image_pull_policy,
        image_pull_secrets=container_context.image_pull_secrets,
        service_account_name=container_context.service_account_name,
        instance_config_map=None,
        postgres_password_secret=None,
        env_config_maps=container_context.env_config_maps,
        env_secrets=container_context.env_secrets,
        env_vars=container_context.env_vars,
        volume_mounts=container_context.volume_mounts,
        volumes=container_context.volumes,
        labels=container_context.labels,
        resources=container_context.resources,
    )

    job_name = k8s_job_name or get_k8s_job_name(
        context.run_id, context.get_step_execution_context().step.key
    )
    # Suffix the retry number so each retry attempt creates a distinctly named job.
    retry_number = context.retry_number
    if retry_number > 0:
        job_name = f"{job_name}-{retry_number}"

    # Dagster bookkeeping labels for the job (distinct from the user-supplied ``labels``).
    job_labels = {
        "dagster/job": context.dagster_run.job_name,
        "dagster/op": context.op.name,
        "dagster/run-id": context.dagster_run.run_id,
    }
    external_job_origin = context.dagster_run.external_job_origin
    if external_job_origin:
        job_labels["dagster/code-location"] = (
            external_job_origin.external_repository_origin.code_location_origin.location_name
        )

    job = construct_dagster_k8s_job(
        job_config=k8s_job_config,
        args=args,
        job_name=job_name,
        pod_name=job_name,
        component="k8s_job_op",
        user_defined_k8s_config=user_defined_k8s_config,
        labels=job_labels,
    )

    if load_incluster_config:
        kubernetes.config.load_incluster_config()
    else:
        kubernetes.config.load_kube_config(kubeconfig_file)

    # changing this to be able to be passed in will allow for unit testing
    api_client = DagsterKubernetesClient.production_client()

    context.log.info(f"Creating Kubernetes job {job_name} in namespace {namespace}...")

    start_time = time.time()
    api_client.batch_api.create_namespaced_job(namespace, job)

    context.log.info("Waiting for Kubernetes job to finish...")

    timeout = timeout or 0

    try:
        api_client.wait_for_job(
            job_name=job_name,
            namespace=namespace,
            wait_timeout=timeout,
            start_time=start_time,
        )

        restart_policy = user_defined_k8s_config.pod_spec_config.get("restart_policy", "Never")

        if restart_policy == "Never":
            # Read the name from the merged config the job was built from, so a container name
            # set only at the run/code-location level is still honored.
            container_name = user_defined_k8s_config.container_config.get("name", "dagster")

            pods = api_client.wait_for_job_to_have_pods(
                job_name,
                namespace,
                wait_timeout=timeout,
                start_time=start_time,
            )

            pod_names = [p.metadata.name for p in pods]

            if not pod_names:
                raise Exception("No pod names in job after it started")

            pod_to_watch = pod_names[0]
            watch = kubernetes.watch.Watch()  # consider moving in to api_client

            api_client.wait_for_pod(
                pod_to_watch, namespace, wait_timeout=timeout, start_time=start_time
            )

            log_stream = watch.stream(
                api_client.core_api.read_namespaced_pod_log,
                name=pod_to_watch,
                namespace=namespace,
                container=container_name,
            )

            # The timeout is only checked between log entries; ``next`` blocks until the
            # next line arrives or the stream ends.
            while True:
                if timeout and time.time() - start_time > timeout:
                    watch.stop()
                    raise Exception("Timed out waiting for pod to finish")

                try:
                    log_entry = next(log_stream)
                    print(log_entry)  # noqa: T201
                except StopIteration:
                    break
        else:
            context.log.info("Pod logs are disabled, because restart_policy is not Never")

        # Use the merged job spec, since that is the spec the created job actually has.
        num_pods_to_wait_for = (
            user_defined_k8s_config.job_spec_config.get("parallelism") or DEFAULT_JOB_POD_COUNT
        )
        api_client.wait_for_running_job_to_succeed(
            job_name=job_name,
            namespace=namespace,
            wait_timeout=timeout,
            start_time=start_time,
            num_pods_to_wait_for=num_pods_to_wait_for,
        )
    # DagsterExecutionInterruptedError is listed explicitly, presumably because it does not
    # derive from Exception (confirm); either way the job must not be left running.
    except (DagsterExecutionInterruptedError, Exception):
        context.log.info(
            f"Deleting Kubernetes job {job_name} in namespace {namespace} due to exception"
        )
        api_client.delete_job(job_name=job_name, namespace=namespace)
        raise
@op(ins={"start_after": In(Nothing)}, config_schema=K8S_JOB_OP_CONFIG)
@experimental
def k8s_job_op(context):
    """An op that runs a Kubernetes job using the k8s API.

    Contrast with the `k8s_job_executor`, which runs each Dagster op in a Dagster job in its
    own k8s job.

    This op may be useful when:

    - You need to orchestrate a command that isn't a Dagster op (or isn't written in Python)
    - You want to run the rest of a Dagster job using a specific executor, and only a single
      op in k8s.

    For example:

    .. literalinclude:: ../../../../../../python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_example_k8s_job_op.py
      :start-after: start_marker
      :end-before: end_marker
      :language: python

    You can create your own op with the same implementation by calling the `execute_k8s_job`
    function inside your own op.

    The service account that is used to run this job should have the following RBAC permissions:

    .. literalinclude:: ../../../../../../examples/docs_snippets/docs_snippets/deploying/kubernetes/k8s_job_op_rbac.yaml
       :language: YAML
    """
    # Work on a copy instead of popping from ``context.op_config`` in place, so the context's
    # resolved config is left untouched.
    op_config = dict(context.op_config)
    # Enum lookup accepts either the raw config value or an existing member.
    merge_behavior = K8sConfigMergeBehavior(
        op_config.pop("merge_behavior", K8sConfigMergeBehavior.SHALLOW)
    )
    # Every remaining config key is forwarded as a keyword argument.
    execute_k8s_job(context, merge_behavior=merge_behavior, **op_config)