Source code for dagster._core.execution.validate_run_config
from typing import Any, Mapping, Optional, cast
import dagster._check as check
from dagster._core.definitions import JobDefinition, PipelineDefinition
from dagster._core.system_config.objects import ResolvedRunConfig
def validate_run_config(
    job_def: Optional[JobDefinition] = None,
    run_config: Optional[Mapping[str, Any]] = None,
    mode: Optional[str] = None,
    pipeline_def: Optional[PipelineDefinition] = None,
) -> Mapping[str, Any]:
    """Validate a run config blob against a job (or, for legacy APIs, a pipeline/mode).

    Exactly one of ``job_def`` and ``pipeline_def`` must be supplied. When validation
    succeeds, the validated config actually used during execution is returned in
    dictionary form.

    Args:
        job_def (Optional[Union[JobDefinition, PipelineDefinition]]): The definition to
            validate the run config against. Mutually exclusive with ``pipeline_def``.
        run_config (Optional[Mapping[str, Any]]): The run config to validate. Keys must
            be strings.
        mode (Optional[str]): The mode of the pipeline to validate against (different
            modes may require different config). When omitted, the default mode name
            reported by the selected definition is used.
        pipeline_def (Optional[PipelineDefinition]): The pipeline definition to validate
            the run config against. Mutually exclusive with ``job_def``.

    Returns:
        Mapping[str, Any]: A dictionary representation of the validated config.
    """
    # Parameter checks are kept in this order so the first failure reported is stable.
    job_def = check.opt_inst_param(job_def, "job_def", (JobDefinition, PipelineDefinition))
    pipeline_def = check.opt_inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
    run_config = check.opt_mapping_param(run_config, "run_config", key_type=str)

    # The two definition arguments are alternatives; supplying both is ambiguous.
    if job_def and pipeline_def:
        check.failed("Cannot specify both a job_def and a pipeline_def")

    target_def = pipeline_def or job_def
    if target_def is None:
        check.failed("Must specify at least one of job_def and pipeline_def")

    # ``cast`` only informs the type checker that ``target_def`` is non-None from here on.
    target_def = cast(PipelineDefinition, target_def)

    fallback_mode = target_def.get_default_mode_name()
    mode = check.opt_str_param(mode, "mode", default=fallback_mode)

    resolved = ResolvedRunConfig.build(target_def, run_config, mode=mode)
    return resolved.to_dict()