import dagster._check as check
import docker
from dagster._core.launcher.base import (
CheckRunHealthResult,
LaunchRunContext,
ResumeRunContext,
RunLauncher,
WorkerStatus,
)
from dagster._core.storage.pipeline_run import DagsterRun
from dagster._core.storage.tags import DOCKER_IMAGE_TAG
from dagster._core.utils import parse_env_var
from dagster._grpc.types import ExecuteRunArgs, ResumeRunArgs
from dagster._serdes import ConfigurableClass
from dagster_docker.utils import DOCKER_CONFIG_SCHEMA, validate_docker_config, validate_docker_image
from .container_context import DockerContainerContext
# Run tag key under which DockerRunLauncher records the ID of the container it
# created for a run; the launcher reads it back to find that container again.
DOCKER_CONTAINER_ID_TAG = "docker/container_id"
class DockerRunLauncher(RunLauncher, ConfigurableClass):
    """Launches runs in a Docker container.

    Every launched or resumed run gets its own container. The container ID and
    the image are written to the run's tags (``DOCKER_CONTAINER_ID_TAG`` and
    ``DOCKER_IMAGE_TAG``) so the container can be found again when the run is
    terminated or its worker health is checked.
    """

    def __init__(
        self,
        inst_data=None,
        image=None,
        registry=None,
        env_vars=None,
        network=None,
        networks=None,
        container_kwargs=None,
    ):
        """Store the launcher configuration.

        Args:
            inst_data: Value returned unchanged by the ``inst_data`` property.
            image: Fallback image, used when the run's repository origin does
                not name a container image.
            registry: Registry settings, stored as given. Presumably a mapping
                with ``url``, ``username`` and ``password`` keys (the keys
                ``_get_client`` reads from the container context) -- confirm
                against ``DOCKER_CONFIG_SCHEMA``.
            env_vars: Environment variable specifications, stored as given.
            network: A single network name. When truthy it wins over
                ``networks`` and is stored as a one-element list.
            networks: A list of network names, used when ``network`` is falsy.
            container_kwargs: Optional dict with string keys; defaults to an
                empty dict.
        """
        self._inst_data = inst_data
        self.image = image
        self.registry = registry
        self.env_vars = env_vars

        validate_docker_config(network, networks, container_kwargs)

        # Normalize the two network options into a single list attribute.
        if network:
            self.networks = [network]
        elif networks:
            self.networks = networks
        else:
            self.networks = []

        self.container_kwargs = check.opt_dict_param(
            container_kwargs, "container_kwargs", key_type=str
        )

        super().__init__()

    @property
    def inst_data(self):
        """The ``inst_data`` value this launcher was constructed with."""
        return self._inst_data

    @classmethod
    def config_type(cls):
        """Return the config schema for this launcher (``DOCKER_CONFIG_SCHEMA``)."""
        return DOCKER_CONFIG_SCHEMA

    @staticmethod
    def from_config_value(inst_data, config_value):
        """Build a launcher, passing ``config_value`` entries as keyword arguments."""
        return DockerRunLauncher(inst_data=inst_data, **config_value)

    def get_container_context(self, pipeline_run: DagsterRun) -> DockerContainerContext:
        """Return the container context built from ``pipeline_run`` and this launcher."""
        return DockerContainerContext.create_for_run(pipeline_run, self)

    def _get_client(self, container_context: DockerContainerContext):
        """Return a Docker client created from the environment.

        When the container context carries registry settings, the client is
        logged in to that registry before being returned.
        """
        client = docker.client.from_env()
        registry = container_context.registry
        if registry:
            client.login(
                registry=registry["url"],
                username=registry["username"],
                password=registry["password"],
            )
        return client

    def _get_docker_image(self, pipeline_code_origin):
        """Pick the image to run: the repository origin's image, else ``self.image``.

        Raises:
            Exception: If neither source provides an image.
        """
        docker_image = pipeline_code_origin.repository_origin.container_image or self.image
        if not docker_image:
            raise Exception("No docker image specified by the instance config or repository")
        validate_docker_image(docker_image)
        return docker_image

    def _create_container(self, client, container_context, docker_image, command, docker_env):
        """Create (but do not start) the container that will execute ``command``."""
        networks = container_context.networks
        return client.containers.create(
            image=docker_image,
            command=command,
            detach=True,
            environment=docker_env,
            # Only the first configured network is attached here; the caller
            # connects any remaining ones after creation.
            network=networks[0] if networks else None,
            **container_context.container_kwargs,
        )

    def _launch_container_with_command(self, run, docker_image, command):
        """Create, tag and start a container running ``command`` for ``run``.

        The run is tagged with the container ID and image *before* the
        container is started.
        """
        container_context = self.get_container_context(run)

        docker_env = dict(parse_env_var(env_var) for env_var in container_context.env_vars)
        # Set after the configured variables, so it overrides a same-named entry.
        docker_env["DAGSTER_RUN_JOB_NAME"] = run.job_name

        client = self._get_client(container_context)

        try:
            container = self._create_container(
                client, container_context, docker_image, command, docker_env
            )
        except docker.errors.ImageNotFound:
            # Image is not available to the daemon yet: pull it and retry once.
            client.images.pull(docker_image)
            container = self._create_container(
                client, container_context, docker_image, command, docker_env
            )

        # Connect every network after the first one (empty slice when there
        # are zero or one networks).
        for network_name in container_context.networks[1:]:
            client.networks.get(network_name).connect(container)

        self._instance.report_engine_event(
            message=f"Launching run in a new container {container.id} with image {docker_image}",
            pipeline_run=run,
            cls=self.__class__,
        )

        self._instance.add_run_tags(
            run.run_id,
            {DOCKER_CONTAINER_ID_TAG: container.id, DOCKER_IMAGE_TAG: docker_image},
        )

        container.start()

    def launch_run(self, context: LaunchRunContext) -> None:
        """Launch the run described by ``context`` in a new container."""
        run = context.pipeline_run
        pipeline_code_origin = check.not_none(context.pipeline_code_origin)
        docker_image = self._get_docker_image(pipeline_code_origin)

        command = ExecuteRunArgs(
            pipeline_origin=pipeline_code_origin,
            pipeline_run_id=run.run_id,
            instance_ref=self._instance.get_ref(),
        ).get_command_args()

        self._launch_container_with_command(run, docker_image, command)

    @property
    def supports_resume_run(self):
        """Always ``True``: this launcher implements ``resume_run``."""
        return True

    def resume_run(self, context: ResumeRunContext) -> None:
        """Resume the run described by ``context`` in a new container."""
        run = context.pipeline_run
        pipeline_code_origin = check.not_none(context.pipeline_code_origin)
        docker_image = self._get_docker_image(pipeline_code_origin)

        command = ResumeRunArgs(
            pipeline_origin=pipeline_code_origin,
            pipeline_run_id=run.run_id,
            instance_ref=self._instance.get_ref(),
        ).get_command_args()

        self._launch_container_with_command(run, docker_image, command)

    def _get_container(self, run):
        """Return the Docker container recorded for ``run``, or ``None``.

        ``None`` is returned when ``run`` is falsy or finished, when the run
        has no container ID tag, or when the lookup raises any exception.
        """
        if not run or run.is_finished:
            return None

        container_id = run.tags.get(DOCKER_CONTAINER_ID_TAG)
        if not container_id:
            return None

        container_context = self.get_container_context(run)

        try:
            return self._get_client(container_context).containers.get(container_id)
        except Exception:
            # Best-effort lookup relied on by terminate() and
            # check_run_worker_health().
            # NOTE(review): this also hides client/login/daemon errors, making
            # them indistinguishable from a missing container.
            return None

    def terminate(self, run_id):
        """Request termination of the container executing run ``run_id``.

        Returns:
            ``True`` if a container was found and asked to stop, ``False``
            otherwise.
        """
        run = self._instance.get_run_by_id(run_id)
        if not run:
            # Unknown run: nothing to stop, and no run to attach an engine
            # event to.
            return False

        container = self._get_container(run)
        if not container:
            self._instance.report_engine_event(
                message="Unable to get docker container to send termination request to.",
                pipeline_run=run,
                cls=self.__class__,
            )
            return False

        self._instance.report_run_canceling(run)
        container.stop()
        return True

    @property
    def supports_check_run_worker_health(self):
        """Always ``True``: this launcher implements ``check_run_worker_health``."""
        return True

    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """Report the status of the container recorded for ``run``.

        Returns ``NOT_FOUND`` when no container could be retrieved, ``RUNNING``
        when the container status is ``"running"``, and ``FAILED`` (with the
        status in the message) for any other status.
        """
        container = self._get_container(run)
        if container is None:
            return CheckRunHealthResult(WorkerStatus.NOT_FOUND)

        if container.status == "running":
            return CheckRunHealthResult(WorkerStatus.RUNNING)

        return CheckRunHealthResult(
            WorkerStatus.FAILED, msg=f"Container status is {container.status}"
        )