# Source code for dagster._core.definitions.asset_reconciliation_sensor

# pylint: disable=anomalous-backslash-in-string

import datetime
import itertools
import json
from collections import defaultdict
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Dict,
    Iterable,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    cast,
)

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessConstraint
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

from .asset_selection import AssetGraph, AssetSelection
from .decorators.sensor_decorator import sensor
from .partition import PartitionsDefinition, PartitionsSubset
from .repository_definition import RepositoryDefinition
from .run_request import RunRequest
from .sensor_definition import DefaultSensorStatus, SensorDefinition
from .utils import check_valid_name

if TYPE_CHECKING:
    from dagster._core.instance import DagsterInstance


class AssetReconciliationCursor(NamedTuple):
    """Persistent state tracked by the asset reconciliation sensor between ticks.

    Attributes:
        latest_storage_id: The latest observed storage ID across all assets. Useful for
            finding out what has happened since the last tick.
        materialized_or_requested_root_asset_keys: Every entry is a non-partitioned asset with no
            parents that has been requested by this sensor or has been materialized (even if not by
            this sensor).
        materialized_or_requested_root_partitions_by_asset_key: Every key is a partitioned root
            asset. Every value is the set of that asset's partitions that have been requested by
            this sensor or have been materialized (even if not by this sensor).
    """

    latest_storage_id: Optional[int]
    materialized_or_requested_root_asset_keys: AbstractSet[AssetKey]
    materialized_or_requested_root_partitions_by_asset_key: Mapping[AssetKey, PartitionsSubset]

    def was_previously_materialized_or_requested(self, asset_key: AssetKey) -> bool:
        """Returns True if this non-partitioned root asset has already been materialized or
        requested by a previous tick."""
        return asset_key in self.materialized_or_requested_root_asset_keys

    def get_never_requested_never_materialized_partitions(
        self, asset_key: AssetKey, asset_graph: AssetGraph
    ) -> Iterable[str]:
        """Returns the partition keys of the given partitioned root asset that have never been
        materialized or requested."""
        # fall back to an empty subset when we have no record of this asset yet
        return self.materialized_or_requested_root_partitions_by_asset_key.get(
            asset_key, asset_graph.get_partitions_def(asset_key).empty_subset()
        ).get_partition_keys_not_in_subset()

    def with_updates(
        self,
        latest_storage_id: Optional[int],
        run_requests: Sequence[RunRequest],
        newly_materialized_root_asset_keys: AbstractSet[AssetKey],
        newly_materialized_root_partitions_by_asset_key: Mapping[AssetKey, AbstractSet[str]],
        asset_graph: AssetGraph,
    ) -> "AssetReconciliationCursor":
        """
        Returns a cursor that represents this cursor plus the updates that have happened within the
        tick.
        """
        # collect the root asset (partition)s requested by this tick's run requests
        requested_root_partitions_by_asset_key: Dict[AssetKey, Set[str]] = defaultdict(set)
        requested_non_partitioned_root_assets: Set[AssetKey] = set()

        for run_request in run_requests:
            for asset_key in cast(Iterable[AssetKey], run_request.asset_selection):
                # only roots (assets with no non-source parents) are tracked on the cursor
                if not asset_graph.has_non_source_parents(asset_key):
                    if run_request.partition_key:
                        requested_root_partitions_by_asset_key[asset_key].add(
                            run_request.partition_key
                        )
                    else:
                        requested_non_partitioned_root_assets.add(asset_key)

        result_materialized_or_requested_root_partitions_by_asset_key = {
            **self.materialized_or_requested_root_partitions_by_asset_key
        }
        # merge newly-materialized and newly-requested partitions into the stored subsets
        for asset_key in set(newly_materialized_root_partitions_by_asset_key.keys()) | set(
            requested_root_partitions_by_asset_key.keys()
        ):
            prior_materialized_partitions = (
                self.materialized_or_requested_root_partitions_by_asset_key.get(asset_key)
            )
            if prior_materialized_partitions is None:
                prior_materialized_partitions = cast(
                    PartitionsDefinition, asset_graph.get_partitions_def(asset_key)
                ).empty_subset()

            result_materialized_or_requested_root_partitions_by_asset_key[
                asset_key
            ] = prior_materialized_partitions.with_partition_keys(
                itertools.chain(
                    newly_materialized_root_partitions_by_asset_key[asset_key],
                    requested_root_partitions_by_asset_key[asset_key],
                )
            )

        result_materialized_or_requested_root_asset_keys = (
            self.materialized_or_requested_root_asset_keys
            | newly_materialized_root_asset_keys
            | requested_non_partitioned_root_assets
        )

        # storage IDs are monotonically increasing, so a regression indicates a bug upstream
        if latest_storage_id and self.latest_storage_id:
            check.invariant(
                latest_storage_id >= self.latest_storage_id,
                "Latest storage ID should be >= previous latest storage ID",
            )

        return AssetReconciliationCursor(
            latest_storage_id=latest_storage_id or self.latest_storage_id,
            materialized_or_requested_root_asset_keys=result_materialized_or_requested_root_asset_keys,
            materialized_or_requested_root_partitions_by_asset_key=result_materialized_or_requested_root_partitions_by_asset_key,
        )

    @classmethod
    def empty(cls) -> "AssetReconciliationCursor":
        """Returns the cursor to use on the very first tick, before any state has been observed."""
        return AssetReconciliationCursor(
            latest_storage_id=None,
            materialized_or_requested_root_partitions_by_asset_key={},
            materialized_or_requested_root_asset_keys=set(),
        )

    @classmethod
    def from_serialized(cls, cursor: str, asset_graph: AssetGraph) -> "AssetReconciliationCursor":
        """Deserializes a cursor produced by `serialize`. Requires the asset graph in order to
        reconstruct each asset's PartitionsSubset from its serialized form."""
        (
            latest_storage_id,
            serialized_materialized_or_requested_root_asset_keys,
            serialized_materialized_or_requested_root_partitions_by_asset_key,
        ) = json.loads(cursor)
        materialized_or_requested_root_partitions_by_asset_key = {}
        for (
            key_str,
            serialized_subset,
        ) in serialized_materialized_or_requested_root_partitions_by_asset_key.items():
            key = AssetKey.from_user_string(key_str)
            materialized_or_requested_root_partitions_by_asset_key[key] = cast(
                PartitionsDefinition, asset_graph.get_partitions_def(key)
            ).deserialize_subset(serialized_subset)
        return cls(
            latest_storage_id=latest_storage_id,
            materialized_or_requested_root_asset_keys={
                AssetKey.from_user_string(key_str)
                for key_str in serialized_materialized_or_requested_root_asset_keys
            },
            materialized_or_requested_root_partitions_by_asset_key=materialized_or_requested_root_partitions_by_asset_key,
        )

    def serialize(self) -> str:
        """Serializes this cursor to a JSON string suitable for storage on the sensor context."""
        serializable_materialized_or_requested_root_partitions_by_asset_key = {
            key.to_user_string(): subset.serialize()
            for key, subset in self.materialized_or_requested_root_partitions_by_asset_key.items()
        }
        serialized = json.dumps(
            (
                self.latest_storage_id,
                [key.to_user_string() for key in self.materialized_or_requested_root_asset_keys],
                serializable_materialized_or_requested_root_partitions_by_asset_key,
            )
        )
        return serialized


def find_parent_materialized_asset_partitions(
    instance_queryer: CachingInstanceQueryer,
    latest_storage_id: Optional[int],
    target_asset_selection: AssetSelection,
    asset_graph: AssetGraph,
) -> Tuple[AbstractSet[AssetKeyPartitionKey], Optional[int]]:
    """
    Finds asset partitions in the given selection whose parents have been materialized since
    latest_storage_id.

    Returns:
        - A set of asset partitions.
        - The latest observed storage_id across all relevant assets. Can be used to avoid scanning
            the same events the next time this function is called.
    """
    stale_children: Set[AssetKeyPartitionKey] = set()
    max_storage_id = latest_storage_id

    target_keys = target_asset_selection.resolve(asset_graph)

    # scan new materializations of the targets and their direct parents
    for parent_key in target_asset_selection.upstream(depth=1).resolve(asset_graph):
        for record in instance_queryer.get_materialization_records(
            asset_key=parent_key, after_cursor=latest_storage_id
        ):
            for child in asset_graph.get_children_partitions(parent_key, record.partition_key):
                # a child that was materialized in the same run as its parent is already
                # up to date with respect to this event
                in_same_run = instance_queryer.is_asset_in_run(record.run_id, child)
                if child.asset_key in target_keys and not in_same_run:
                    stale_children.add(child)

            if max_storage_id is None or record.storage_id > max_storage_id:
                max_storage_id = record.storage_id

    return (stale_children, max_storage_id)


def find_never_materialized_or_requested_root_asset_partitions(
    instance_queryer: CachingInstanceQueryer,
    cursor: AssetReconciliationCursor,
    target_asset_selection: AssetSelection,
    asset_graph: AssetGraph,
) -> Tuple[
    Iterable[AssetKeyPartitionKey], AbstractSet[AssetKey], Mapping[AssetKey, AbstractSet[str]]
]:
    """Finds asset partitions that have never been materialized or requested and that have no
    parents.

    Returns:
    - Asset (partition)s that have never been materialized or requested.
    - Non-partitioned assets that had never been materialized or requested up to the previous cursor
        but are now materialized.
    - Asset (partition)s that had never been materialized or requested up to the previous cursor but
        are now materialized.
    """
    never_handled: Set[AssetKeyPartitionKey] = set()
    newly_materialized_keys: Set[AssetKey] = set()
    newly_materialized_partitions: Dict[AssetKey, Set[str]] = defaultdict(set)

    # restrict the scan to the root (source-level) assets within the target selection
    root_keys = (target_asset_selection & AssetSelection.all().sources()).resolve(asset_graph)
    for root_key in root_keys:
        if not asset_graph.is_partitioned(root_key):
            # non-partitioned roots: a single yes/no question per asset
            if cursor.was_previously_materialized_or_requested(root_key):
                continue
            candidate = AssetKeyPartitionKey(root_key)
            if instance_queryer.get_latest_materialization_record(candidate, None):
                newly_materialized_keys.add(root_key)
            else:
                never_handled.add(candidate)
        else:
            # partitioned roots: check each partition the cursor hasn't seen yet
            for partition_key in cursor.get_never_requested_never_materialized_partitions(
                root_key, asset_graph
            ):
                candidate = AssetKeyPartitionKey(root_key, partition_key)
                if instance_queryer.get_latest_materialization_record(candidate, None):
                    newly_materialized_partitions[root_key].add(partition_key)
                else:
                    never_handled.add(candidate)

    return (
        never_handled,
        newly_materialized_keys,
        newly_materialized_partitions,
    )


def determine_asset_partitions_to_reconcile(
    instance_queryer: CachingInstanceQueryer,
    cursor: AssetReconciliationCursor,
    target_asset_selection: AssetSelection,
    asset_graph: AssetGraph,
    eventual_asset_partitions_to_reconcile_for_freshness: AbstractSet[AssetKeyPartitionKey],
) -> Tuple[
    AbstractSet[AssetKeyPartitionKey],
    AbstractSet[AssetKey],
    Mapping[AssetKey, AbstractSet[str]],
    Optional[int],
]:
    """Finds the set of asset partitions that should be materialized this tick to bring them in
    line with their parents.

    Returns:
        - Asset partitions to reconcile on this tick.
        - Root asset keys newly materialized outside of this sensor (cursor bookkeeping).
        - Root asset partitions, by key, newly materialized outside of this sensor (cursor
            bookkeeping).
        - The latest observed storage ID, to be stored on the cursor.
    """
    (
        never_materialized_or_requested_roots,
        newly_materialized_root_asset_keys,
        newly_materialized_root_partitions_by_asset_key,
    ) = find_never_materialized_or_requested_root_asset_partitions(
        instance_queryer=instance_queryer,
        cursor=cursor,
        target_asset_selection=target_asset_selection,
        asset_graph=asset_graph,
    )

    # asset partitions whose parents were materialized since the cursor's storage ID
    stale_candidates, latest_storage_id = find_parent_materialized_asset_partitions(
        instance_queryer=instance_queryer,
        latest_storage_id=cursor.latest_storage_id,
        target_asset_selection=target_asset_selection,
        asset_graph=asset_graph,
    )

    target_asset_keys = target_asset_selection.resolve(asset_graph)

    def parents_will_be_reconciled(
        candidate: AssetKeyPartitionKey,
        to_reconcile: AbstractSet[AssetKeyPartitionKey],
    ) -> bool:
        """Returns True if every parent of the candidate is either already reconciled or will be
        reconciled (in the same run) by this tick."""
        return all(
            (
                (
                    parent in to_reconcile
                    # if they don't have the same partitioning, then we can't launch a run that
                    # targets both, so we need to wait until the parent is reconciled before
                    # launching a run for the child
                    and asset_graph.have_same_partitioning(parent.asset_key, candidate.asset_key)
                    and parent.partition_key == candidate.partition_key
                )
                or (instance_queryer.is_reconciled(asset_partition=parent, asset_graph=asset_graph))
            )
            for parent in asset_graph.get_parents_partitions(
                candidate.asset_key, candidate.partition_key
            )
        )

    def should_reconcile(
        candidates_unit: Iterable[AssetKeyPartitionKey],
        to_reconcile: AbstractSet[AssetKeyPartitionKey],
    ) -> bool:
        """BFS filter: decides whether a unit of candidates (assets that must execute together)
        should be included in this tick's reconciliation set."""
        # skip anything already scheduled for freshness reasons or outside the target selection
        if any(
            candidate in eventual_asset_partitions_to_reconcile_for_freshness
            or candidate.asset_key not in target_asset_keys
            for candidate in candidates_unit
        ):
            return False

        # only launch when all parents will be up to date and at least one member of the unit
        # is actually unreconciled
        return all(
            parents_will_be_reconciled(candidate, to_reconcile) for candidate in candidates_unit
        ) and any(
            not instance_queryer.is_reconciled(asset_partition=candidate, asset_graph=asset_graph)
            for candidate in candidates_unit
        )

    # walk downstream from the roots and stale candidates, accumulating everything reconcilable
    to_reconcile = asset_graph.bfs_filter_asset_partitions(
        should_reconcile,
        set(itertools.chain(never_materialized_or_requested_roots, stale_candidates)),
    )

    return (
        to_reconcile,
        newly_materialized_root_asset_keys,
        newly_materialized_root_partitions_by_asset_key,
        latest_storage_id,
    )


def get_freshness_constraints_by_key(
    instance_queryer: CachingInstanceQueryer,
    asset_graph: AssetGraph,
    plan_window_start: datetime.datetime,
    plan_window_end: datetime.datetime,
) -> Mapping[AssetKey, AbstractSet[FreshnessConstraint]]:
    """Collects, for every asset, the set of constraints that must be satisfied about the data
    times of its upstream assets within the given plan window."""
    constraints_by_key: Dict[AssetKey, AbstractSet[FreshnessConstraint]] = defaultdict(set)

    # gather the unsolved constraints of every asset that declares a FreshnessPolicy
    found_any_policy = False
    for key, policy in asset_graph.freshness_policies_by_key.items():
        if policy is None:
            continue
        found_any_policy = True
        upstream_keys = asset_graph.get_non_source_roots(key)
        latest_record = instance_queryer.get_latest_materialization_record(key)
        if latest_record is None:
            # never materialized: no upstream data has been incorporated yet
            used_data_times = {upstream_key: None for upstream_key in upstream_keys}
        else:
            used_data_times = instance_queryer.get_used_data_times_for_record(
                asset_graph=asset_graph, record=latest_record, upstream_keys=upstream_keys
            )
        constraints_by_key[key] = policy.constraints_for_time_window(
            window_start=plan_window_start,
            window_end=plan_window_end,
            used_data_times=used_data_times,
        )

    # no freshness policies anywhere, so don't bother with constraints
    if not found_any_policy:
        return {}

    # propagate constraints upwards through the graph
    #
    # we ignore whether or not the constraint we're propagating corresponds to an asset which
    # is actually upstream of the asset we're operating on, as we'll filter those invalid
    # constraints out in the next step, and it's expensive to calculate if a given asset is
    # upstream of another asset.
    for level in reversed(asset_graph.toposort_asset_keys()):
        for key in level:
            if asset_graph.is_source(key):
                continue
            for parent_key in asset_graph.get_parents(key):
                # every constraint on a child also binds each of its parents
                constraints_by_key[parent_key] |= constraints_by_key[key]
    return constraints_by_key


def get_current_data_times_for_key(
    instance_queryer: CachingInstanceQueryer,
    asset_graph: AssetGraph,
    relevant_upstream_keys: AbstractSet[AssetKey],
    asset_key: AssetKey,
) -> Mapping[AssetKey, Optional[datetime.datetime]]:
    """Returns the data times of the latest materialization of the given asset, with respect to
    the upstream keys that participate in some constraint."""
    latest_record = instance_queryer.get_latest_materialization_record(asset_key)
    if latest_record is not None:
        return instance_queryer.get_used_data_times_for_record(
            asset_graph=asset_graph,
            upstream_keys=relevant_upstream_keys,
            record=latest_record,
        )
    # never materialized: no upstream data has been incorporated
    return {upstream_key: None for upstream_key in relevant_upstream_keys}


def get_expected_data_times_for_key(
    asset_graph: "AssetGraph",
    current_time: "datetime.datetime",
    expected_data_times_by_key: "Mapping[AssetKey, Mapping[AssetKey, Optional[datetime.datetime]]]",
    asset_key: "AssetKey",
) -> "Mapping[AssetKey, Optional[datetime.datetime]]":
    """Returns the data times for the given asset key if this asset were to be executed in this
    tick.

    Args:
        asset_graph: The graph used to look up the asset's parents.
        current_time: The time at which this asset would be materialized.
        expected_data_times_by_key: The already-computed expected data times of each parent.
        asset_key: The asset being evaluated.

    Returns:
        A mapping from each transitively-upstream key to the oldest data time that would be
        incorporated, or None if some parent has no data for that key.
    """
    # values may be None: an upstream with no available data "poisons" the minimum below
    # (fixed: this annotation previously claimed values were always datetimes)
    expected_data_times: Dict[AssetKey, Optional[datetime.datetime]] = {asset_key: current_time}

    def _min_or_none(a, b):
        # None means "no data available", and propagates through the minimum
        if a is None or b is None:
            return None
        return min(a, b)

    # get the expected data time for each upstream asset key if you were to run this asset on
    # this tick
    for upstream_key in asset_graph.get_parents(asset_key):
        for upstream_upstream_key, expected_data_time in expected_data_times_by_key[
            upstream_key
        ].items():
            # take the minimum data time from each of your parents that uses this key
            expected_data_times[upstream_upstream_key] = _min_or_none(
                expected_data_times.get(upstream_upstream_key, expected_data_time),
                expected_data_time,
            )

    return expected_data_times


def get_execution_time_window_for_constraints(
    constraints: AbstractSet[FreshnessConstraint],
    current_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
    in_progress_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
    failed_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
    expected_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
    relevant_upstream_keys: AbstractSet[AssetKey],
) -> Tuple[Optional[datetime.datetime], Optional[datetime.datetime]]:
    """Determines a range of times for which you can kick off an execution of this asset to solve
    the most pressing constraint, alongside a maximum number of additional constraints.
    """
    window_start: Optional[datetime.datetime] = None
    window_end: Optional[datetime.datetime] = None
    executable_now = False
    # a missing data time compares as infinitely old
    min_dt = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _meets(data_times, key, required_time):
        # True if the given data times already provide data at least as fresh as required
        return (data_times.get(key) or min_dt) >= required_time

    # handle the most urgent constraints first
    for constraint in sorted(constraints, key=lambda c: c.required_by_time):
        # the set of keys in this constraint that are actually upstream of this asset
        constraint_keys = constraint.asset_keys & relevant_upstream_keys

        # skip constraints satisfied by current data, by in-progress runs, or that a failed
        # run was meant to satisfy
        if all(
            _meets(current_data_times, key, constraint.required_data_time)
            or _meets(in_progress_data_times, key, constraint.required_data_time)
            or _meets(failed_data_times, key, constraint.required_data_time)
            for key in constraint_keys
        ):
            continue

        # if an execution on this tick would supply all required data, this asset is runnable now
        if all(
            _meets(expected_data_times, key, constraint.required_data_time)
            for key in constraint_keys
        ):
            executable_now = True

        if window_end is not None and constraint.required_data_time >= window_end:
            # this constraint cannot be folded into the existing execution window
            break
        window_start = max(
            window_start or constraint.required_data_time, constraint.required_data_time
        )
        window_end = min(window_end or constraint.required_by_time, constraint.required_by_time)

    if not executable_now:
        return None, window_end

    return window_start, window_end


def determine_asset_partitions_to_reconcile_for_freshness(
    instance_queryer: CachingInstanceQueryer,
    asset_graph: AssetGraph,
    target_asset_selection: AssetSelection,
) -> Tuple[AbstractSet[AssetKeyPartitionKey], AbstractSet[AssetKeyPartitionKey]]:
    """Returns a set of AssetKeyPartitionKeys to materialize in order to abide by the given
    FreshnessPolicies, as well as a set of AssetKeyPartitionKeys which will be materialized at
    some point within the plan window.

    Attempts to minimize the total number of asset executions.
    """
    # look within a 12-hour time window to combine future runs together
    current_time = pendulum.now(tz=pendulum.UTC)
    plan_window_start = current_time
    plan_window_end = plan_window_start + datetime.timedelta(hours=12)

    # get a set of constraints that must be satisfied for each key
    constraints_by_key = get_freshness_constraints_by_key(
        instance_queryer, asset_graph, plan_window_start, plan_window_end
    )

    # no constraints, so exit early
    if not constraints_by_key:
        return (set(), set())

    # get the set of asset keys we're allowed to execute
    target_asset_keys = target_asset_selection.resolve(asset_graph)

    # now we have a full set of constraints, we can find solutions for them as we move down
    to_materialize: Set[AssetKeyPartitionKey] = set()
    eventually_materialize: Set[AssetKeyPartitionKey] = set()
    expected_data_times_by_key: Dict[
        AssetKey, Mapping[AssetKey, Optional[datetime.datetime]]
    ] = defaultdict(dict)
    for level in asset_graph.toposort_asset_keys():
        for key in level:
            if asset_graph.is_source(key):
                continue
            # no need to evaluate this key, as it has no constraints
            constraints = constraints_by_key[key]
            if not constraints:
                continue

            constraint_keys = set().union(*(constraint.asset_keys for constraint in constraints))

            # the set of asset keys which are involved in some constraint and are actually upstream
            # of this asset
            relevant_upstream_keys = frozenset(
                set().union(
                    *(
                        expected_data_times_by_key[parent_key].keys()
                        for parent_key in asset_graph.get_parents(key)
                    )
                )
                & constraint_keys
            ) | {key}

            # figure out the current contents of this asset with respect to its constraints
            current_data_times = get_current_data_times_for_key(
                instance_queryer, asset_graph, relevant_upstream_keys, key
            )

            if key not in target_asset_keys:
                # cannot execute this asset, so if something consumes it, it should expect to
                # receive the current contents of the asset
                execution_window_start = None
                # BUGFIX: previously, expected_data_times was left unassigned on this branch.
                # If this key had already been added to to_materialize as a required multi-asset
                # neighbor, the branch below would read an unbound value (or a stale value left
                # over from a previous key in this loop). A non-executable asset's expected
                # contents are simply its current contents.
                expected_data_times = current_data_times
            else:
                # calculate the data times you would expect after all currently-executing runs
                # were to successfully complete
                in_progress_data_times = instance_queryer.get_in_progress_data_times_for_key(
                    asset_graph, key, relevant_upstream_keys, current_time
                )

                # if the latest run for this asset failed, then calculate the data times you would
                # have expected after that failed run completed
                failed_data_times = instance_queryer.get_failed_data_times_for_key(
                    asset_graph, key, relevant_upstream_keys
                )
                # calculate the data times you'd expect for this key if you were to run it
                expected_data_times = get_expected_data_times_for_key(
                    asset_graph, current_time, expected_data_times_by_key, key
                )

                # figure out a time window that you can execute this asset within to solve a maximum
                # number of constraints
                (
                    execution_window_start,
                    execution_window_end,
                ) = get_execution_time_window_for_constraints(
                    constraints=constraints,
                    current_data_times=current_data_times,
                    in_progress_data_times=in_progress_data_times,
                    failed_data_times=failed_data_times,
                    expected_data_times=expected_data_times,
                    relevant_upstream_keys=relevant_upstream_keys,
                )

                # this key will be updated within the plan window
                if execution_window_end is not None and execution_window_end <= plan_window_end:
                    eventually_materialize.add(AssetKeyPartitionKey(key, None))

            # a key may already be in to_materialize by the time we get here if a required
            # neighbor was selected to be updated
            asset_key_partition_key = AssetKeyPartitionKey(key, None)
            if asset_key_partition_key in to_materialize:
                expected_data_times_by_key[key] = expected_data_times
            elif (
                # this key should be updated on this tick, as we are within the allowable window
                execution_window_start is not None
                and execution_window_start <= current_time
            ):
                to_materialize.add(asset_key_partition_key)
                expected_data_times_by_key[key] = expected_data_times
                # all required neighbors will be updated on the same tick
                for required_key in asset_graph.get_required_multi_asset_keys(key):
                    to_materialize.add(AssetKeyPartitionKey(required_key, None))
            else:
                # if downstream assets consume this, they should expect data times equal to the
                # current times for this asset, as it's not going to be updated
                expected_data_times_by_key[key] = current_data_times

    return to_materialize, eventually_materialize


def reconcile(
    repository_def: RepositoryDefinition,
    asset_selection: AssetSelection,
    instance: "DagsterInstance",
    cursor: AssetReconciliationCursor,
    run_tags: Optional[Mapping[str, str]],
):
    """Evaluates a single sensor tick: computes the run requests needed to reconcile the
    selected assets and the cursor to store for the next tick."""
    asset_graph = repository_def.asset_graph
    instance_queryer = CachingInstanceQueryer(instance=instance)

    # first, determine what must run (now or eventually) to satisfy freshness policies
    (
        freshness_asset_partitions,
        eventual_freshness_asset_partitions,
    ) = determine_asset_partitions_to_reconcile_for_freshness(
        instance_queryer=instance_queryer,
        asset_graph=asset_graph,
        target_asset_selection=asset_selection,
    )

    # then, determine what must run to bring assets in line with their parents
    (
        stale_asset_partitions,
        newly_materialized_root_asset_keys,
        newly_materialized_root_partitions_by_asset_key,
        latest_storage_id,
    ) = determine_asset_partitions_to_reconcile(
        instance_queryer=instance_queryer,
        asset_graph=asset_graph,
        cursor=cursor,
        target_asset_selection=asset_selection,
        eventual_asset_partitions_to_reconcile_for_freshness=eventual_freshness_asset_partitions,
    )

    run_requests = build_run_requests(
        stale_asset_partitions | freshness_asset_partitions,
        asset_graph,
        run_tags,
    )

    updated_cursor = cursor.with_updates(
        latest_storage_id=latest_storage_id,
        run_requests=run_requests,
        asset_graph=asset_graph,
        newly_materialized_root_asset_keys=newly_materialized_root_asset_keys,
        newly_materialized_root_partitions_by_asset_key=newly_materialized_root_partitions_by_asset_key,
    )
    return run_requests, updated_cursor


def build_run_requests(
    asset_partitions: Iterable[AssetKeyPartitionKey],
    asset_graph: AssetGraph,
    run_tags: Optional[Mapping[str, str]],
) -> Sequence[RunRequest]:
    """Groups the given asset partitions into run requests, one per unique
    (PartitionsDefinition, partition key) pair, since only assets sharing both can be
    launched in the same run."""
    grouped: Mapping[
        Tuple[Optional[PartitionsDefinition], Optional[str]], Set[AssetKey]
    ] = defaultdict(set)

    for asset_partition in asset_partitions:
        group_key = (
            asset_graph.get_partitions_def(asset_partition.asset_key),
            asset_partition.partition_key,
        )
        grouped[group_key].add(asset_partition.asset_key)

    run_requests = []
    for (_, partition_key), asset_keys in grouped.items():
        # each request carries the sensor-level tags plus the partition tag, if any
        tags = dict(run_tags or {})
        if partition_key is not None:
            tags[PARTITION_NAME_TAG] = partition_key

        run_requests.append(
            RunRequest(
                asset_selection=list(asset_keys),
                tags=tags,
            )
        )

    return run_requests


@experimental
def build_asset_reconciliation_sensor(
    asset_selection: AssetSelection,
    name: str = "asset_reconciliation_sensor",
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    run_tags: Optional[Mapping[str, str]] = None,
) -> SensorDefinition:
    r"""Constructs a sensor that will monitor the provided assets and launch materializations to
    "reconcile" them.

    An asset is considered "unreconciled" if any of:

    - This sensor has never tried to materialize it and it has never been materialized.
    - Any of its parents have been materialized more recently than it has.
    - Any of its parents are unreconciled.
    - It is not currently up to date with respect to its freshness policy.

    The sensor won't try to reconcile any assets before their parents are reconciled. When
    multiple FreshnessPolicies require data from the same upstream assets, this sensor will
    attempt to launch a minimal number of runs of that asset to satisfy all constraints.

    Args:
        asset_selection (AssetSelection): The group of assets you want to keep up-to-date.
        name (str): The name to give the sensor.
        minimum_interval_seconds (Optional[int]): The minimum amount of time that should elapse
            between sensor invocations.
        description (Optional[str]): A description for the sensor.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
        run_tags (Optional[Mapping[str, str]]): Dictionary of tags to pass to the RunRequests
            launched by this sensor.

    Returns:
        SensorDefinition

    Example:
        If you have the following asset graph, with no freshness policies:

        .. code-block:: python

            a       b       c
             \     / \     /
                d       e
                 \     /
                    f

        and create the sensor:

        .. code-block:: python

            build_asset_reconciliation_sensor(
                AssetSelection.assets(d, e, f),
                name="my_reconciliation_sensor",
            )

        You will observe the following behavior:

        * If ``a``, ``b``, and ``c`` are all materialized, then on the next sensor tick, the
          sensor will see that ``d`` and ``e`` can be materialized. Since ``d`` and ``e`` will
          be materialized, ``f`` can also be materialized. The sensor will kick off a run that
          will materialize ``d``, ``e``, and ``f``.
        * If, on the next sensor tick, none of ``a``, ``b``, and ``c`` have been materialized
          again, the sensor will not launch a run.
        * If, before the next sensor tick, just asset ``a`` and ``b`` have been materialized,
          the sensor will launch a run to materialize ``d``, ``e``, and ``f``, because they're
          downstream of ``a`` and ``b``. Even though ``c`` hasn't been materialized, the
          downstream assets can still be updated, because ``c`` is still considered
          "reconciled".

    Example:
        If you have the following asset graph, with the following freshness policies:

        * ``c: FreshnessPolicy(maximum_lag_minutes=120, cron_schedule="0 2 \* \* \*")``, meaning
          that by 2AM, c needs to be materialized with data from a and b that is no more than
          120 minutes old (i.e. all of yesterday's data).

        .. code-block:: python

            a       b
             \     /
                c

        and create the sensor:

        .. code-block:: python

            build_asset_reconciliation_sensor(
                AssetSelection.all(),
                name="my_reconciliation_sensor",
            )

        Assume that ``c`` currently has incorporated all source data up to ``2022-01-01 23:00``.
        You will observe the following behavior:

        * At any time between ``2022-01-02 00:00`` and ``2022-01-02 02:00``, the sensor will see
          that ``c`` will soon require data from ``2022-01-02 00:00``. In order to satisfy this
          requirement, there must be a materialization for both ``a`` and ``b`` with time >=
          ``2022-01-02 00:00``. If such a materialization does not exist for one of those
          assets, the missing asset(s) will be executed on this tick, to help satisfy the
          constraint imposed by ``c``. Materializing ``c`` in the same run as those assets will
          satisfy its required data constraint, and so the sensor will kick off a run for ``c``
          alongside whichever upstream assets did not have up-to-date data.
        * On the next tick, the sensor will see that a run is currently planned which will
          satisfy that constraint, so no runs will be kicked off.
        * Once that run completes, a new materialization event will exist for ``c``, which will
          incorporate all of the required data, so no new runs will be kicked off until the
          following day.
    """
    check_valid_name(name)
    check.opt_mapping_param(run_tags, "run_tags", key_type=str, value_type=str)

    @sensor(
        name=name,
        asset_selection=asset_selection,
        minimum_interval_seconds=minimum_interval_seconds,
        description=description,
        default_status=default_status,
    )
    def _sensor(context):
        # rebuild the cursor from the stored string, or start from scratch on the first tick
        cursor = (
            AssetReconciliationCursor.from_serialized(
                context.cursor, context.repository_def.asset_graph
            )
            if context.cursor
            else AssetReconciliationCursor.empty()
        )
        run_requests, updated_cursor = reconcile(
            repository_def=context.repository_def,
            asset_selection=asset_selection,
            instance=context.instance,
            cursor=cursor,
            run_tags=run_tags,
        )
        # persist the updated cursor before handing back the run requests
        context.update_cursor(updated_cursor.serialize())
        return run_requests

    return _sensor