import atexit
import contextlib
import os
import shutil
import subprocess
import sys
import uuid
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import (
Any,
Dict,
Iterator,
List,
Mapping,
Optional,
Union,
)
import dateutil.parser
import orjson
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
AssetObservation,
AssetsDefinition,
ConfigurableResource,
Output,
get_dagster_logger,
)
from dagster._annotations import public
from dagster._core.errors import DagsterInvalidPropertyError
from dagster._core.execution.context.compute import OpExecutionContext
from dbt.contracts.results import NodeStatus, TestStatus
from dbt.node_types import NodeType
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, root_validator, validator
from typing_extensions import Literal
from ..asset_utils import (
get_manifest_and_translator_from_dbt_assets,
output_name_fn,
)
from ..dagster_dbt_translator import DagsterDbtTranslator
from ..dbt_manifest import DbtManifestParam, validate_manifest
from ..errors import DagsterDbtCliRuntimeError
from ..utils import ASSET_RESOURCE_TYPES, get_dbt_resource_props_by_dbt_unique_id_from_manifest
# Logger obtained from Dagster; used for the informational messages emitted in this module.
logger = get_dagster_logger()
# File that must be present in a directory for it to be accepted as a dbt project directory.
DBT_PROJECT_YML_NAME = "dbt_project.yml"
# File that must be present in a directory for it to be accepted as a dbt profiles directory.
DBT_PROFILES_YML_NAME = "profiles.yml"
# Name of the partial parse file that is copied into the per-invocation target folder.
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
def _get_dbt_target_path() -> Path:
return Path(os.getenv("DBT_TARGET_PATH", "target"))
@dataclass
class DbtCliEventMessage:
    """The representation of a dbt CLI event.

    Args:
        raw_event (Dict[str, Any]): The raw event dictionary.
            See https://docs.getdbt.com/reference/events-logging#structured-logging for more
            information.
    """

    raw_event: Dict[str, Any]

    @classmethod
    def from_log(cls, log: str) -> "DbtCliEventMessage":
        """Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging.

        We assume that the log format is json.

        Args:
            log (str): A single line of dbt CLI output.

        Returns:
            DbtCliEventMessage: The event wrapping the decoded JSON payload.
        """
        raw_event: Dict[str, Any] = orjson.loads(log)

        return cls(raw_event=raw_event)

    def __str__(self) -> str:
        """Return the human-readable message stored under ``info.msg`` of the raw event."""
        return self.raw_event["info"]["msg"]

    @public
    def to_default_asset_events(
        self,
        manifest: DbtManifestParam,
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
    ) -> Iterator[Union[Output, AssetObservation, AssetCheckResult]]:
        """Convert a dbt CLI event to a set of corresponding Dagster events.

        Args:
            manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob.
            dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
                linking dbt nodes to Dagster assets.

        Returns:
            Iterator[Union[Output, AssetObservation, AssetCheckResult]]: A set of corresponding Dagster events.
                - Output for refables (e.g. models, seeds, snapshots.)
                - AssetObservation for dbt test results that are not enabled as asset checks.
                - AssetCheckResult for dbt test results that are enabled as asset checks.
        """
        # Debug-level events never produce Dagster events.
        if self.raw_event["info"]["level"] == "debug":
            return

        # Only events that carry node information can be mapped to a dbt resource.
        event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
        if not event_node_info:
            return

        manifest = validate_manifest(manifest)

        if not manifest:
            logger.info(
                "No dbt manifest was provided. Dagster events for dbt tests will not be created."
            )

        invocation_id: str = self.raw_event["info"]["invocation_id"]
        unique_id: str = event_node_info["unique_id"]
        node_resource_type: str = event_node_info["resource_type"]
        node_status: str = event_node_info["node_status"]

        is_node_successful = node_status == NodeStatus.Success
        is_node_finished = bool(event_node_info.get("node_finished_at"))
        if node_resource_type in NodeType.refable() and is_node_successful:
            started_at = dateutil.parser.isoparse(event_node_info["node_started_at"])
            finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
            duration_seconds = (finished_at - started_at).total_seconds()

            yield Output(
                value=None,
                output_name=output_name_fn(event_node_info),
                metadata={
                    "unique_id": unique_id,
                    "invocation_id": invocation_id,
                    "Execution Duration": duration_seconds,
                },
            )
        elif manifest and node_resource_type == NodeType.Test and is_node_finished:
            upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
            test_resource_props = manifest["nodes"][unique_id]
            metadata = {
                "unique_id": unique_id,
                "invocation_id": invocation_id,
                "status": node_status,
            }

            is_asset_check = dagster_dbt_translator.settings.enable_asset_checks
            # A test that records an `attached_node` is handled as a generic test.
            attached_node_unique_id = test_resource_props.get("attached_node")
            is_generic_test = bool(attached_node_unique_id)

            if is_asset_check and is_generic_test:
                is_test_successful = node_status == TestStatus.Pass
                severity = AssetCheckSeverity(test_resource_props["config"]["severity"].upper())

                # The attached node is looked up among the nodes first, then the sources.
                attached_node_resource_props: Dict[str, Any] = manifest["nodes"].get(
                    attached_node_unique_id
                ) or manifest["sources"].get(attached_node_unique_id)
                attached_node_asset_key = dagster_dbt_translator.get_asset_key(
                    attached_node_resource_props
                )

                yield AssetCheckResult(
                    passed=is_test_successful,
                    asset_key=attached_node_asset_key,
                    check_name=event_node_info["node_name"],
                    metadata=metadata,
                    severity=severity,
                )
            else:
                # Otherwise, record one observation on every upstream resource of the test.
                for upstream_unique_id in upstream_unique_ids:
                    upstream_resource_props: Dict[str, Any] = manifest["nodes"].get(
                        upstream_unique_id
                    ) or manifest["sources"].get(upstream_unique_id)
                    upstream_asset_key = dagster_dbt_translator.get_asset_key(
                        upstream_resource_props
                    )

                    yield AssetObservation(
                        asset_key=upstream_asset_key,
                        metadata=metadata,
                    )
@dataclass
class DbtCliInvocation:
    """The representation of an invoked dbt command.

    Args:
        process (subprocess.Popen): The process running the dbt command.
        manifest (Mapping[str, Any]): The dbt manifest blob.
        dagster_dbt_translator (DagsterDbtTranslator): The translator used to link dbt nodes to
            Dagster assets when converting events.
        project_dir (Path): The path to the dbt project.
        target_path (Path): The path to the dbt target folder.
        raise_on_error (bool): Whether to raise an exception if the dbt command fails.
    """

    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    @classmethod
    def run(
        cls,
        args: List[str],
        env: Dict[str, str],
        manifest: Mapping[str, Any],
        dagster_dbt_translator: DagsterDbtTranslator,
        project_dir: Path,
        target_path: Path,
        raise_on_error: bool,
    ) -> "DbtCliInvocation":
        """Start the dbt CLI command in a subprocess and return a handle to it.

        Args:
            args (List[str]): The full command line to execute.
            env (Dict[str, str]): The environment variables for the subprocess.
            manifest (Mapping[str, Any]): The dbt manifest blob.
            dagster_dbt_translator (DagsterDbtTranslator): The translator to link dbt nodes to
                Dagster assets.
            project_dir (Path): The path to the dbt project; used as the working directory.
            target_path (Path): The path to the dbt target folder for this invocation.
            raise_on_error (bool): Whether to raise an exception if the dbt command fails.

        Returns:
            DbtCliInvocation: The representation of the running dbt command.
        """
        # Attempt to take advantage of partial parsing. If there is a `partial_parse.msgpack` in
        # in the target folder, then copy it to the dynamic target path.
        #
        # This effectively allows us to skip the parsing of the manifest, which can be expensive.
        # See https://docs.getdbt.com/reference/programmatic-invocations#reusing-objects for more
        # details.
        current_target_path = _get_dbt_target_path()
        partial_parse_file_path = (
            current_target_path.joinpath(PARTIAL_PARSE_FILE_NAME)
            if current_target_path.is_absolute()
            else project_dir.joinpath(current_target_path, PARTIAL_PARSE_FILE_NAME)
        )
        partial_parse_destination_target_path = target_path.joinpath(PARTIAL_PARSE_FILE_NAME)

        if partial_parse_file_path.exists():
            logger.info(
                f"Copying `{partial_parse_file_path}` to `{partial_parse_destination_target_path}`"
                " to take advantage of partial parsing."
            )

            partial_parse_destination_target_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(partial_parse_file_path, partial_parse_destination_target_path)

        # Create a subprocess that runs the dbt CLI command.
        logger.info(f"Running dbt command: `{' '.join(args)}`.")
        process = subprocess.Popen(
            args=args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=project_dir,
        )

        # Add handler to terminate child process if running.
        # See https://stackoverflow.com/a/18258391 for more details.
        def cleanup_dbt_subprocess(process: subprocess.Popen) -> None:
            if process.returncode is None:
                logger.info(
                    "The main process is being terminated, but the dbt command has not yet"
                    " completed. Terminating the execution of dbt command."
                )
                process.terminate()
                process.wait()

        atexit.register(cleanup_dbt_subprocess, process)

        return cls(
            process=process,
            manifest=manifest,
            dagster_dbt_translator=dagster_dbt_translator,
            project_dir=project_dir,
            target_path=target_path,
            raise_on_error=raise_on_error,
        )

    @public
    def wait(self) -> "DbtCliInvocation":
        """Wait for the dbt CLI process to complete.

        Returns:
            DbtCliInvocation: The current representation of the dbt CLI invocation.

        Examples:
            .. code-block:: python

                from dagster_dbt import DbtCliResource

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"]).wait()
        """
        # Draining the raw event stream reads all output and then checks the exit code.
        list(self.stream_raw_events())

        return self

    @public
    def is_successful(self) -> bool:
        """Return whether the dbt CLI process completed successfully.

        Returns:
            bool: True, if the dbt CLI process returns with a zero exit code, and False otherwise.

        Examples:
            .. code-block:: python

                from dagster_dbt import DbtCliResource

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"], raise_on_error=False)

                if dbt_cli_invocation.is_successful():
                    ...
        """
        # NOTE(review): this blocks until the process exits. Because stdout is a pipe, calling it
        # before the output has been consumed may block if the pipe buffer fills - confirm.
        return self.process.wait() == 0

    @public
    def stream(self) -> Iterator[Union[Output, AssetObservation, AssetCheckResult]]:
        """Stream the events from the dbt CLI process and convert them to Dagster events.

        Returns:
            Iterator[Union[Output, AssetObservation, AssetCheckResult]]: A set of corresponding Dagster events.
                - Output for refables (e.g. models, seeds, snapshots.)
                - AssetObservation for dbt test results that are not enabled as asset checks.
                - AssetCheckResult for dbt test results that are enabled as asset checks.

        Examples:
            .. code-block:: python

                from pathlib import Path
                from dagster_dbt import DbtCliResource, dbt_assets

                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()
        """
        for event in self.stream_raw_events():
            yield from event.to_default_asset_events(
                manifest=self.manifest, dagster_dbt_translator=self.dagster_dbt_translator
            )

    @public
    def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
        """Stream the events from the dbt CLI process.

        Every line of process output is re-emitted on stdout: the event message for lines that
        parse as structured events, and the raw line otherwise. Only parsed events are yielded.

        Returns:
            Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.

        Raises:
            DagsterDbtCliRuntimeError: After the output is exhausted, if the process failed and
                `raise_on_error` is set.
        """
        with self.process.stdout or contextlib.nullcontext():
            for raw_line in self.process.stdout or []:
                log: str = raw_line.decode().strip()

                # Only the parsing is guarded. The `yield` stays outside of the `try` so that
                # closing the generator early (GeneratorExit) or interrupting the process is
                # never mistaken for an unparseable log line and swallowed.
                try:
                    event = DbtCliEventMessage.from_log(log=log)
                    # Rendering the message reads nested keys, so it can fail for lines that
                    # are valid JSON but not dbt events.
                    message = str(event)
                except Exception:
                    # If we can't parse the log, then just emit it as a raw log.
                    sys.stdout.write(log + "\n")
                    sys.stdout.flush()
                    continue

                # Re-emit the logs from dbt CLI process into stdout.
                sys.stdout.write(message + "\n")
                sys.stdout.flush()

                yield event

        # Ensure that the dbt CLI process has completed.
        self._raise_on_error()

    @public
    def get_artifact(
        self,
        artifact: Union[
            Literal["manifest.json"],
            Literal["catalog.json"],
            Literal["run_results.json"],
            Literal["sources.json"],
        ],
    ) -> Dict[str, Any]:
        """Retrieve a dbt artifact from the target path.

        See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information.

        Args:
            artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]): The name of the artifact to retrieve.

        Returns:
            Dict[str, Any]: The artifact as a dictionary.

        Examples:
            .. code-block:: python

                from dagster_dbt import DbtCliResource

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"]).wait()

                # Retrieve the run_results.json artifact.
                run_results = dbt_cli_invocation.get_artifact("run_results.json")
        """
        artifact_path = self.target_path.joinpath(artifact)

        return orjson.loads(artifact_path.read_bytes())

    def _raise_on_error(self) -> None:
        """Ensure that the dbt CLI process has completed. If the process has not successfully
        completed, then optionally raise an error.

        Raises:
            DagsterDbtCliRuntimeError: If the process failed and `raise_on_error` is set.
        """
        if not self.is_successful() and self.raise_on_error:
            raise DagsterDbtCliRuntimeError(
                description=(
                    f"The dbt CLI process failed with exit code {self.process.returncode}. Check"
                    " the Dagster compute logs for the full information about the error, or view"
                    f" the dbt debug log file: {self.target_path.joinpath('dbt.log')}."
                )
            )
class DbtCliResource(ConfigurableResource):
    """A resource used to execute dbt CLI commands.

    Attributes:
        project_dir (str): The path to the dbt project directory. This directory should contain a
            `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more
            information.
        global_config_flags (List[str]): A list of global flags configuration to pass to the dbt CLI
            invocation. See https://docs.getdbt.com/reference/global-configs for a full list of
            configuration.
        profiles_dir (Optional[str]): The path to the directory containing your dbt `profiles.yml`.
            By default, the current working directory is used, which is the dbt project directory.
            See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        profile (Optional[str]): The profile from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        target (Optional[str]): The target from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.

    Examples:
        Creating a dbt resource with only a reference to ``project_dir``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(project_dir="/path/to/dbt/project")

        Creating a dbt resource with a custom ``profiles_dir``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                profiles_dir="/path/to/dbt/project/profiles",
            )

        Creating a dbt resource with a custom ``profile`` and ``target``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                profiles_dir="/path/to/dbt/project/profiles",
                profile="jaffle_shop",
                target="dev",
            )

        Creating a dbt resource with global configs, e.g. disabling colored logs with ``--no-use-color``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                global_config_flags=["--no-use-color"],
            )
    """

    project_dir: str = Field(
        ...,
        description=(
            "The path to your dbt project directory. This directory should contain a"
            " `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more"
            " information."
        ),
    )
    global_config_flags: List[str] = Field(
        default=[],
        description=(
            "A list of global flags configuration to pass to the dbt CLI invocation. See"
            " https://docs.getdbt.com/reference/global-configs for a full list of configuration."
        ),
    )
    profiles_dir: Optional[str] = Field(
        default=None,
        description=(
            "The path to the directory containing your dbt `profiles.yml`. By default, the current"
            " working directory is used, which is the dbt project directory."
            " See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for "
            " more information."
        ),
    )
    profile: Optional[str] = Field(
        default=None,
        description=(
            "The profile from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )
    target: Optional[str] = Field(
        default=None,
        description=(
            "The target from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )

    @classmethod
    def _validate_absolute_path_exists(cls, path: Union[str, Path]) -> Path:
        """Resolve a path to an absolute path and ensure that it exists.

        Args:
            path (Union[str, Path]): The path to resolve.

        Returns:
            Path: The resolved absolute path.

        Raises:
            ValueError: If the path does not exist.
        """
        absolute_path = Path(path).absolute()
        try:
            resolved_path = absolute_path.resolve(strict=True)
        except FileNotFoundError as err:
            raise ValueError(
                f"The absolute path of '{path}' ('{absolute_path}') does not exist"
            ) from err

        return resolved_path

    @classmethod
    def _validate_path_contains_file(cls, path: Path, file_name: str, error_message: str):
        """Ensure that a file with the given name exists directly inside of a directory.

        Args:
            path (Path): The directory that is expected to contain the file.
            file_name (str): The name of the file to look for.
            error_message (str): The message of the error raised if the file is missing.

        Raises:
            ValueError: If the file does not exist.
        """
        if not path.joinpath(file_name).exists():
            raise ValueError(error_message)

    @validator("project_dir", "profiles_dir", pre=True)
    def convert_path_to_str(cls, v: Any) -> Any:
        """Validate that the path is converted to a string."""
        if isinstance(v, Path):
            # The helper resolves the path and raises if it does not exist, so no further
            # resolution is needed here.
            resolved_path = cls._validate_absolute_path_exists(v)

            return os.fspath(resolved_path)

        return v

    @validator("project_dir")
    def validate_project_dir(cls, project_dir: str) -> str:
        """Validate that the project directory exists and contains a `dbt_project.yml`."""
        resolved_project_dir = cls._validate_absolute_path_exists(project_dir)

        cls._validate_path_contains_file(
            path=resolved_project_dir,
            file_name=DBT_PROJECT_YML_NAME,
            error_message=(
                f"{resolved_project_dir} does not contain a {DBT_PROJECT_YML_NAME} file. Please"
                " specify a valid path to a dbt project."
            ),
        )

        return os.fspath(resolved_project_dir)

    @validator("profiles_dir")
    def validate_profiles_dir(cls, profiles_dir: str) -> str:
        """Validate that the profiles directory exists and contains a `profiles.yml`."""
        resolved_profiles_dir = cls._validate_absolute_path_exists(profiles_dir)

        cls._validate_path_contains_file(
            path=resolved_profiles_dir,
            file_name=DBT_PROFILES_YML_NAME,
            error_message=(
                f"{resolved_profiles_dir} does not contain a {DBT_PROFILES_YML_NAME} file. Please"
                " specify a valid path to a dbt profile directory."
            ),
        )

        return os.fspath(resolved_profiles_dir)

    @root_validator(pre=True)
    def validate_dbt_version(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the dbt version is supported."""
        if version.parse(dbt_version) < version.parse("1.4.0"):
            raise ValueError(
                "To use `dagster_dbt.DbtCliResource`, you must use `dbt-core>=1.4.0`. Currently,"
                f" you are using `dbt-core=={dbt_version}`. Please install a compatible dbt-core"
                " version."
            )

        return values

    def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> Path:
        """Get a unique target path for the dbt CLI invocation.

        Args:
            context (Optional[OpExecutionContext]): The execution context.

        Returns:
            Path: A unique target path for the dbt CLI invocation, nested under the currently
                configured dbt target path.
        """
        unique_id = str(uuid.uuid4())[:7]
        path = unique_id
        if context:
            path = f"{context.op.name}-{context.run_id[:7]}-{unique_id}"

        current_target_path = _get_dbt_target_path()

        return current_target_path.joinpath(path)

    @public
    def cli(
        self,
        args: List[str],
        *,
        raise_on_error: bool = True,
        manifest: Optional[DbtManifestParam] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[OpExecutionContext] = None,
        target_path: Optional[Path] = None,
    ) -> DbtCliInvocation:
        """Create a subprocess to execute a dbt CLI command.

        Args:
            args (List[str]): The dbt CLI command to execute.
            raise_on_error (bool): Whether to raise an exception if the dbt CLI command fails.
            manifest (Optional[Union[Mapping[str, Any], str, Path]]): The dbt manifest blob. If an
                execution context from within `@dbt_assets` is provided to the context argument,
                then the manifest provided to `@dbt_assets` will be used.
            dagster_dbt_translator (Optional[DagsterDbtTranslator]): The translator to link dbt
                nodes to Dagster assets. If an execution context from within `@dbt_assets` is
                provided to the context argument, then the dagster_dbt_translator provided to
                `@dbt_assets` will be used.
            context (Optional[OpExecutionContext]): The execution context from within `@dbt_assets`.
            target_path (Optional[Path]): An explicit path to a target folder to use to store and
                retrieve dbt artifacts when running a dbt CLI command. If not provided, a unique
                target path will be generated.

        Returns:
            DbtCliInvocation: A invocation instance that can be used to retrieve the output of the
                dbt CLI command.

        Examples:
            Streaming Dagster events for dbt asset materializations and observations:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()

            Retrieving a dbt artifact after streaming the Dagster events:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_run_invocation = dbt.cli(["run"], context=context)

                    yield from dbt_run_invocation.stream()

                    # Retrieve the `run_results.json` dbt artifact as a dictionary:
                    run_results_json = dbt_run_invocation.get_artifact("run_results.json")

                    # Retrieve the `run_results.json` dbt artifact as a file path:
                    run_results_path = dbt_run_invocation.target_path.joinpath("run_results.json")

            Customizing the asset materialization metadata when streaming the Dagster events:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_cli_invocation = dbt.cli(["run"], context=context)

                    for dbt_event in dbt_cli_invocation.stream_raw_events():
                        for dagster_event in dbt_event.to_default_asset_events(manifest=dbt_cli_invocation.manifest):
                            if isinstance(dagster_event, Output):
                                context.add_output_metadata(
                                    metadata={
                                        "my_custom_metadata": "my_custom_metadata_value",
                                    },
                                    output_name=dagster_event.output_name,
                                )

                            yield dagster_event

            Suppressing exceptions from a dbt CLI command when a non-zero exit code is returned:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_run_invocation = dbt.cli(["run"], context=context, raise_on_error=False)

                    if dbt_run_invocation.is_successful():
                        yield from dbt_run_invocation.stream()
                    else:
                        ...

            Invoking a dbt CLI command in a custom asset or op:

            .. code-block:: python

                import json

                from dagster import asset, op
                from dagster_dbt import DbtCliResource


                @asset
                def my_dbt_asset(dbt: DbtCliResource):
                    dbt_macro_args = {"key": "value"}
                    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()


                @op
                def my_dbt_op(dbt: DbtCliResource):
                    dbt_macro_args = {"key": "value"}
                    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()
        """
        target_path = target_path or self._get_unique_target_path(context=context)
        env = {
            **os.environ.copy(),
            # Run dbt with unbuffered output.
            "PYTHONUNBUFFERED": "1",
            # Disable anonymous usage statistics for performance.
            "DBT_SEND_ANONYMOUS_USAGE_STATS": "false",
            # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this
            # environment variable to ensure that the dbt CLI outputs structured logs.
            "DBT_LOG_FORMAT": "json",
            # The DBT_TARGET_PATH environment variable is set to a unique value for each dbt
            # invocation so that artifact paths are separated.
            # See https://discourse.getdbt.com/t/multiple-run-results-json-and-manifest-json-files/7555
            # for more information.
            "DBT_TARGET_PATH": os.fspath(target_path),
            # The DBT_LOG_PATH environment variable is set to the same value as DBT_TARGET_PATH
            # so that logs for each dbt invocation has separate log files.
            "DBT_LOG_PATH": os.fspath(target_path),
            # The DBT_PROFILES_DIR environment variable is set to the path containing the dbt
            # profiles.yml file.
            # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory
            # for more information.
            **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}),
        }

        # Reading `assets_def` from the context may raise DagsterInvalidPropertyError; in that
        # case no assets definition is used.
        assets_def: Optional[AssetsDefinition] = None
        with suppress(DagsterInvalidPropertyError):
            assets_def = context.assets_def if context else None

        selection_args: List[str] = []
        dagster_dbt_translator = dagster_dbt_translator or DagsterDbtTranslator()
        if context and assets_def is not None:
            # The manifest and translator attached to the assets definition take precedence over
            # the ones passed as arguments.
            manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(
                [assets_def]
            )

            # When dbt is enabled with asset checks, we turn off any indirection with dbt selection.
            # This way, the Dagster context completely determines what is executed in a dbt
            # invocation with a subsetted selection.
            if (
                version.parse(dbt_version) >= version.parse("1.5.0")
                and dagster_dbt_translator.settings.enable_asset_checks
            ):
                env["DBT_INDIRECT_SELECTION"] = "empty"

            selection_args = get_subset_selection_for_context(
                context=context,
                manifest=manifest,
                select=context.op.tags.get("dagster-dbt/select"),
                exclude=context.op.tags.get("dagster-dbt/exclude"),
            )
        else:
            manifest = validate_manifest(manifest) if manifest else {}

        # TODO: verify that args does not have any selection flags if the context and manifest
        # are passed to this function.
        profile_args: List[str] = []
        if self.profile:
            profile_args = ["--profile", self.profile]

        if self.target:
            profile_args += ["--target", self.target]

        args = ["dbt"] + self.global_config_flags + args + profile_args + selection_args
        project_dir = Path(self.project_dir)

        # A relative target path is interpreted relative to the dbt project directory.
        if not target_path.is_absolute():
            target_path = project_dir.joinpath(target_path)

        return DbtCliInvocation.run(
            args=args,
            env=env,
            manifest=manifest,
            dagster_dbt_translator=dagster_dbt_translator,
            project_dir=project_dir,
            target_path=target_path,
            raise_on_error=raise_on_error,
        )
def get_subset_selection_for_context(
    context: OpExecutionContext,
    manifest: Mapping[str, Any],
    select: Optional[str],
    exclude: Optional[str],
) -> List[str]:
    """Build the dbt selection arguments that match the outputs selected in the execution context.

    See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work.

    Args:
        context (OpExecutionContext): The execution context for the current execution step.
        manifest (Mapping[str, Any]): The dbt manifest blob, used to look up the dbt resources
            behind the selected outputs and asset checks.
        select (Optional[str]): A dbt selection string to select resources to materialize.
        exclude (Optional[str]): A dbt selection string to exclude resources from materializing.

    Returns:
        List[str]: dbt CLI arguments to materialize the selected resources in a
            subsetted execution context.

            When the execution is not subsetted, the arguments are built from the provided
            selection and exclusion strings instead.
    """
    default_dbt_selection: List[str] = []
    if select:
        default_dbt_selection.extend(["--select", select])
    if exclude:
        default_dbt_selection.extend(["--exclude", exclude])

    props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
    props_by_test_name = get_dbt_resource_props_by_test_name(manifest)

    # TODO: this should be a property on the context if this is a permanent indicator for
    # determining whether the current execution context is performing a subsetted execution.
    selected_output_count = len(context.selected_output_names)
    total_output_count = len(context.assets_def.node_keys_by_output_name)

    if selected_output_count == total_output_count:
        logger.info(
            "A dbt subsetted execution is not being performed. Using the default dbt selection"
            f" arguments `{default_dbt_selection}`."
        )
        return default_dbt_selection

    def _fqn_selector(resource_props: Mapping[str, Any]) -> str:
        # Explicitly select a dbt resource by its fully qualified name (FQN).
        # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
        return f"fqn:{'.'.join(resource_props['fqn'])}"

    fqn_selectors = [
        _fqn_selector(props_by_output_name[output_name])
        for output_name in context.selected_output_names
    ]
    fqn_selectors.extend(
        _fqn_selector(props_by_test_name[check_name])
        for _, check_name in context.selected_asset_check_keys
    )

    # Take the union of all the selected resources.
    # https://docs.getdbt.com/reference/node-selection/set-operators#unions
    union_selected_dbt_resources = ["--select", " ".join(fqn_selectors)]

    logger.info(
        "A dbt subsetted execution is being performed. Overriding default dbt selection"
        f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
    )

    return union_selected_dbt_resources
def get_dbt_resource_props_by_output_name(
    manifest: Mapping[str, Any]
) -> Mapping[str, Mapping[str, Any]]:
    """Index the asset-type dbt resources of a manifest by their Dagster output name.

    Args:
        manifest (Mapping[str, Any]): The dbt manifest blob.

    Returns:
        Mapping[str, Mapping[str, Any]]: The dbt resource properties keyed by output name. Only
            resources whose `resource_type` is in `ASSET_RESOURCE_TYPES` are included.
    """
    props_by_unique_id = get_dbt_resource_props_by_dbt_unique_id_from_manifest(manifest)

    props_by_output_name: Dict[str, Mapping[str, Any]] = {}
    for resource_props in props_by_unique_id.values():
        if resource_props["resource_type"] not in ASSET_RESOURCE_TYPES:
            continue
        props_by_output_name[output_name_fn(resource_props)] = resource_props

    return props_by_output_name
def get_dbt_resource_props_by_test_name(
    manifest: Mapping[str, Any]
) -> Mapping[str, Mapping[str, Any]]:
    """Index the test nodes of a manifest by their name.

    Args:
        manifest (Mapping[str, Any]): The dbt manifest blob.

    Returns:
        Mapping[str, Mapping[str, Any]]: The properties of every node whose unique id starts
            with `test`, keyed by the node's `name`.
    """
    props_by_test_name: Dict[str, Mapping[str, Any]] = {}
    for node_unique_id, node_props in manifest["nodes"].items():
        if not node_unique_id.startswith("test"):
            continue
        props_by_test_name[node_props["name"]] = node_props

    return props_by_test_name