External Assets (Experimental)#

An external asset is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. This lets you model assets in Dagster and attach metadata and events to them without scheduling their materialization with Dagster.

External assets are a good fit when data is:

  • Landed by an external source (e.g. an external file landing daily; Kafka landing data into Amazon S3)
  • Created and processed using manual processes
  • Materialized by existing pipelines with their own scheduling and infrastructure that you do not want or need to migrate en masse

With an external asset, you can:

  • Attach metadata to its definition for documentation, tracking ownership, and so on
  • Track its data quality and version in Dagster
  • Use asset sensors or auto-materialize policies to update downstream assets based on updates to external assets

You cannot, however:

  • Schedule an external asset's materialization
  • Backfill an external asset using Dagster
  • Use the Dagster UI or GraphQL API to instigate ad hoc materializations

What about Source Assets? A common use case for external assets is modeling data produced by a process that Dagster does not control, for example a daily file drop from a third party into Amazon S3. Most systems describe such data as sources, and Dagster does too, through SourceAsset. Because external assets are a superset of SourceAsset functionality, external assets will supplant source assets in the near future.


Relevant APIs#

Name                         Description
external_assets_from_specs   Creates a list of AssetsDefinition objects that represent external assets
AssetSpec                    An object that represents the metadata of a particular asset

Defining external assets#

The following code declares a single external asset that represents a file in S3 and passes it to a Definitions object:

from dagster import AssetSpec, Definitions, external_asset_from_spec

defs = Definitions(assets=[external_asset_from_spec(AssetSpec("file_in_s3"))])

External assets with dependencies#

External assets can depend only on other external assets.

Dependencies are defined by using the deps argument of AssetSpec. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.

In the following example, we have two assets: raw_logs and processed_logs. The processed_logs asset is produced by a scheduled computation in another orchestration system. Using external assets allows you to model both assets in Dagster.

from dagster import AssetSpec, Definitions, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])

defs = Definitions(assets=external_assets_from_specs([raw_logs, processed_logs]))

Fully-managed assets with external asset dependencies#

Fully-managed assets can depend on external assets. In this example, the aggregated_logs asset depends on processed_logs, which is an external asset:

from dagster import AssetSpec, Definitions, asset, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_logs" into memory and performs some aggregation
    ...


defs = Definitions(
    assets=[aggregated_logs, *external_assets_from_specs([raw_logs, processed_logs])]
)

Updating external asset metadata#

As Dagster doesn't control scheduling or materializing external assets, it's up to you to keep their metadata updated. This also means that materialization for external assets will be disabled in the Dagster UI.

To keep your external assets updated, you can use any of the following approaches:

Using the REST API#

Dagster OSS exposes a REST endpoint for reporting asset materializations. You can call it from any HTTP client, such as a curl command or a Python script.

The following demonstrates how to use a curl command in a shell script to communicate with the API:

curl --request POST \
    --url https://path/to/instance/report_asset_materialization/{asset_key} \
    --header 'Content-Type: application/json' \
    --data '{
    "metadata" : {
        "source": "From curl command"
    }
}'
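
The same request can be sketched in Python. The example below uses only the standard library and assumes the endpoint path and JSON body shown in the curl command; the helper names build_materialization_request and report_materialization, the asset key file_in_s3, and the base URL are placeholders for illustration, not part of Dagster.

```python
import json
import urllib.request
from urllib.parse import quote


def build_materialization_request(
    base_url: str, asset_key: str, metadata: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST request mirroring the curl command."""
    # Hypothetical helper: joins the instance URL with the endpoint path.
    url = f"{base_url.rstrip('/')}/report_asset_materialization/{quote(asset_key)}"
    body = json.dumps({"metadata": metadata}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def report_materialization(request: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status


# Placeholder URL and asset key; replace with your own instance and asset.
request = build_materialization_request(
    "https://path/to/instance",
    "file_in_s3",
    {"source": "From Python script"},
)
print(request.get_method(), request.full_url)
```

Building the request separately from sending it makes it possible to inspect the URL and body before anything reaches the instance. Calling report_materialization(request) performs the actual POST.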

The API also has endpoints for reporting asset observations and asset check evaluations.

Using sensors#

A sensor can generate events and attach them to external assets by returning them in the asset_events parameter of SensorResult. For example:

import datetime

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    SensorEvaluationContext,
    SensorResult,
    external_asset_from_spec,
    sensor,
)


def utc_now_str() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
    # Materialization happened in external system, but is recorded here
    return SensorResult(
        asset_events=[
            AssetMaterialization(
                asset_key="external_asset_a",
                metadata={
                    "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
                },
            )
        ]
    )


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
    sensors=[keep_external_asset_a_up_to_date],
)

Using the Python API#

You can insert events to attach to external assets directly from Dagster's Python API. Specifically, use the report_runless_asset_event method of DagsterInstance.

This is useful, for example, when writing a hand-rolled Python script to backfill metadata:

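from dagster import AssetMaterialization, DagsterInstance

# Load the instance configured through the DAGSTER_HOME environment variable
instance = DagsterInstance.get()

# Record a materialization event without launching a run
instance.report_runless_asset_event(
    AssetMaterialization(
        "asset_one", metadata={"nrows": 10, "source": "From this script."}
    )
)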

Logging events in unrelated ops#

You can log an AssetMaterialization from a bare op. In this case, use the log_event method of OpExecutionContext to report an asset materialization of an external asset. For example:

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    OpExecutionContext,
    external_asset_from_spec,
    job,
    op,
)


@op
def an_op(context: OpExecutionContext) -> None:
    context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
    an_op()


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
)