
embedded-elt (dagster-embedded-elt)

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources.

This package currently includes a Sling integration which provides a simple way to sync data between databases and file systems.

Related documentation pages: embedded-elt.

Sling

Assets

dagster_embedded_elt.sling.build_sling_asset(asset_spec, source_stream, target_object, mode=SlingMode.FULL_REFRESH, primary_key=None, update_key=None, source_options=None, target_options=None, sling_resource_key='sling')

Experimental: This API may break in future versions, even between dot releases.

Asset factory that uses Sling to sync data from a source stream to a target object.

Parameters:
  • asset_spec (AssetSpec) – The AssetSpec to use to materialize this asset.

  • source_stream (str) – The source stream to sync from. This can be a table, a query, or a path.

  • target_object (str) – The target object to sync to. This can be a table, or a path.

  • mode (SlingMode, optional) – The sync mode to use when syncing. Defaults to SlingMode.FULL_REFRESH.

  • primary_key (Optional[Union[str, List[str]]], optional) – The optional primary key to use when syncing.

  • update_key (Optional[Union[str, List[str]]], optional) – The optional update key to use when syncing.

  • source_options (Optional[Dict[str, Any]], optional) – Any optional Sling source options to use when syncing.

  • target_options (Optional[Dict[str, Any]], optional) – Any optional Sling target options to use when syncing.

  • sling_resource_key (str, optional) – The resource key for the SlingResource. Defaults to “sling”.
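To make the parameter list concrete, the sketch below shows one way these arguments could be collected into a single Sling task configuration. The mode/source/target layout and the conn, stream, object, and options keys are assumptions modeled on Sling's task config format; SlingModeSketch, build_task_config, and the "SOURCE"/"TARGET" connection names are hypothetical. This is not the code that build_sling_asset runs.

```python
from enum import Enum
from typing import Any, Dict, List, Optional, Union


class SlingModeSketch(str, Enum):
    """Hypothetical stand-in for SlingMode; values assumed to match Sling's mode names."""

    FULL_REFRESH = "full-refresh"
    INCREMENTAL = "incremental"
    TRUNCATE = "truncate"
    SNAPSHOT = "snapshot"


Key = Optional[Union[str, List[str]]]


def build_task_config(
    source_stream: str,
    target_object: str,
    mode: SlingModeSketch = SlingModeSketch.FULL_REFRESH,
    primary_key: Key = None,
    update_key: Key = None,
    source_options: Optional[Dict[str, Any]] = None,
    target_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: gather build_sling_asset-style arguments into one dict."""
    # Normalize a single primary-key column to a list so callers may pass either form.
    if isinstance(primary_key, str):
        primary_key = [primary_key]
    # "SOURCE" is a placeholder connection name (assumption).
    source: Dict[str, Any] = {"conn": "SOURCE", "stream": source_stream}
    if primary_key:
        source["primary_key"] = primary_key
    if update_key:
        source["update_key"] = update_key
    if source_options:
        source["options"] = dict(source_options)
    # "TARGET" is a placeholder connection name (assumption).
    target: Dict[str, Any] = {"conn": "TARGET", "object": target_object}
    if target_options:
        target["options"] = dict(target_options)
    return {"mode": mode.value, "source": source, "target": target}


print(
    build_task_config(
        "file:///tmp/test.csv",
        "main.dest_table",
        mode=SlingModeSketch.INCREMENTAL,
        primary_key="id",
    )
)
```

Optional arguments that are left as None are simply omitted, so a full refresh with no keys produces only the stream, the object, and the mode.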

Examples

Creating a Sling asset that syncs from a file to a table:

from dagster import AssetSpec
from dagster_embedded_elt.sling import SlingMode, build_sling_asset

asset_spec = AssetSpec(key=["main", "dest_table"])
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="file:///tmp/test.csv",
    target_object="main.dest_table",
    mode=SlingMode.INCREMENTAL,
    primary_key="id",
)

Creating a Sling asset that syncs from a table to a file with a full refresh:

asset_spec = AssetSpec(key="test.csv")
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="main.dest_table",
    target_object="file:///tmp/test.csv",
    mode=SlingMode.FULL_REFRESH,
)
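The two examples differ mainly in mode. As a rough mental model, full refresh replaces the target with the source, while incremental with a primary key and an update key reads only rows newer than the target's current maximum update key and upserts them by primary key. The sketch below is a simplified in-memory simulation under those assumptions, not Sling's implementation; sync_rows and the dict-per-row format are hypothetical, and it does not model append-only loads (update key without a primary key) or the truncate and snapshot modes.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def sync_rows(
    source: List[Row],
    target: List[Row],
    mode: str,
    primary_key: str = "id",
    update_key: Optional[str] = None,
) -> List[Row]:
    """Toy model of Sling's full-refresh and incremental modes (hypothetical helper)."""
    if mode == "full-refresh":
        # Full refresh: the target is replaced by a copy of every source row.
        return [dict(row) for row in source]
    if mode != "incremental":
        raise ValueError(f"mode not modeled in this sketch: {mode}")
    rows = source
    if update_key is not None and target:
        # Read only source rows newer than the newest update_key already in the target.
        high_water = max(row[update_key] for row in target)
        rows = [row for row in source if row[update_key] > high_water]
    # Upsert on the primary key: new keys are inserted, existing keys are overwritten.
    merged = {row[primary_key]: dict(row) for row in target}
    for row in rows:
        merged[row[primary_key]] = dict(row)
    return list(merged.values())


target = [{"id": 1, "updated": 1, "v": "a"}, {"id": 2, "updated": 2, "v": "b"}]
source = [
    {"id": 1, "updated": 1, "v": "a"},
    {"id": 2, "updated": 3, "v": "B"},
    {"id": 3, "updated": 4, "v": "c"},
]
print(sync_rows(source, target, "incremental", update_key="updated"))
```

In this run, row 1 is left as it was, row 2 is overwritten because its update key moved past the target's maximum, and row 3 is inserted.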

Resources

class dagster_embedded_elt.sling.SlingResource(*, source_connection, target_connection)

Experimental: This API may break in future versions, even between dot releases.

Resource for interacting with the Sling package.

Examples

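from dagster import EnvVar
from dagster_embedded_elt.sling import SlingResource
from dagster_embedded_elt.sling.resources import (
    SlingSourceConnection,
    SlingTargetConnection,
)
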
sling_resource = SlingResource(
    source_connection=SlingSourceConnection(
        type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")
    ),
    target_connection=SlingTargetConnection(
        type="snowflake",
        host="host",
        user="user",
        database="database",
        password="password",
        role="role",
    ),
)

class dagster_embedded_elt.sling.resources.SlingSourceConnection(*, type, connection_string=None, **config_dict)

A Sling Source Connection defines the source connection used by SlingResource.
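The signature takes keyword-only arguments: type is required, connection_string is optional, and any other keyword arguments (such as host or user) are gathered into config_dict. The sketch below mirrors that signature with a hypothetical function, connection_sketch, that returns a plain dict, purely to show how the arguments are grouped; the real class's internal representation may differ.

```python
from typing import Any, Dict, Optional


def connection_sketch(
    *, type: str, connection_string: Optional[str] = None, **config_dict: Any
) -> Dict[str, Any]:
    """Hypothetical mirror of the SlingSourceConnection signature, returning a dict."""
    # `type` shadows the builtin only to match the documented parameter name.
    conn: Dict[str, Any] = {"type": type}
    if connection_string is not None:
        conn["connection_string"] = connection_string
    # Every extra keyword argument (host, user, password, ...) lands in config_dict.
    conn.update(config_dict)
    return conn


print(connection_sketch(type="postgres", host="host", user="hunter42"))
```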

Examples

Creating a Sling Source for a file, such as CSV or JSON:

source = SlingSourceConnection(type="file")

Creating a Sling Source for a Postgres database using a connection string:

source = SlingSourceConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
source = SlingSourceConnection(type="postgres", connection_string="postgresql://user:password@host:port/database")

Creating a Sling Source for a Postgres database using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres

source = SlingSourceConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))
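The connection-string and keyword-argument forms carry the same information. The sketch below splits a Postgres URL into keyword-style fields with Python's urllib.parse to show the correspondence. The field names (host, user, password, port, database) are assumed to follow the Sling Postgres connection page linked above, url_to_keywords is a hypothetical helper, and in practice you would pass only one of the two forms.

```python
from typing import Any, Dict
from urllib.parse import unquote, urlsplit


def url_to_keywords(connection_string: str) -> Dict[str, Any]:
    """Split a postgres:// URL into keyword-style fields (illustrative only)."""
    parts = urlsplit(connection_string)
    fields: Dict[str, Any] = {
        "host": parts.hostname,
        # Percent-decode credentials, since URLs escape characters such as "@".
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        # The URL path holds the database name, without its leading slash.
        "database": parts.path.lstrip("/"),
    }
    # parts.port is an int, or None when the URL omits it.
    if parts.port is not None:
        fields["port"] = parts.port
    return fields


print(url_to_keywords("postgresql://user:secret@db.example.com:5432/analytics"))
```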

class dagster_embedded_elt.sling.resources.SlingTargetConnection(*, type, connection_string=None, **config_dict)

A Sling Target Connection defines the target connection used by SlingResource.

Examples

Creating a Sling Target for a file, such as CSV or JSON:

target = SlingTargetConnection(type="file")

Creating a Sling Target for a Postgres database using a connection string:

target = SlingTargetConnection(type="postgres", connection_string="postgresql://user:password@host:port/database")
target = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))

Creating a Sling Target for a Postgres database using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres