Partitioned assets and jobs#

Partitioning allows a software-defined asset or job to correspond to a set of entities with identical structure but different parameters.

A partitioned asset is an asset that's composed of a set of partitions, which can be materialized and tracked independently. Most commonly, each partition represents all the records in a data set that fall within a particular time window. Depending on where the asset is stored, each partition might correspond to a file or a slice of a table in a database.

A partitioned job is a job where each run corresponds to a partition key. Most commonly, each partition key represents a time window, so, when a job executes, it processes data within one of the time windows.

It's common to construct a partitioned job that materializes a particular set of partitioned assets every time it runs.

Having defined a partitioned job or asset, you can:

  • View runs by partition in Dagit.
  • Define a schedule that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
  • Launch backfills, which are sets of runs that each process a different partition. For example, after making a code change, you might want to run your job on all time windows instead of just one of them.
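The schedule and backfill ideas above can be illustrated without any Dagster APIs. The following is a minimal, stdlib-only sketch; the helper names (`daily_partition_keys`, `partition_for_tick`, `backfill`) are hypothetical and are not part of Dagster. It assumes daily partitions keyed by ISO date strings.

```python
from datetime import date, timedelta
from typing import Callable, Dict, List


def daily_partition_keys(start: date, end: date) -> List[str]:
    """Return one key per day in the half-open range [start, end)."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def partition_for_tick(tick_day: date) -> str:
    """A daily schedule tick fills in the partition for the previous day."""
    return (tick_day - timedelta(days=1)).isoformat()


def backfill(keys: List[str], process: Callable[[str], str]) -> Dict[str, str]:
    """Run `process` once per partition key and collect the results by key."""
    return {key: process(key) for key in keys}


keys = daily_partition_keys(date(2022, 1, 1), date(2022, 1, 4))
results = backfill(keys, lambda key: f"processed {key}")
```

In this sketch, a backfill is a loop over partition keys, while a schedule picks exactly one key per tick.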

Partitioned assets#

Relevant APIs#

Name                         Description
PartitionsDefinition         Superclass - defines the set of partitions that can be materialized for an asset.
HourlyPartitionsDefinition   A partitions definition with a partition for each hour.
DailyPartitionsDefinition    A partitions definition with a partition for each day.
WeeklyPartitionsDefinition   A partitions definition with a partition for each week.
MonthlyPartitionsDefinition  A partitions definition with a partition for each month.
StaticPartitionsDefinition   A partitions definition with a fixed set of partitions.
MultiPartitionsDefinition    A partitions definition with multiple dimensions.
MultiPartitionKey            A multi-dimensional partition key.

A software-defined asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.

Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in Dagit.

Defining partitioned assets#

For example, below is a software-defined asset with a partition for each day since the first day of 2022. Materializing partition 2022-07-23 of this asset would fetch data from the URL https://coolweatherwebsite.com/weather_obs?date=2022-07-23 and store it at the path weather_observations/2022-07-23.csv.

import os
import urllib.request

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> None:
    # The partition key of a daily partition is a date string such as "2022-07-23".
    partition_date_str = context.asset_partition_key_for_output()

    url = f"https://coolweatherwebsite.com/weather_obs?date={partition_date_str}"
    target_location = f"weather_observations/{partition_date_str}.csv"

    # urlretrieve does not create missing directories, so create the target directory first.
    os.makedirs("weather_observations", exist_ok=True)
    urllib.request.urlretrieve(url, target_location)

Partitioned assets with partitioned I/O managers#

In the above code snippet, the body of the decorated function writes out data to a file, but it's common to delegate this I/O to an I/O manager. Dagster's built-in I/O managers know how to handle partitioned assets. You can also handle them when writing your own I/O manager, following the instructions here.

Here's a software-defined asset that relies on an I/O manager to store its output:

import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    # Without a scheme, pandas would treat the string as a local file path.
    return pd.read_csv(
        f"https://coolweatherwebsite.com/weather_obs?date={partition_date_str}"
    )

If you're using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.
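To show how a partition key can map to a storage location, here is a minimal stdlib-only sketch of a file-per-partition store. The class `PartitionedPickleStore` is hypothetical and is not Dagster's I/O manager; it only mirrors the `asset_name/partition_key` path layout described above, under the assumption that each partition is pickled to its own file.

```python
import pickle
import tempfile
from pathlib import Path


class PartitionedPickleStore:
    """Hypothetical store that keeps one pickle file per (asset, partition key)."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def path_for(self, asset_name, partition_key):
        # Layout: <base_dir>/<asset_name>/<partition_key>
        return self.base_dir / asset_name / partition_key

    def save(self, asset_name, partition_key, obj):
        path = self.path_for(asset_name, partition_key)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("wb") as f:
            pickle.dump(obj, f)
        return path

    def load(self, asset_name, partition_key):
        with self.path_for(asset_name, partition_key).open("rb") as f:
            return pickle.load(f)


store = PartitionedPickleStore(tempfile.mkdtemp())
saved_path = store.save("my_daily_partitioned_asset", "2022-07-23", {"temp_c": 21.5})
loaded = store.load("my_daily_partitioned_asset", "2022-07-23")
```

Because each partition lives in its own file, writing one partition never touches the data of another.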

Multi-dimensionally partitioned assets#

This feature is experimental.

The MultiPartitionsDefinition class accepts a mapping of dimension name to partitions definition, creating a partition for each unique combination of dimension partitions. For example, the asset below would contain a partition for each combination of color and date: red|2022-01-01, yellow|2022-01-01, blue|2022-01-01, red|2022-01-02 and so on.

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
        }
    )
)
def multi_partitions_asset(context):
    context.log.info(context.partition_key.keys_by_dimension)

Currently, Dagster only allows two-dimensional multipartitions definitions.

Notice that the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, which expose a keys_by_dimension property that maps each dimension name to its key. A MultiPartitionKey can also be passed as the partition key in execution parameters:

from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)
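To make the "one partition per combination" rule concrete, the following stdlib-only sketch enumerates the combined keys for two days and three colors. It is an illustration only: the `color|date` string format is taken from the example keys listed earlier in this section, and the enumeration order shown here is an assumption rather than a statement about how Dagster orders partitions.

```python
from datetime import date, timedelta
from itertools import product

colors = ["red", "yellow", "blue"]
dates = [(date(2022, 1, 1) + timedelta(days=i)).isoformat() for i in range(2)]

# product() varies its last argument fastest, so dates form the outer loop
# and colors the inner loop.
multi_keys = [f"{color}|{day}" for day, color in product(dates, colors)]
```

With two dates and three colors, the asset has six partitions in total.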

Viewing asset partition status#

To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.

Before any partitions have been materialized, the partitions bar is entirely gray.

Materializing partitioned assets#

When you materialize a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition.

Note: If you choose more than one partition, the Dagster daemon needs to be running to queue the multiple runs.

After you've materialized a partition, it will show up as green in the partitions bar.

To view materializations by partition, navigate to the Activity tab.

Partition dependencies#

When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.

A few rules govern partition-to-partition dependencies:

  • When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset depends on the same partition in the upstream asset.

  • When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window.

    For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00.
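The time-window rule above can be sketched as an interval-intersection check. This is a stdlib-only illustration, not Dagster's implementation: the helpers `intersects` and `upstream_hourly_keys` are hypothetical, windows are treated as half-open intervals, and time zones and daylight saving time are ignored.

```python
from datetime import datetime, timedelta

HOURLY_FMT = "%Y-%m-%d-%H:%M"


def intersects(a_start, a_end, b_start, b_end):
    """Half-open windows [start, end) overlap when each starts before the other ends."""
    return a_start < b_end and b_start < a_end


def upstream_hourly_keys(daily_key, first_hour, hour_count):
    """Return the keys of the hourly windows that intersect the given daily window."""
    day_start = datetime.strptime(daily_key, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)
    keys = []
    for i in range(hour_count):
        hour_start = first_hour + timedelta(hours=i)
        hour_end = hour_start + timedelta(hours=1)
        if intersects(hour_start, hour_end, day_start, day_end):
            keys.append(hour_start.strftime(HOURLY_FMT))
    return keys


# Scan three days of hourly partitions, starting on 2022-04-11.
deps = upstream_hourly_keys("2022-04-12", datetime(2022, 4, 11), 72)
```

Only the hourly windows that fall inside 2022-04-12 pass the check; the hours on the neighboring days touch the daily window's boundary but do not overlap it.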


Partitioned asset jobs#

A partitioned asset job is a job that materializes a particular set of partitioned assets every time it runs.

from dagster import (
    AssetSelection,
    Definitions,
    HourlyPartitionsDefinition,
    asset,
    define_asset_job,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1():
    ...


@asset(partitions_def=hourly_partitions_def)
def asset2():
    ...


partitioned_asset_job = define_asset_job(
    name="asset_1_and_2_job",
    selection=AssetSelection.assets(asset1, asset2),
    partitions_def=hourly_partitions_def,
)


defs = Definitions(
    assets=[asset1, asset2],
    jobs=[partitioned_asset_job],
)

Partitioned non-asset jobs#

Relevant APIs#

Name                                 Description
PartitionedConfig                    Determines a set of partitions and how to generate run config for a partition.
@daily_partitioned_config            Decorator for constructing partitioned config where each partition is a date.
@hourly_partitioned_config           Decorator for constructing partitioned config where each partition is an hour of a date.
@weekly_partitioned_config           Decorator for constructing partitioned config where each partition is a week.
@monthly_partitioned_config          Decorator for constructing partitioned config where each partition is a month.
@static_partitioned_config           Decorator for constructing partitioned config for a static set of partition keys.
@dynamic_partitioned_config          Decorator for constructing partitioned config for a set of partition keys that can grow over time.
build_schedule_from_partitioned_job  A function that constructs a schedule whose interval matches the partitioning of a partitioned job.

When defining a job that doesn't use software-defined assets, you can make it partitioned by supplying a PartitionedConfig object as its config.
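Conceptually, a partitioned config pairs a set of partition keys with a function that turns a key into run config. The sketch below models that idea in plain Python. `SimplePartitionedConfig` is a hypothetical stand-in, not Dagster's PartitionedConfig class, and the op name in the run config is borrowed from the examples later on this page.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class SimplePartitionedConfig:
    """Hypothetical stand-in: a fixed key set plus a key -> run config function."""

    partition_keys: List[str]
    run_config_fn: Callable[[str], Dict]

    def get_run_config_for_partition_key(self, key: str) -> Dict:
        # Reject keys outside the defined partition set.
        if key not in self.partition_keys:
            raise KeyError(f"unknown partition key: {key}")
        return self.run_config_fn(key)


config = SimplePartitionedConfig(
    partition_keys=["2020-01-01", "2020-01-02"],
    run_config_fn=lambda key: {
        "ops": {"process_data_for_date": {"config": {"date": key}}}
    },
)
run_config = config.get_run_config_for_partition_key("2020-01-02")
```

Because the key set is explicit, a tool can enumerate every partition for a backfill, which an arbitrary config value does not allow.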

Defining a job with time partitions#

The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.

Non-partitioned job with date config#

Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:

from dagster import job, op


@op(config_schema={"date": str})
def process_data_for_date(context):
    date = context.op_config["date"]
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()

It takes a date string as config, which determines the date to compute data for. For example, to compute data for May 5th, 2020, you would execute the job with the following config:

ops:
  process_data_for_date:
    config:
      date: "2020-05-05"

Date-partitioned job#

With the job above, it's possible to supply any value for the date param, which means that, if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates.

First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines the full set of partitions (every date between the start date and the current date) as well as how to determine the run config for a given partition.

from dagster import daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }

Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job:

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    process_data_for_date()

Dagit partitions tab#

In Dagit, you can view runs by partition in the Partitions tab of a Job page:

[Screenshot: Partitions Tab]

In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.

You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.

In the screenshot below, we select the 2020-01-02 partition, and we can see that the run config for the partition has been populated in the editor.

[Screenshot: Partitions in Dagit Launchpad]

In addition to the @daily_partitioned_config decorator, Dagster also provides @monthly_partitioned_config, @weekly_partitioned_config, and @hourly_partitioned_config. See the API docs for each of these decorators for more information on how partitions are built based on different start_date, minute_offset, hour_offset, and day_offset inputs.
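As a rough illustration of how an offset shifts a daily window, here is a stdlib-only sketch. The helper `daily_window` is hypothetical and not a Dagster API. It reproduces the 00:15 to 00:15 window that the minute_offset=15 example in the Testing section expects, under the assumption that offsets are simply added to midnight of the partition's date.

```python
from datetime import datetime, timedelta


def daily_window(partition_key, hour_offset=0, minute_offset=0):
    """Return the (start, end) datetimes of a daily partition with offsets applied."""
    midnight = datetime.strptime(partition_key, "%Y-%m-%d")
    start = midnight + timedelta(hours=hour_offset, minutes=minute_offset)
    return start, start + timedelta(days=1)


start, end = daily_window("2020-01-01", minute_offset=15)
formatted = (start.strftime("%Y-%m-%d-%H:%M"), end.strftime("%Y-%m-%d-%H:%M"))
```

The partition key stays "2020-01-01" in this sketch; only the window boundaries move.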

Defining a job with static partitions#

Not all jobs are partitioned by time. Here's a partitioned job where the partitions are continents:

from dagster import job, op, static_partitioned_config

CONTINENTS = [
    "Africa",
    "Antarctica",
    "Asia",
    "Europe",
    "North America",
    "Oceania",
    "South America",
]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
    return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}


@op(config_schema={"continent_name": str})
def continent_op(context):
    context.log.info(context.op_config["continent_name"])


@job(config=continent_config)
def continent_job():
    continent_op()

Creating schedules from partitioned jobs#

It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.

The build_schedule_from_partitioned_job function allows you to construct a schedule from a date-partitioned job. It creates a schedule with an interval that matches the spacing of your partitions. If you wanted to create a schedule for the do_stuff_partitioned job defined above, you could write:

from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)

Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:

from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
        yield request

Or, to define a schedule that runs only a subset of the partitions:

@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
    request = continent_job.run_request_for_partition(
        partition_key="Antarctica", run_key=None
    )
    yield request

Refer to the Schedules documentation for more info about constructing both schedule types.


Testing#

Testing partitioned config#

Invoking a PartitionedConfig object will directly invoke the decorated function.

If you want to check whether the generated run config is valid for the config of a job, you can use the validate_run_config function.

from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)

If you want to test that your PartitionedConfig creates the partitions you expect, you can use its get_partition_keys and get_run_config_for_partition_key methods.

@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


@op(config_schema={"start": str, "end": str})
def process_data(context):
    s = context.op_config["start"]
    e = context.op_config["end"]
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }

Testing partitioned jobs#

To run a partitioned job in-process on a particular partition, you can supply a value for the partition_key argument of JobDefinition.execute_in_process:

def test_do_stuff_partitioned():
    assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success

See it in action#

For more examples of partitions, check out the following in our Hacker News example: