You are viewing an unreleased or outdated version of the documentation

Source code for dagster_embedded_elt.sling.resources

import contextlib
import json
import re
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
from typing import Any, Dict, Generator, List, Optional

from dagster import ConfigurableResource, PermissiveConfig, get_dagster_logger
from dagster._annotations import experimental
from dagster._utils.env import environ
from pydantic import Field
from sling import Sling

logger = get_dagster_logger()


class SlingMode(str, Enum):
    """The mode to use when syncing.

    See the Sling docs for more information: https://docs.slingdata.io/sling-cli/running-tasks#modes.
    """

    INCREMENTAL = "incremental"
    TRUNCATE = "truncate"
    FULL_REFRESH = "full-refresh"
    SNAPSHOT = "snapshot"


[docs]class SlingSourceConnection(PermissiveConfig): """A Sling Source Connection defines the source connection used by :py:class:`~dagster_elt.sling.SlingResource`. Examples: Creating a Sling Source for a file, such as CSV or JSON: .. code-block:: python source = SlingSourceConnection(type="file") Create a Sling Source for a Postgres database, using a connection string: .. code-block:: python source = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")) source = SlingSourceConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema") Create a Sling Source for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres .. code-block:: python source = SlingTargetConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD")) """ type: str = Field(description="Type of the source connection. Use 'file' for local storage.") connection_string: Optional[str] = Field( description="The connection string for the source database." )
[docs]class SlingTargetConnection(PermissiveConfig): """A Sling Target Connection defines the target connection used by :py:class:`~dagster_elt.sling.SlingResource`. Examples: Creating a Sling Target for a file, such as CSV or JSON: .. code-block:: python source = SlingTargetConnection(type="file") Create a Sling Source for a Postgres database, using a connection string: .. code-block:: python source = SlingTargetConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema" source = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")) Create a Sling Source for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres .. code-block::python source = SlingTargetConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD")) """ type: str = Field( description="Type of the destination connection. Use 'file' for local storage." ) connection_string: Optional[str] = Field( description="The connection string for the target database." )
[docs]@experimental class SlingResource(ConfigurableResource): """Resource for interacting with the Sling package. Examples: .. code-block:: python from dagster_etl.sling import SlingResource sling_resource = SlingResource( source_connection=SlingSourceConnection( type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING") ), target_connection=SlingTargetConnection( type="snowflake", host="host", user="user", database="database", password="password", role="role", ), ) """ source_connection: SlingSourceConnection target_connection: SlingTargetConnection @contextlib.contextmanager def _setup_config(self) -> Generator[None, None, None]: """Uses environment variables to set the Sling source and target connections.""" sling_source = self.source_connection.dict() sling_target = self.target_connection.dict() if self.source_connection.connection_string: sling_source["url"] = self.source_connection.connection_string if self.target_connection.connection_string: sling_target["url"] = self.target_connection.connection_string with environ( { "SLING_SOURCE": json.dumps(sling_source), "SLING_TARGET": json.dumps(sling_target), } ): yield @staticmethod def _exec_sling_cmd(cmd, stdin=None, stdout=PIPE, stderr=STDOUT) -> Generator[str, None, None]: ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") with Popen(cmd, shell=True, stdin=stdin, stdout=stdout, stderr=stderr) as proc: assert proc.stdout for line in proc.stdout: fmt_line = str(line, "utf-8") clean_line = ansi_escape.sub("", fmt_line).replace("INF", "") yield clean_line proc.wait() if proc.returncode != 0: raise Exception("Sling command failed with error code %s", proc.returncode) def _sync( self, source_stream: str, target_object: str, mode: SlingMode = SlingMode.FULL_REFRESH, primary_key: Optional[List[str]] = None, update_key: Optional[List[str]] = None, source_options: Optional[Dict[str, Any]] = None, target_options: Optional[Dict[str, Any]] = None, ) -> Generator[str, None, None]: """Runs a Sling sync from the given source table to the given destination table. Generates output lines from the Sling CLI. """ if self.source_connection.type == "file" and not source_stream.startswith("file://"): source_stream = "file://" + source_stream if self.target_connection.type == "file" and not target_object.startswith("file://"): target_object = "file://" + target_object with self._setup_config(): config = { "source": { "conn": "SLING_SOURCE", "stream": source_stream, "primary_key": primary_key, "update_key": update_key, "options": source_options, }, "target": { "conn": "SLING_TARGET", "object": target_object, "options": target_options, }, } config["source"] = {k: v for k, v in config["source"].items() if v is not None} config["target"] = {k: v for k, v in config["target"].items() if v is not None} sling_cli = Sling(**config) logger.info("Starting Sling sync with mode: %s", mode) cmd = sling_cli._prep_cmd() # noqa: SLF001 yield from self._exec_sling_cmd(cmd) def sync( self, source_stream: str, target_object: str, mode: SlingMode, primary_key: Optional[List[str]] = None, update_key: Optional[List[str]] = None, source_options: Optional[Dict[str, Any]] = None, target_options: Optional[Dict[str, Any]] = None, ) -> Generator[str, None, None]: """Initiate a Sling Sync between a source stream and a target object. Args: source_stream (str): The source stream to read from. For database sources, the source stream can be either a table name, a SQL statement or a path to a SQL file e.g. `TABLE1` or `SCHEMA1.TABLE2` or `SELECT * FROM TABLE`. For file sources, the source stream is a path or an url to a file. For file targets, the target object is a path or a url to a file, e.g. file:///tmp/file.csv or s3://my_bucket/my_folder/file.csv target_object (str): The target object to write into. For database targets, the target object is a table name, e.g. TABLE1, SCHEMA1.TABLE2. For file targets, the target object is a path or an url to a file. mode (SlingMode): The Sling mode to use when syncing, i.e. incremental, full-refresh See the Sling docs for more information: https://docs.slingdata.io/sling-cli/running-tasks#modes. primary_key (str): For incremental syncs, a primary key is used during merge statements to update existing rows. update_key (str): For incremental syncs, an update key is used to stream records after max(update_key) source_options (Dict[str, Any]): Other source options to pass to Sling, see https://docs.slingdata.io/sling-cli/running-tasks#source-options-src-options-flag-source.options-key for details target_options (Dict[str, Any[): Other target options to pass to Sling, see https://docs.slingdata.io/sling-cli/running-tasks#target-options-tgt-options-flag-target.options-key for details Examples: Sync from a source file to a sqlite database: .. code-block:: python sqllite_path = "/path/to/sqlite.db" csv_path = "/path/to/file.csv" @asset def run_sync(context, sling: SlingResource): res = sling.sync( source_stream=csv_path, target_object="events", mode=SlingMode.FULL_REFRESH, ) for stdout in res: context.log.debug(stdout) counts = sqlite3.connect(sqllitepath).execute("SELECT count(1) FROM events").fetchone() assert counts[0] == 3 source = SlingSourceConnection( type="file", ) target = SlingTargetConnection(type="sqlite", instance=sqllitepath) materialize( [run_sync], resources={ "sling": SlingResource( source_connection=source, target_connection=target, mode=SlingMode.TRUNCATE, ) }, ) """ yield from self._sync( source_stream=source_stream, target_object=target_object, mode=mode, primary_key=primary_key, update_key=update_key, source_options=source_options, target_options=target_options, )