import contextlib
import inspect
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Generator,
Generic,
List,
Mapping,
NamedTuple,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin
from dagster import (
Field as DagsterField,
)
from dagster._annotations import deprecated
from dagster._config.field_utils import config_dictionary_from_values
from dagster._config.pythonic_config.typing_utils import (
TypecheckAllowPartialResourceInitParams,
)
from dagster._config.validate import validate_config
from dagster._core.definitions.definition_config_schema import (
ConfiguredDefinitionConfigSchema,
DefinitionConfigSchema,
)
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.execution.context.init import InitResourceContext, build_init_resource_context
from dagster._utils.cached_method import cached_method
from .attach_other_object_to_context import (
IAttachDifferentObjectToOpContext as IAttachDifferentObjectToOpContext,
)
from .pydantic_compat_layer import (
model_fields,
)
try:
from functools import cached_property # type: ignore # (py37 compat)
except ImportError:
class cached_property:
pass
from abc import ABC, abstractmethod
from pydantic import BaseModel
import dagster._check as check
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.resource_definition import (
ResourceDefinition,
ResourceFunction,
ResourceFunctionWithContext,
ResourceFunctionWithoutContext,
has_at_least_one_parameter,
)
from dagster._core.storage.io_manager import IOManagerDefinition
from .config import Config, MakeConfigCacheable, infer_schema_from_config_class
from .conversion_utils import (
TResValue,
_curry_config_schema,
)
from .typing_utils import BaseResourceMeta, LateBoundTypesForResourceTypeChecking
Self = TypeVar("Self", bound="ConfigurableResourceFactory")
ResourceId: TypeAlias = int
class AllowDelayedDependencies:
_nested_partial_resources: Mapping[str, ResourceDefinition] = {}
def _resolve_required_resource_keys(
self, resource_mapping: Mapping[int, str]
) -> AbstractSet[str]:
from dagster._core.execution.build_resources import wrap_resource_for_execution
# All dependent resources which are not fully configured
# must be specified to the Definitions object so that the
# resource can be configured at runtime by the user
nested_partial_resource_keys = {
attr_name: resource_mapping.get(id(resource_def))
for attr_name, resource_def in self._nested_partial_resources.items()
}
check.invariant(
all(pointer_key is not None for pointer_key in nested_partial_resource_keys.values()),
"Any partially configured, nested resources must be provided to Definitions"
f" object: {nested_partial_resource_keys}",
)
# Recursively get all nested resource keys
nested_resource_required_keys: Set[str] = set()
for v in self._nested_partial_resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(v, resource_mapping)
)
resources, _ = separate_resource_params(
cast(Type[BaseModel], self.__class__), self.__dict__
)
for v in resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(
wrap_resource_for_execution(v), resource_mapping
)
)
out = set(cast(Set[str], nested_partial_resource_keys.values())).union(
nested_resource_required_keys
)
return out
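# Illustrative sketch of the mapping consumed above (hedged; CredentialsResource and
# BucketResource are hypothetical ConfigurableResource subclasses). The resource_mapping
# argument maps id() of a nested resource object to the top-level key under which it was
# provided to Definitions:
#
#     credentials = CredentialsResource.configure_at_launch()
#     bucket = BucketResource(credentials=credentials)  # nested, partially configured
#     defs = Definitions(
#         assets=[...],
#         resources={"credentials": credentials, "bucket": bucket},
#     )
#
# With resource_mapping == {id(credentials): "credentials", id(bucket): "bucket"},
# bucket's definition resolves required_resource_keys to include "credentials", so the
# remaining config for that resource can be supplied via run config at launch time.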
class InitResourceContextWithKeyMapping(InitResourceContext):
"""Passes along a mapping from ResourceDefinition id to resource key alongside the
InitResourceContext. This is used to resolve the required resource keys for
resources which may hold nested partial resources.
"""
def __init__(
self,
context: InitResourceContext,
resource_id_to_key_mapping: Mapping[ResourceId, str],
):
super().__init__(
resource_config=context.resource_config,
resources=context.resources,
instance=context.instance,
resource_def=context.resource_def,
dagster_run=context.dagster_run,
log_manager=context.log,
)
self._resource_id_to_key_mapping = resource_id_to_key_mapping
self._resources_by_id = {
resource_id: getattr(context.resources, resource_key, None)
for resource_id, resource_key in resource_id_to_key_mapping.items()
}
@property
def resources_by_id(self) -> Mapping[ResourceId, Any]:
return self._resources_by_id
def replace_config(self, config: Any) -> "InitResourceContext":
return InitResourceContextWithKeyMapping(
super().replace_config(config), self._resource_id_to_key_mapping
)
class ResourceWithKeyMapping(ResourceDefinition):
"""Wrapper around a ResourceDefinition which helps the inner resource resolve its required
resource keys. This is useful for resources which may hold nested resources. At construction
time, they are unaware of the resource keys of their nested resources - the resource id to
key mapping is used to resolve this.
"""
def __init__(
self,
resource: ResourceDefinition,
resource_id_to_key_mapping: Dict[ResourceId, str],
):
self._resource = resource
self._resource_id_to_key_mapping = resource_id_to_key_mapping
ResourceDefinition.__init__(
self,
resource_fn=self.setup_context_resources_and_call,
config_schema=resource.config_schema,
description=resource.description,
version=resource.version,
)
def setup_context_resources_and_call(self, context: InitResourceContext):
"""Wrapper around the wrapped resource's resource_fn which attaches its
resource id to key mapping to the context, and then calls the nested resource's resource_fn.
"""
context_with_key_mapping = InitResourceContextWithKeyMapping(
context, self._resource_id_to_key_mapping
)
if has_at_least_one_parameter(self._resource.resource_fn):
return self._resource.resource_fn(context_with_key_mapping)
else:
return cast(ResourceFunctionWithoutContext, self._resource.resource_fn)()
@property
def required_resource_keys(self) -> AbstractSet[str]:
return _resolve_required_resource_keys_for_resource(
self._resource, self._resource_id_to_key_mapping
)
@property
def wrapped_resource(self) -> ResourceDefinition:
return self._resource
@property
def inner_resource(self):
return self._resource
def attach_resource_id_to_key_mapping(
resource_def: Any, resource_id_to_key_mapping: Dict[ResourceId, str]
) -> Any:
from .io_manager import IOManagerWithKeyMapping
if isinstance(resource_def, (ConfigurableResourceFactory, PartialResource)):
defn = resource_def.get_resource_definition()
return (
IOManagerWithKeyMapping(defn, resource_id_to_key_mapping)
if isinstance(defn, IOManagerDefinition)
else ResourceWithKeyMapping(defn, resource_id_to_key_mapping)
)
return resource_def
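# Hedged sketch of how the wrapping above may be applied when assembling definitions
# (variable names are hypothetical):
#
#     id_to_key = {id(res): key for key, res in resources.items()}
#     wrapped = {
#         key: attach_resource_id_to_key_mapping(res, id_to_key)
#         for key, res in resources.items()
#     }
#
# ConfigurableResourceFactory and PartialResource values come back wrapped as
# ResourceWithKeyMapping (or IOManagerWithKeyMapping for IO managers); any other value
# is returned unchanged.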
CoercibleToResource: TypeAlias = Union[
ResourceDefinition, "ConfigurableResourceFactory", "PartialResource"
]
def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource))
class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies):
def __init__(
self,
configurable_resource_cls: Type,
resource_fn: ResourceFunction,
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, Any],
dagster_maintained: bool = False,
):
super().__init__(
resource_fn=resource_fn,
config_schema=config_schema,
description=description,
)
self._configurable_resource_cls = configurable_resource_cls
self._resolve_resource_keys = resolve_resource_keys
self._nested_resources = nested_resources
self._dagster_maintained = dagster_maintained
@property
def configurable_resource_cls(self) -> Type:
return self._configurable_resource_cls
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._nested_resources
def _resolve_required_resource_keys(
self, resource_mapping: Mapping[int, str]
) -> AbstractSet[str]:
return self._resolve_resource_keys(resource_mapping)
def _is_dagster_maintained(self) -> bool:
return self._dagster_maintained
class ConfigurableResourceFactoryState(NamedTuple):
nested_partial_resources: Mapping[str, Any]
resolved_config_dict: Dict[str, Any]
config_schema: DefinitionConfigSchema
schema: DagsterField
nested_resources: Dict[str, Any]
resource_context: Optional[InitResourceContext]
class ConfigurableResourceFactory(
Generic[TResValue],
Config,
TypecheckAllowPartialResourceInitParams,
AllowDelayedDependencies,
ABC,
metaclass=BaseResourceMeta,
):
"""Base class for creating and managing the lifecycle of Dagster resources that utilize structured config.
Users should directly inherit from this class when they want the object passed to user-defined
code (such as an asset or op) to be different from the object that defines the configuration
schema and is passed to the :py:class:`Definitions` object. Cases where this is useful include
when the object passed to user code is:
* An existing class from a third-party library that the user does not control.
* A complex class that requires substantial internal state management or itself requires arguments beyond its config values.
* A class with expensive initialization that should not be invoked on code location load, but rather lazily on first use in an op or asset during a run.
* A class that you desire to be a plain Python class, rather than a Pydantic class, for whatever reason.
This class is a subclass of both :py:class:`ResourceDefinition` and :py:class:`Config`, and
must implement ``create_resource``, which creates the resource to pass to user code.
Example definition:
.. code-block:: python
class DatabaseResource(ConfigurableResourceFactory[Database]):
connection_uri: str
def create_resource(self, _init_context) -> Database:
# For example Database could be from a third-party library or require expensive setup.
# Or you could just prefer to separate the concerns of configuration and runtime representation
return Database(self.connection_uri)
To use a resource created by a factory in a job or asset, you must use the ``ResourceParam`` type annotation.
Example usage:
.. code-block:: python
@asset
def asset_that_uses_database(database: ResourceParam[Database]):
# Database used directly in user code
database.query("SELECT * FROM table")
defs = Definitions(
assets=[asset_that_uses_database],
resources={"database": DatabaseResource(connection_uri="some_uri")},
)
"""
def __init__(self, **data: Any):
resource_pointers, data_without_resources = separate_resource_params(self.__class__, data)
schema = infer_schema_from_config_class(
self.__class__, fields_to_omit=set(resource_pointers.keys())
)
# Populate config values
Config.__init__(self, **{**data_without_resources, **resource_pointers})
# We pull the values from the Pydantic config object, which may cast values
# to the correct type under the hood - useful in particular for enums
casted_data_without_resources = {
k: v
for k, v in self._convert_to_config_dictionary().items()
if k in data_without_resources
}
resolved_config_dict = config_dictionary_from_values(casted_data_without_resources, schema)
self._state__internal__ = ConfigurableResourceFactoryState(
# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
nested_partial_resources={
k: v for k, v in resource_pointers.items() if (not _is_fully_configured(v))
},
resolved_config_dict=resolved_config_dict,
# These are unfortunately named very similarly
config_schema=_curry_config_schema(schema, resolved_config_dict),
schema=schema,
nested_resources={k: v for k, v in resource_pointers.items()},
resource_context=None,
)
@property
def _schema(self):
return self._state__internal__.schema
@property
def _config_schema(self):
return self._state__internal__.config_schema
@property
def _nested_partial_resources(self):
return self._state__internal__.nested_partial_resources
@property
def _nested_resources(self):
return self._state__internal__.nested_resources
@property
def _resolved_config_dict(self):
return self._state__internal__.resolved_config_dict
@classmethod
def _is_dagster_maintained(cls) -> bool:
"""This should be overridden to return True by all dagster maintained resources and IO managers."""
return False
@classmethod
def _is_cm_resource_cls(cls: Type["ConfigurableResourceFactory"]) -> bool:
return (
cls.yield_for_execution != ConfigurableResourceFactory.yield_for_execution
or cls.teardown_after_execution != ConfigurableResourceFactory.teardown_after_execution
)
@property
def _is_cm_resource(self) -> bool:
return self.__class__._is_cm_resource_cls() # noqa: SLF001
def _get_initialize_and_run_fn(self) -> Callable:
return self._initialize_and_run_cm if self._is_cm_resource else self._initialize_and_run
@cached_method
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.__class__,
resource_fn=self._get_initialize_and_run_fn(),
config_schema=self._config_schema,
description=self.__doc__,
resolve_resource_keys=self._resolve_required_resource_keys,
nested_resources=self.nested_resources,
dagster_maintained=self._is_dagster_maintained(),
)
@abstractmethod
def create_resource(self, context: InitResourceContext) -> TResValue:
"""Returns the object that this resource hands to user code, accessible by ops or assets
through the context or resource parameters. This works like the function decorated
with @resource when using function-based resources.
"""
raise NotImplementedError()
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._nested_resources
@classmethod
def configure_at_launch(cls: "Type[Self]", **kwargs) -> "PartialResource[Self]":
"""Returns a partially initialized copy of the resource, with remaining config fields
set at runtime.
"""
return PartialResource(cls, data=kwargs)
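# Example usage of configure_at_launch, a minimal sketch (CredentialsResource is a
# hypothetical ConfigurableResource subclass):
#
#     credentials = CredentialsResource.configure_at_launch()
#     defs = Definitions(
#         assets=[...],
#         resources={"credentials": credentials},
#     )
#
# The unset fields are then provided via run config at launch time, e.g.
#     {"resources": {"credentials": {"config": {"username": "...", "password": "..."}}}}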
def _with_updated_values(
self, values: Optional[Mapping[str, Any]]
) -> "ConfigurableResourceFactory[TResValue]":
"""Returns a new instance of the resource with the given values.
Used when initializing a resource at runtime.
"""
values = check.opt_mapping_param(values, "values", key_type=str)
# Since Resource extends BaseModel and is a dataclass, we know that the
# signature of any __init__ method will always consist of the fields
# of this class. We can therefore safely pass in the values as kwargs.
out = self.__class__(**{**self._get_non_none_public_field_values(), **values})
out._state__internal__ = out._state__internal__._replace( # noqa: SLF001
resource_context=self._state__internal__.resource_context
)
return out
@contextlib.contextmanager
def _resolve_and_update_nested_resources(
self, context: InitResourceContext
) -> Generator["ConfigurableResourceFactory[TResValue]", None, None]:
"""Updates any nested resources with the resource values from the context.
In this case, populating partially configured resources or
resources that return plain Python types.
Returns a new instance of the resource.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution
partial_resources_to_update: Dict[str, Any] = {}
if self._nested_partial_resources:
context_with_mapping = cast(
InitResourceContextWithKeyMapping,
check.inst(
context,
InitResourceContextWithKeyMapping,
"This ConfiguredResource contains unresolved partially-specified nested"
" resources, and so can only be initialized using a"
" InitResourceContextWithKeyMapping",
),
)
partial_resources_to_update = {
attr_name: context_with_mapping.resources_by_id[id(resource)]
for attr_name, resource in self._nested_partial_resources.items()
}
# Also evaluate any resources that are not partial
with contextlib.ExitStack() as stack:
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, wrap_resource_for_execution(resource), context
)
for attr_name, resource in resources_to_update.items()
if attr_name not in partial_resources_to_update
}
to_update = {**resources_to_update, **partial_resources_to_update}
yield self._with_updated_values(to_update)
@deprecated(
breaking_version="2.0", additional_warn_text="Use `with_replaced_resource_context` instead"
)
def with_resource_context(
self, resource_context: InitResourceContext
) -> "ConfigurableResourceFactory[TResValue]":
return self.with_replaced_resource_context(resource_context)
def with_replaced_resource_context(
self, resource_context: InitResourceContext
) -> "ConfigurableResourceFactory[TResValue]":
"""Returns a new instance of the resource with the given resource init context bound."""
# This utility is used to create a copy of this resource, without adjusting
# any values in this case
copy = self._with_updated_values({})
copy._state__internal__ = copy._state__internal__._replace( # noqa: SLF001
resource_context=resource_context
)
return copy
def _initialize_and_run(self, context: InitResourceContext) -> TResValue:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)
updated_resource.setup_for_execution(context)
return updated_resource.create_resource(context)
@contextlib.contextmanager
def _initialize_and_run_cm(
self, context: InitResourceContext
) -> Generator[TResValue, None, None]:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)
with updated_resource.yield_for_execution(context) as value:
yield value
def setup_for_execution(self, context: InitResourceContext) -> None:
"""Optionally override this method to perform any pre-execution steps
needed before the resource is used in execution.
"""
pass
def teardown_after_execution(self, context: InitResourceContext) -> None:
"""Optionally override this method to perform any post-execution steps
needed after the resource is used in execution.
teardown_after_execution will be called even if any part of the run fails.
It will not be called if setup_for_execution fails.
"""
pass
@contextlib.contextmanager
def yield_for_execution(self, context: InitResourceContext) -> Generator[TResValue, None, None]:
"""Optionally override this method to perform any lifecycle steps
before or after the resource is used in execution. By default, calls
setup_for_execution before yielding, and teardown_after_execution after yielding.
Note that if you override this method and want setup_for_execution or
teardown_after_execution to be called, you must invoke them yourself.
"""
self.setup_for_execution(context)
try:
yield self.create_resource(context)
finally:
self.teardown_after_execution(context)
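# A minimal lifecycle sketch, assuming hypothetical names (DBConnectionResource,
# create_connection). Overriding teardown_after_execution (or yield_for_execution) makes
# this a context-manager resource per _is_cm_resource_cls above, so teardown runs after
# execution even if the run fails:
#
#     from pydantic import PrivateAttr
#
#     class DBConnectionResource(ConfigurableResource):
#         uri: str
#         _conn: Any = PrivateAttr()
#
#         def setup_for_execution(self, context: InitResourceContext) -> None:
#             self._conn = create_connection(self.uri)
#
#         def teardown_after_execution(self, context: InitResourceContext) -> None:
#             self._conn.close()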
def get_resource_context(self) -> InitResourceContext:
"""Returns the context that this resource was initialized with."""
return check.not_none(
self._state__internal__.resource_context,
additional_message="Attempted to get context before resource was initialized.",
)
def process_config_and_initialize(self) -> TResValue:
"""Initializes this resource, fully processing its config and returning the prepared
resource value.
"""
from dagster._config.post_process import post_process_config
return self.from_resource_context(
build_init_resource_context(
config=post_process_config(
self._config_schema.config_type, self._convert_to_config_dictionary()
).value
)
)
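# Hedged sketch of direct initialization outside of a run, e.g. in a script or test
# (WriterResource is hypothetical; resolving EnvVar values is the typical motivation):
#
#     writer = WriterResource(prefix=EnvVar("WRITER_PREFIX"))
#     value = writer.process_config_and_initialize()  # config post-processed, resource created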
@classmethod
def from_resource_context(cls, context: InitResourceContext) -> TResValue:
"""Creates a new instance of this resource from a populated InitResourceContext.
Useful when creating a resource from a function-based resource, for backwards
compatibility purposes.
For resources that have custom teardown behavior, use from_resource_context_cm instead.
Example usage:
.. code-block:: python
class MyResource(ConfigurableResource):
my_str: str
@resource(config_schema=MyResource.to_config_schema())
def my_resource(context: InitResourceContext) -> MyResource:
return MyResource.from_resource_context(context)
"""
check.invariant(
not cls._is_cm_resource_cls(),
"Use from_resource_context_cm for resources which have custom teardown behavior,"
" e.g. overriding yield_for_execution or teardown_after_execution",
)
return cls(**context.resource_config or {})._initialize_and_run(context) # noqa: SLF001
@classmethod
@contextlib.contextmanager
def from_resource_context_cm(
cls, context: InitResourceContext
) -> Generator[TResValue, None, None]:
"""Context which generates a new instance of this resource from a populated InitResourceContext.
Useful when creating a resource from a function-based resource, for backwards
compatibility purposes. Handles custom teardown behavior.
Example usage:
.. code-block:: python
class MyResource(ConfigurableResource):
my_str: str
@resource(config_schema=MyResource.to_config_schema())
def my_resource(context: InitResourceContext) -> Generator[MyResource, None, None]:
with MyResource.from_resource_context_cm(context) as my_resource:
yield my_resource
"""
with cls(**context.resource_config or {})._initialize_and_run_cm( # noqa: SLF001
context
) as value:
yield value
class ConfigurableResource(ConfigurableResourceFactory[TResValue]):
"""Base class for Dagster resources that utilize structured config.
This class is a subclass of both :py:class:`ResourceDefinition` and :py:class:`Config`.
Example definition:
.. code-block:: python
class WriterResource(ConfigurableResource):
prefix: str
def output(self, text: str) -> None:
print(f"{self.prefix}{text}")
Example usage:
.. code-block:: python
@asset
def asset_that_uses_writer(writer: WriterResource):
writer.output("text")
defs = Definitions(
assets=[asset_that_uses_writer],
resources={"writer": WriterResource(prefix="a_prefix")},
)
"""
def create_resource(self, context: InitResourceContext) -> TResValue:
"""Returns the object that this resource hands to user code, accessible by ops or assets
through the context or resource parameters. This works like the function decorated
with @resource when using function-based resources.
For ConfigurableResource, this function will return itself, passing
the actual ConfigurableResource object to user code.
"""
return cast(TResValue, self)
def _is_fully_configured(resource: CoercibleToResource) -> bool:
from dagster._core.execution.build_resources import wrap_resource_for_execution
actual_resource = wrap_resource_for_execution(resource)
res = (
validate_config(
actual_resource.config_schema.config_type,
(
actual_resource.config_schema.default_value
if actual_resource.config_schema.default_provided
else {}
),
).success
is True
)
return res
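# Intuition (sketch): a resource counts as fully configured when its current config, or the
# schema default derived from it, validates successfully. MyResource(prefix="abc") does;
# MyResource.configure_at_launch() typically does not, because required fields are still
# missing and must be supplied later via run config. (MyResource is hypothetical.)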
class PartialResourceState(NamedTuple):
nested_partial_resources: Dict[str, Any]
config_schema: DagsterField
resource_fn: Callable[[InitResourceContext], Any]
description: Optional[str]
nested_resources: Dict[str, Any]
class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable):
data: Dict[str, Any]
resource_cls: Type[ConfigurableResourceFactory[TResValue]]
def __init__(
self,
resource_cls: Type[ConfigurableResourceFactory[TResValue]],
data: Dict[str, Any],
):
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)
MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs
def resource_fn(context: InitResourceContext):
instantiated = resource_cls(
**{**data, **context.resource_config}
) # So that collisions are resolved in favor of the latest provided run config
return instantiated._get_initialize_and_run_fn()(context) # noqa: SLF001
self._state__internal__ = PartialResourceState(
# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
nested_partial_resources={
k: v for k, v in resource_pointers.items() if (not _is_fully_configured(v))
},
config_schema=infer_schema_from_config_class(
resource_cls, fields_to_omit=set(resource_pointers.keys())
),
resource_fn=resource_fn,
description=resource_cls.__doc__,
nested_resources={k: v for k, v in resource_pointers.items()},
)
# to make AllowDelayedDependencies work
@property
def _nested_partial_resources(
self,
) -> Mapping[str, Any]:
return self._state__internal__.nested_partial_resources
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._state__internal__.nested_resources
@cached_method
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.resource_cls,
resource_fn=self._state__internal__.resource_fn,
config_schema=self._state__internal__.config_schema,
description=self._state__internal__.description,
resolve_resource_keys=self._resolve_required_resource_keys,
nested_resources=self.nested_resources,
dagster_maintained=self.resource_cls._is_dagster_maintained(), # noqa: SLF001
)
ResourceOrPartial: TypeAlias = Union[
ConfigurableResourceFactory[TResValue], PartialResource[TResValue]
]
ResourceOrPartialOrValue: TypeAlias = Union[
ConfigurableResourceFactory[TResValue],
PartialResource[TResValue],
ResourceDefinition,
TResValue,
]
V = TypeVar("V")
class ResourceDependency(Generic[V]):
def __set_name__(self, _owner, name):
self._name = name
def __get__(self, obj: "ConfigurableResourceFactory", __owner: Any) -> V:
return getattr(obj, self._name)
def __set__(self, obj: Optional[object], value: ResourceOrPartialOrValue[V]) -> None:
setattr(obj, self._name, value)
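# A minimal sketch of using the descriptor above to declare a nested resource dependency
# (class and field names are hypothetical). The annotation is what
# _is_annotated_as_resource_type / separate_resource_params below key off of:
#
#     class FileStoreBucket(ConfigurableResource):
#         # may be satisfied by any resource that provides an str at runtime,
#         # including a plain function-style ResourceDefinition
#         credentials_path: ResourceDependency[str]
#         bucket_name: str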
class ConfigurableLegacyResourceAdapter(ConfigurableResource, ABC):
"""Adapter base class for wrapping a decorated, function-style resource
with structured config.
To use this class, subclass it, define config schema fields using Pydantic,
and implement the ``wrapped_resource`` method.
Example:
.. code-block:: python
@resource(config_schema={"prefix": str})
def writer_resource(context):
prefix = context.resource_config["prefix"]
def output(text: str) -> None:
out_txt.append(f"{prefix}{text}")
return output
class WriterResource(ConfigurableLegacyResourceAdapter):
prefix: str
@property
def wrapped_resource(self) -> ResourceDefinition:
return writer_resource
"""
@property
@abstractmethod
def wrapped_resource(self) -> ResourceDefinition:
raise NotImplementedError()
@cached_method
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.__class__,
resource_fn=self.wrapped_resource.resource_fn,
config_schema=self._config_schema,
description=self.__doc__,
resolve_resource_keys=self._resolve_required_resource_keys,
nested_resources=self.nested_resources,
dagster_maintained=self._is_dagster_maintained(),
)
def __call__(self, *args, **kwargs):
return self.wrapped_resource(*args, **kwargs)
class SeparatedResourceParams(NamedTuple):
resources: Dict[str, Any]
non_resources: Dict[str, Any]
def _is_annotated_as_resource_type(annotation: Type, metadata: List[str]) -> bool:
"""Determines if a field in a structured config class is annotated as a resource type or not."""
from .type_check_utils import safe_is_subclass
if metadata and metadata[0] == "resource_dependency":
return True
is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr(
annotation, "__metadata__", None
) == ("resource_dependency",)
return is_annotated_as_resource_dependency or safe_is_subclass(
annotation, (ResourceDefinition, ConfigurableResourceFactory)
)
class ResourceDataWithAnnotation(NamedTuple):
key: str
value: Any
annotation: Type
annotation_metadata: List[str]
def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (i.e., using ResourceDependency or a resource-typed annotation) from those which are not.
"""
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}
data_with_annotation: List[ResourceDataWithAnnotation] = [
# No longer exists in Pydantic 2.x, will need to be updated when we upgrade
ResourceDataWithAnnotation(
key=field_name,
value=field_value,
annotation=fields_by_resolved_field_name[field_name].annotation,
annotation_metadata=fields_by_resolved_field_name[field_name].metadata,
)
for field_name, field_value in data.items()
if field_name in fields_by_resolved_field_name
]
# We need to grab metadata from the annotation in order to tell if
# this key was annotated with a typing.Annotated annotation (which we use for resource/resource deps),
# since Pydantic 2.0 strips that info out and sticks any Annotated metadata in the
# metadata field
out = SeparatedResourceParams(
resources={
d.key: d.value
for d in data_with_annotation
if _is_annotated_as_resource_type(
d.annotation,
d.annotation_metadata,
)
},
non_resources={
d.key: d.value
for d in data_with_annotation
if not _is_annotated_as_resource_type(
d.annotation,
d.annotation_metadata,
)
},
)
return out
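# Hedged illustration (hypothetical classes): given
#
#     class BucketResource(ConfigurableResource):
#         credentials: CredentialsResource  # resource-typed field
#         bucket_name: str                  # plain config field
#
# separate_resource_params(BucketResource, {"credentials": creds, "bucket_name": "b"})
# would return SeparatedResourceParams(resources={"credentials": creds},
# non_resources={"bucket_name": "b"}).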
def _call_resource_fn_with_default(
stack: contextlib.ExitStack, obj: ResourceDefinition, context: InitResourceContext
) -> Any:
from dagster._config.validate import process_config
if isinstance(obj.config_schema, ConfiguredDefinitionConfigSchema):
value = cast(Dict[str, Any], obj.config_schema.resolve_config({}).value)
context = context.replace_config(value["config"])
elif obj.config_schema.default_provided:
# To explain why we need to process config here:
# - The resource available on the init context (context.resource_config) has already been processed
# - The nested resource's config has also already been processed, but is only available in the broader run config dictionary.
# - The only information we have access to here is the unprocessed default value, so we need to process it a second time.
unprocessed_config = obj.config_schema.default_value
evr = process_config(
{"config": obj.config_schema.config_type}, {"config": unprocessed_config}
)
if not evr.success:
raise DagsterInvalidConfigError(
"Error in config for nested resource ",
evr.errors,
unprocessed_config,
)
context = context.replace_config(cast(dict, evr.value)["config"])
if has_at_least_one_parameter(obj.resource_fn):
result = cast(ResourceFunctionWithContext, obj.resource_fn)(context)
else:
result = cast(ResourceFunctionWithoutContext, obj.resource_fn)()
is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if is_fn_generator:
return stack.enter_context(cast(contextlib.AbstractContextManager, result))
else:
return result
LateBoundTypesForResourceTypeChecking.set_actual_types_for_type_checking(
resource_dep_type=ResourceDependency,
resource_type=ConfigurableResourceFactory,
partial_resource_type=PartialResource,
)
def validate_resource_annotated_function(fn) -> None:
"""Validates any parameters on the decorated function that are annotated with
:py:class:`dagster.ResourceDefinition`, raising a :py:class:`dagster.DagsterInvalidDefinitionError`
if any are not also instances of :py:class:`dagster.ConfigurableResource` (these resources should
instead be wrapped in the ``ResourceParam`` annotation).
"""
from dagster import DagsterInvalidDefinitionError
from dagster._config.pythonic_config.resource import (
ConfigurableResource,
ConfigurableResourceFactory,
TResValue,
)
from .type_check_utils import safe_is_subclass
malformed_params = [
param
for param in get_function_params(fn)
if safe_is_subclass(param.annotation, (ResourceDefinition, ConfigurableResourceFactory))
and not safe_is_subclass(param.annotation, ConfigurableResource)
]
if len(malformed_params) > 0:
malformed_param = malformed_params[0]
output_type = None
if safe_is_subclass(malformed_param.annotation, ConfigurableResourceFactory):
orig_bases = getattr(malformed_param.annotation, "__orig_bases__", None)
output_type = get_args(orig_bases[0])[0] if orig_bases and len(orig_bases) > 0 else None
if output_type == TResValue:
output_type = None
output_type_name = getattr(output_type, "__name__", str(output_type))
raise DagsterInvalidDefinitionError(
"""Resource param '{param_name}' is annotated as '{annotation_type}', but '{annotation_type}' outputs {value_message} value to user code such as @ops and @assets. This annotation should instead be {annotation_suggestion}""".format(
param_name=malformed_param.name,
annotation_type=malformed_param.annotation,
value_message=f"a '{output_type}'" if output_type else "an unknown",
annotation_suggestion=(
f"'ResourceParam[{output_type_name}]'"
if output_type
else "'ResourceParam[Any]' or 'ResourceParam[<output type>]'"
),
)
)
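# Hedged sketch of what this validation catches (DatabaseResource and Database are
# hypothetical):
#
#     class DatabaseResource(ConfigurableResourceFactory[Database]): ...
#
#     @asset
#     def my_asset(database: DatabaseResource): ...           # rejected: factory annotation
#
#     @asset
#     def my_asset(database: ResourceParam[Database]): ...    # suggested replacement
#
# Plain ConfigurableResource subclasses remain valid annotations, since the resource object
# itself is what is passed to user code.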
def _resolve_required_resource_keys_for_resource(
resource: ResourceDefinition, resource_id_to_key_mapping: Mapping[ResourceId, str]
) -> AbstractSet[str]:
"""Gets the required resource keys for the provided resource, with the assistance of the passed
resource-id-to-key mapping. For resources which may hold nested partial resources,
this mapping is used to obtain the top-level resource keys to depend on.
"""
if isinstance(resource, AllowDelayedDependencies):
return resource._resolve_required_resource_keys(resource_id_to_key_mapping) # noqa: SLF001
return resource.required_resource_keys