Source code for dagster._core.definitions.version_strategy

import hashlib
import inspect
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, NamedTuple, Optional

from dagster._annotations import public

if TYPE_CHECKING:
    from .op_definition import OpDefinition
    from .resource_definition import ResourceDefinition


[docs]class OpVersionContext(NamedTuple): """Provides execution-time information for computing the version for an op. Attributes: op_def (OpDefinition): The definition of the op to compute a version for. op_config (Any): The parsed config to be passed to the op during execution. """ op_def: "OpDefinition" op_config: Any @property def solid_def(self) -> "OpDefinition": return self.op_def @property def solid_config(self) -> Any: return self.op_config
[docs]class ResourceVersionContext(NamedTuple): """Provides execution-time information for computing the version for a resource. Attributes: resource_def (ResourceDefinition): The definition of the resource whose version will be computed. resource_config (Any): The parsed config to be passed to the resource during execution. """ resource_def: "ResourceDefinition" resource_config: Any
[docs]class VersionStrategy(ABC): """Abstract class for defining a strategy to version ops and resources. When subclassing, `get_op_version` must be implemented, and `get_resource_version` can be optionally implemented. `get_op_version` should ingest an OpVersionContext, and `get_resource_version` should ingest a ResourceVersionContext. From that, each synthesize a unique string called a `version`, which will be tagged to outputs of that solid in the pipeline. Providing a `VersionStrategy` instance to a job will enable memoization on that job, such that only steps whose outputs do not have an up-to-date version will run. """ @public @abstractmethod def get_op_version(self, context: OpVersionContext) -> str: raise NotImplementedError() @public def get_resource_version( self, context: ResourceVersionContext # pylint: disable=unused-argument ) -> Optional[str]: return None
[docs]class SourceHashVersionStrategy(VersionStrategy): """VersionStrategy that checks for changes to the source code of ops and resources. Only checks for changes within the immediate body of the op/resource's decorated function (or compute function, if the op/resource was constructed directly from a definition). """ def _get_source_hash(self, fn): code_as_str = inspect.getsource(fn) return hashlib.sha1(code_as_str.encode("utf-8")).hexdigest() @public def get_op_version(self, context: OpVersionContext) -> str: compute_fn = context.op_def.compute_fn if callable(compute_fn): return self._get_source_hash(compute_fn) else: return self._get_source_hash(compute_fn.decorated_fn) @public def get_resource_version(self, context: ResourceVersionContext) -> Optional[str]: return self._get_source_hash(context.resource_def.resource_fn)