Execution#

Dagster provides several different ways to execute jobs.

Relevant APIs#

Name                                Description
JobDefinition.execute_in_process    A method to execute a job synchronously, typically for running scripts or testing.

Overview#

This page explains the ways to do one-off execution of a job: Dagit, the Dagster CLI, or the Python APIs.

You can also launch jobs in other ways:

  • Schedules can be used to launch runs on a fixed interval.
  • Sensors allow you to launch runs based on external state changes.

Executing a Job#

from dagster import job, op


@op
def return_one():
    return 1


@op
def add_two(i: int):
    return i + 2


@op
def multi_three(i: int):
    return i * 3


@job
def my_job():
    multi_three(add_two(return_one()))

Dagit#

Dagster comes with a web-based interface for viewing and interacting with jobs and other Dagster objects.

To view your job in Dagit, you can use the dagit command:

dagit -f my_job.py

Then navigate to http://localhost:3000 to start using Dagit:

pipeline-def

Click on the "Launchpad" tab, then press the "Launch Run" button to execute the job. You will then see Dagit launch a job run:

job-run

By default, Dagit runs the job using the multiprocess executor: each step in the job runs in its own process, and steps that don't depend on each other can run in parallel.

Dagit Launchpad also offers a configuration editor to let you interactively build up the configuration. See details in Dagit.

Dagster CLI#

The dagster CLI includes both dagster job execute for direct execution and dagster job launch for launching runs asynchronously using the run launcher on your instance.

To execute your job directly, you can simply run:

dagster job execute -f my_job.py

Python APIs#

Dagster includes Python APIs for execution that are useful when writing tests or scripts.

JobDefinition.execute_in_process executes a job and returns an ExecuteInProcessResult:

if __name__ == "__main__":
    result = my_job.execute_in_process()

You can find the full API documentation in Execution API and learn more about the testing use cases in Testing.

Executing Job Subsets#

Dagster supports running a subset of the ops in a job. This is called op selection.

Op Selection Syntax#

To specify op selection, Dagster supports a simple query syntax.

It works as follows:

  • A query includes a list of clauses.
  • A clause can be an op name, in which case that op is selected.
  • A clause can be an op name preceded by *, in which case that op and all of its ancestors (upstream dependencies) are selected.
  • A clause can be an op name followed by *, in which case that op and all of its descendants (downstream dependencies) are selected.
  • A clause can be an op name followed by any number of +s, in which case that op and descendants up to that many hops away are selected.
  • A clause can be an op name preceded by any number of +s, in which case that op and ancestors up to that many hops away are selected.

Clause examples

  • some_op: select "some_op" itself.
  • *some_op: select "some_op" and all ancestors (upstream dependencies).
  • some_op*: select "some_op" and all descendants (downstream dependencies).
  • *some_op*: select "some_op" and all of its ancestors and descendants.
  • +some_op: select "some_op" and its direct parents.
  • some_op+++: select "some_op" and its children, its children's children, and its children's children's children.
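
To make the clause rules above concrete, here is a rough pure-Python sketch that resolves clauses against a toy dependency graph for the example job. It is not Dagster's implementation: the UPSTREAM mapping and the functions resolve_clause and resolve_selection are hypothetical names used only for illustration.

```python
import re
from collections import deque

# Toy graph for the example job: op name -> direct upstream ops (assumption).
UPSTREAM = {
    "return_one": [],
    "add_two": ["return_one"],
    "multi_three": ["add_two"],
}

# Optional prefix (* or a run of +), an op name, optional suffix (* or a run of +).
CLAUSE = re.compile(r"^(\*|\+*)([A-Za-z_][A-Za-z0-9_]*)(\*|\+*)$")


def _invert(upstream):
    """Build op name -> direct downstream ops from the upstream mapping."""
    downstream = {name: [] for name in upstream}
    for name, parents in upstream.items():
        for parent in parents:
            downstream[parent].append(name)
    return downstream


def _walk(start, edges, max_hops):
    """Collect ops reachable from start within max_hops (None means no limit)."""
    seen = set()
    queue = deque([(start, 0)])
    while queue:
        node, hops = queue.popleft()
        if max_hops is not None and hops >= max_hops:
            continue
        for nxt in edges[node]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, hops + 1))
    return seen


def resolve_clause(clause, upstream=UPSTREAM):
    """Return the set of op names selected by a single clause."""
    match = CLAUSE.match(clause)
    if match is None or match.group(2) not in upstream:
        raise ValueError(f"invalid clause: {clause!r}")
    prefix, name, suffix = match.groups()
    selected = {name}
    if prefix:  # ancestors: all of them for "*", len(prefix) hops for "+"
        selected |= _walk(name, upstream, None if prefix == "*" else len(prefix))
    if suffix:  # descendants: all of them for "*", len(suffix) hops for "+"
        selected |= _walk(name, _invert(upstream), None if suffix == "*" else len(suffix))
    return selected


def resolve_selection(clauses, upstream=UPSTREAM):
    """A query is a list of clauses; the selection is the union of their results."""
    selected = set()
    for clause in clauses:
        selected |= resolve_clause(clause, upstream)
    return selected
```

With this toy graph, resolve_selection(["*add_two"]) selects return_one and add_two, while resolve_clause("+multi_three") selects multi_three and its direct parent add_two.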

Specifying Op Selection#

You can use this selection syntax in the op_selection argument to JobDefinition.execute_in_process:

my_job.execute_in_process(op_selection=["*add_two"])

Similarly, you can specify the same op selection in Dagit Launchpad:

op-selection

Controlling Job Execution#

Each JobDefinition contains an ExecutorDefinition that determines how it will be executed.

This executor_def property can be set to allow for different types of isolation and parallelism, ranging from executing all the ops in the same process to executing each op in its own Kubernetes pod. See Executors for more details.

Default Job Executor#

By default, a job uses an executor that runs each op in its own process (multiprocess execution). This default executor also lets you toggle between in-process and multiprocess execution via config.

Here is an example of run config, written as YAML, that you could provide in the Dagit Launchpad to execute the job in process:

execution:
  config:
    in_process:

Additional config options are available for multiprocess execution that can help with performance. These include limiting the maximum number of concurrent subprocesses and controlling how those subprocesses are spawned.

The example below sets the run config directly on the job to explicitly set the max concurrent subprocesses to 4, and change the subprocess start method to use a forkserver.

@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "start_method": {
                        "forkserver": {},
                    },
                    "max_concurrent": 4,
                },
            }
        }
    }
)
def forkserver_job():
    multi_three(add_two(return_one()))

Using forkserver can reduce per-process overhead during multiprocess execution, but it can cause issues with certain libraries. More details can be found here.
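
Because forkserver is not available on every platform, a script may want to build this run config conditionally. The sketch below uses only the standard library. The helper name multiprocess_run_config is hypothetical, and the fallback assumes that the start_method config also accepts a "spawn" key, which should be verified against the executor's config schema.

```python
import multiprocessing


def multiprocess_run_config(max_concurrent):
    """Hypothetical helper: build run config for multiprocess execution."""
    available = multiprocessing.get_all_start_methods()
    # forkserver exists only on some POSIX platforms; spawn exists everywhere.
    # Assumption: the start_method config accepts "spawn" as well as "forkserver".
    method = "forkserver" if "forkserver" in available else "spawn"
    return {
        "execution": {
            "config": {
                "multiprocess": {
                    "start_method": {method: {}},
                    "max_concurrent": max_concurrent,
                }
            }
        }
    }
```

The returned dictionary has the same shape as the config passed to @job above, so it could be supplied as config=multiprocess_run_config(4).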

Op Concurrency Limits#

In addition to the max_concurrent limit, you can use tag_concurrency_limits to specify limits on the number of ops with certain tags that can be executing at once within a single run.

Limits can be specified for all ops with a certain tag key or key-value pair. If launching an op would exceed any limit, the op stays queued. For asset jobs, Dagster checks the op_tags field on each asset in the job against the tag concurrency limits.
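
The queuing rule described above can be sketched in plain Python. This is only an illustration of the rule, not Dagster's scheduling code. The function can_launch and its arguments are hypothetical; each limit is a dict with "key", an optional "value", and "limit", mirroring the shape of tag_concurrency_limits.

```python
def can_launch(candidate_tags, running_tags, max_concurrent, tag_limits):
    """Return True if an op with candidate_tags may start right now.

    running_tags is a list with one tag dict per op that is currently executing.
    """
    # Overall cap on concurrently executing ops.
    if len(running_tags) >= max_concurrent:
        return False

    for limit in tag_limits:
        key, value = limit["key"], limit.get("value")

        def matches(tags):
            # A limit without "value" applies to every op carrying the key.
            return key in tags and (value is None or tags[key] == value)

        if matches(candidate_tags):
            in_flight = sum(1 for tags in running_tags if matches(tags))
            if in_flight >= limit["limit"]:
                return False  # launching would exceed this limit: stay queued

    return True
```

For instance, with a limit of 2 on "database" equal to "redshift" and two such ops already running, a third redshift op stays queued, while an op with a different tag value can still start as long as fewer than max_concurrent ops are running.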

For example, the following job will execute at most two ops at once that have the "database" tag equal to "redshift", while also ensuring that at most four ops execute at once:

@job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "redshift",
                            "limit": 2,
                        }
                    ],
                },
            }
        }
    }
)
def tag_concurrency_job():
    ...

Note: These limits are only applied on a per-run basis. You can apply op concurrency limits across multiple runs using the celery_executor or celery_k8s_job_executor.