Abstractions for the orchestration side of the Dagster Pipes protocol.
(experimental) This API may break in future versions, even between dot releases.
Pipes client base class.
Pipes clients for specific external environments should subclass this.
Derived clients must accept context and extras arguments, but may also add arbitrary arguments appropriate to their own implementation.
context (OpExecutionContext) – The context from the executing op/asset.
extras (Optional[PipesExtras]) – Arbitrary data to pass to the external environment.
Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation
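As a rough illustration of the subclassing contract, the sketch below uses plain Python rather than Dagster. SketchPipesClient and SketchScriptClient are hypothetical names, not Dagster classes. The base class declares a keyword-only run that accepts context and extras, and the subclass adds its own script argument. A real client would open a pipes session inside run and return a PipesClientCompletedInvocation, but this sketch only returns the process exit code.

```python
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional


class SketchPipesClient(ABC):
    """Hypothetical stand-in for a Pipes client base class (not Dagster's actual class)."""

    @abstractmethod
    def run(self, *, context: Any, extras: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> Any:
        """Every client accepts `context` and `extras`; subclasses may add keyword arguments."""


class SketchScriptClient(SketchPipesClient):
    """Hypothetical subclass that adds a `script` argument suited to its environment."""

    def run(
        self,
        *,
        context: Any,
        extras: Optional[Mapping[str, Any]] = None,
        script: str = "pass",
        **kwargs: Any,
    ) -> int:
        # A real client would open a pipes session and inject its params here;
        # this sketch only runs the script with the current interpreter.
        completed = subprocess.run([sys.executable, "-c", script], check=False)
        return completed.returncode
```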
(experimental) This API may break in future versions, even between dot releases.
Class to process PipesMessage objects received from a pipes process.
context (OpExecutionContext) – The context for the executing op/asset.
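A minimal sketch of what a message handler does, assuming each message is a JSON object with a method name and a params payload. The method names report_asset_materialization and log, and the class name SketchMessageHandler, are assumptions for illustration. The sketch dispatches on the method and buffers results for later retrieval; it is not Dagster's implementation.

```python
import json
from typing import Any, Dict, List


class SketchMessageHandler:
    """Hypothetical handler that turns raw pipes messages into buffered results."""

    def __init__(self) -> None:
        self.results: List[Dict[str, Any]] = []
        self.logs: List[str] = []

    def handle_message(self, raw: str) -> None:
        # Assumed message shape: {"method": <name>, "params": {...}}.
        message = json.loads(raw)
        method = message.get("method")
        params = message.get("params") or {}
        if method == "report_asset_materialization":
            self.results.append(params)
        elif method == "log":
            self.logs.append(params.get("message", ""))
        else:
            raise ValueError(f"Unknown pipes message method: {method!r}")
```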
(experimental) This API may break in future versions, even between dot releases.
Object representing a pipes session.
A pipes session is defined by a pair of PipesContextInjector and PipesMessageReader
objects. At the opening of the session, the context injector writes context data to an
externally accessible location, and the message reader starts monitoring an externally
accessible location. These locations are encoded in parameters stored on a PipesSession
object.
During the session, an external process should be started and the parameters injected into its
environment. The typical way to do this is to call PipesSession.get_bootstrap_env_vars()
and pass the result as environment variables.
During execution, results (e.g. asset materializations) are reported by the external process and buffered on the PipesSession object. The buffer can periodically be cleared and yielded to Dagster machinery by calling yield from PipesSession.get_results().
When the external process exits, the session can be closed. Closing consists of handling any unprocessed messages written by the external process and cleaning up any resources used for context injection and message reading.
context_data (PipesContextData) – The context for the executing op/asset.
message_handler (PipesMessageHandler) – The message handler to use for processing messages.
context_injector_params (PipesParams) – Parameters yielded by the context injector, indicating the location from which the external process should load context data.
message_reader_params (PipesParams) – Parameters yielded by the message reader, indicating the location to which the external process should write messages.
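The session lifecycle described above can be sketched with a context manager and a result buffer. SketchSession and sketch_open_session are hypothetical and use only the standard library. The point is that get_results() drains the buffer, so calling it repeatedly during execution and once more after close yields each result exactly once.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


class SketchSession:
    """Hypothetical session object that buffers results reported by an external process."""

    def __init__(self, params: Dict[str, str]) -> None:
        self.params = params  # locations the external process must be told about
        self.closed = False
        self._buffer: List[Any] = []

    def receive(self, result: Any) -> None:
        # In a real session, the message reader and handler would feed results in here.
        self._buffer.append(result)

    def get_results(self) -> Iterator[Any]:
        # Drain the buffer so each buffered result is yielded exactly once.
        while self._buffer:
            yield self._buffer.pop(0)


@contextmanager
def sketch_open_session() -> Iterator[SketchSession]:
    session = SketchSession(params={"context": "<context location>", "messages": "<message location>"})
    try:
        yield session
    finally:
        # Closing is where leftover messages would be processed and resources cleaned up.
        session.closed = True
```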
Encode context injector and message reader params as environment variables.
Passing environment variables is the typical way to expose the pipes I/O parameters to a pipes process.
Returns: Environment variables to pass to the external process. The values are serialized as JSON, compressed with gzip, and then base64-encoded.
Return type: Mapping[str, str]
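A sketch of the documented encoding (JSON, then gzip, then base64) and its inverse, as the external process would apply it. The helper names and the variable names DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES are assumptions for illustration; Dagster's own encoder may differ in details.

```python
import base64
import gzip
import json
from typing import Any, Dict, Mapping


def encode_env_var(value: Any) -> str:
    """Serialize to JSON, gzip-compress, then base64-encode into an env-safe string."""
    compressed = gzip.compress(json.dumps(value).encode("utf-8"))
    return base64.b64encode(compressed).decode("ascii")


def decode_env_var(encoded: str) -> Any:
    """Inverse of encode_env_var, as the external process would apply it."""
    return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf-8"))


def bootstrap_env_vars(context_params: Mapping[str, Any], message_params: Mapping[str, Any]) -> Dict[str, str]:
    # Assumed variable names: one for where to load context, one for where to write messages.
    return {
        "DAGSTER_PIPES_CONTEXT": encode_env_var(dict(context_params)),
        "DAGSTER_PIPES_MESSAGES": encode_env_var(dict(message_params)),
    }
```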
Get the params necessary to bootstrap a launched pipes process. These parameters are typically passed as environment variables; see get_bootstrap_env_vars. It is the context injector’s responsibility to decide how to pass these parameters to the external environment.
Returns: Parameters to pass to the external process, mapped to the values that the context injector must pass.
Return type: Mapping[str, str]
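Because the context injector decides how these parameters reach the external environment, an injector for a platform that only accepts command-line arguments might flatten them into flags. The sketch below is hypothetical (params_to_cli_args is not a Dagster function) and assumes the parameter names are environment-variable-style keys.

```python
from typing import List, Mapping


def params_to_cli_args(bootstrap_params: Mapping[str, str]) -> List[str]:
    """Hypothetical helper: flatten bootstrap params into `--flag value` pairs."""
    args: List[str] = []
    for name, value in sorted(bootstrap_params.items()):
        # e.g. DAGSTER_PIPES_CONTEXT -> --dagster-pipes-context
        args.extend([f"--{name.lower().replace('_', '-')}", value])
    return args
```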
(experimental) This API may break in future versions, even between dot releases.
Message reader that reads a sequence of message chunks written by an external process into a blob store such as S3, Azure blob storage, or GCS.
The reader maintains a counter, starting at 1, that is synchronized with a message writer in
some pipes process. The reader starts a thread that periodically attempts to read a chunk
indexed by the counter at some location expected to be written by the pipes process. The chunk
should be a file with each line corresponding to a JSON-encoded pipes message. When a chunk is
successfully read, the messages are processed and the counter is incremented. The
PipesBlobStoreMessageWriter on the other end is expected to similarly increment a counter
(starting from 1) on successful write, keeping the counters on the read and write ends in
sync.
If stdout_reader or stderr_reader are passed, this reader will also start them when read_messages is called. If they are not passed, then the reader performs no stdout/stderr forwarding.
interval (float) – Interval in seconds between attempts to download a chunk.
stdout_reader (Optional[PipesBlobStoreStdioReader]) – A reader for reading stdout logs.
stderr_reader (Optional[PipesBlobStoreStdioReader]) – A reader for reading stderr logs.
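The counter protocol can be sketched with an in-memory store standing in for S3, Azure Blob Storage, or GCS. SketchBlobStoreReader and its download callable are hypothetical; the callable returns a chunk's text, or None if that chunk has not been written yet. The counter starts at 1 and advances only after a successful read, which keeps it in step with a writer that increments on each successful write.

```python
import threading
from typing import Callable, List, Optional


class SketchBlobStoreReader:
    """Hypothetical reader that polls numbered chunks in a blob store, starting at 1."""

    def __init__(self, download: Callable[[int], Optional[str]], interval: float = 1.0) -> None:
        self._download = download  # returns chunk text, or None if not written yet
        self.interval = interval  # seconds to wait after a miss
        self.counter = 1  # index of the next chunk to read
        self.messages: List[str] = []

    def poll_once(self) -> bool:
        """Try to read the chunk at the current index; advance the counter only on success."""
        chunk = self._download(self.counter)
        if chunk is None:
            return False
        # Each non-empty line of a chunk is one JSON-encoded pipes message.
        self.messages.extend(line for line in chunk.splitlines() if line.strip())
        self.counter += 1
        return True

    def run(self, stop: threading.Event) -> None:
        """Background loop: keep reading while chunks exist, otherwise wait `interval`."""
        while not stop.is_set():
            if not self.poll_once():
                stop.wait(self.interval)
```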
(experimental) This API may break in future versions, even between dot releases.
Context injector that injects context data into the external process by placing it directly in the external process’s environment.
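For contrast with the file-based injectors, here is a sketch of the environment-based approach: the serialized context itself, not a path to it, is placed in a copy of the environment. The function and the PIPES_CONTEXT_PAYLOAD variable name are hypothetical. One design consideration is that operating systems cap environment size, so very large contexts may be better served by writing them to a file.

```python
import json
import os
from typing import Any, Dict, Mapping, Optional


def inject_context_into_env(
    context_data: Mapping[str, Any], base_env: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Hypothetical helper: return a copy of the environment with the context payload added."""
    env = dict(os.environ if base_env is None else base_env)
    # The serialized context itself travels in the environment; no file is involved.
    env["PIPES_CONTEXT_PAYLOAD"] = json.dumps(dict(context_data))
    return env
```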
(experimental) This API may break in future versions, even between dot releases.
Context injector that injects context data into the external process by writing it to a specified file.
path (str) – The path of a file to which to write context data. The file will be deleted on close of the pipes session.
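A minimal sketch of the file-based approach, assuming the context is JSON-serializable: write the context to the given path, yield params that tell the external process where to load it from, and delete the file when the session closes. file_context_injector is a hypothetical helper, not Dagster's implementation.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Mapping


@contextmanager
def file_context_injector(path: str, context_data: Mapping[str, Any]) -> Iterator[Dict[str, str]]:
    """Hypothetical helper: write context to `path`, yield params, delete the file on close."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(dict(context_data), f)
    try:
        # The params tell the external process where to load context from.
        yield {"path": path}
    finally:
        if os.path.exists(path):
            os.remove(path)


# Usage: the file is readable inside the block and gone afterwards.
demo_path = os.path.join(tempfile.mkdtemp(), "context.json")
with file_context_injector(demo_path, {"asset_key": "ext_asset"}) as params:
    with open(params["path"], encoding="utf-8") as f:
        loaded = json.load(f)
```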
(experimental) This API may break in future versions, even between dot releases.
Message reader that reads messages by tailing a specified file.
path (str) – The path of the file to which messages will be written. The file will be deleted on close of the pipes session.
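Tailing can be sketched as repeated reads from a remembered byte offset. The helper below is hypothetical. It consumes only complete lines, so a message that is still being written is picked up on the next poll rather than parsed half-finished. A real reader would call something like this from a background thread at an interval.

```python
import json
import os
import tempfile
from typing import Any, List, Tuple


def read_new_messages(path: str, offset: int) -> Tuple[List[Any], int]:
    """Hypothetical helper: parse complete lines written after `offset`; return them and the new offset."""
    if not os.path.exists(path):
        return [], offset  # the writer has not created the file yet
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # Stop at the last newline so a partially written message is retried on the next poll.
    end = data.rfind(b"\n") + 1
    messages = [json.loads(line) for line in data[:end].splitlines() if line.strip()]
    return messages, offset + end


# Usage: the second message is only half written on the first poll.
demo_path = os.path.join(tempfile.mkdtemp(), "messages.jsonl")
with open(demo_path, "w", encoding="utf-8") as f:
    f.write('{"method": "log"}\n{"method": "rep')
first, pos = read_new_messages(demo_path, 0)
with open(demo_path, "a", encoding="utf-8") as f:
    f.write('ort_asset_materialization"}\n')
second, pos = read_new_messages(demo_path, pos)
```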
(experimental) This API may break in future versions, even between dot releases.
Context injector that injects context data into the external process by writing it to an automatically-generated temporary file.
(experimental) This API may break in future versions, even between dot releases.
Message reader that reads messages by tailing an automatically-generated temporary file.
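The temp-file variants differ from the path-based ones mainly in who chooses the location. A sketch, using a hypothetical helper: create a fresh temporary directory, yield params naming a file inside it, and remove the directory when the session closes.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def temp_file_params(filename: str) -> Iterator[Dict[str, str]]:
    """Hypothetical helper: yield params naming a file in a fresh temp dir, removed on close."""
    tmp_dir = tempfile.mkdtemp()
    try:
        # The external process reads from (or writes to) this automatically chosen path.
        yield {"path": os.path.join(tmp_dir, filename)}
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
```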
(experimental) This API may break in future versions, even between dot releases.
Context manager that opens and closes a pipes session.
This context manager should be used to wrap the launch of an external process using the pipes
protocol to report results back to Dagster. The yielded PipesSession should be used to (a)
obtain the environment variables that need to be provided to the external process; and (b)
access results streamed back from the external process.
This method is an alternative to PipesClient subclasses for users who want more control over
how pipes processes are launched. When using open_pipes_session, it is the user’s
responsibility to inject the message reader and context injector parameters available on the
yielded PipesSession and pass them to the appropriate API when launching the external process.
Typically these parameters should be set as environment variables.
context (OpExecutionContext) – The context for the current op/asset execution.
context_injector (PipesContextInjector) – The context injector to use to inject context into the external process.
message_reader (PipesMessageReader) – The message reader to use to read messages from the external process.
extras (Optional[PipesExtras]) – Optional extras to pass to the external process via the injected context.
Yields: PipesSession – Interface for interacting with the external process.
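import subprocess

from dagster import (
    OpExecutionContext,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)

@asset
def ext_asset(context: OpExecutionContext):
    # Arbitrary data forwarded to the external process through the injected context.
    extras = {"foo": "bar"}
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Launch the external process with the bootstrap params exposed as env vars.
        process = subprocess.Popen(
            ["/bin/python", "/path/to/script.py"],
            env={**pipes_session.get_bootstrap_env_vars()},
        )
        # Stream buffered results to Dagster while the process is running.
        while process.poll() is None:
            yield from pipes_session.get_results()

    # After the session closes, yield any results processed during shutdown.
    yield from pipes_session.get_results()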
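For readers without Dagster installed, the same launch pattern can be sketched with the standard library alone. This is a hypothetical analogue, not open_pipes_session: the parent exposes a message-file location through an environment variable (SKETCH_PIPES_MESSAGES, an invented name), the child appends a JSON line, and the parent reads the file after the process exits. Unlike the example above, it does not stream results while the process runs.

```python
import json
import os
import subprocess
import sys
import tempfile
from typing import Any, List

# Hypothetical child script: reads the message path from its environment and reports one result.
CHILD = """
import json, os
with open(os.environ["SKETCH_PIPES_MESSAGES"], "a", encoding="utf-8") as f:
    f.write(json.dumps({"method": "report_asset_materialization", "params": {"rows": 3}}) + "\\n")
"""


def run_external(script: str) -> List[Any]:
    """Launch `script` with the message location in its env and collect the reported messages."""
    with tempfile.TemporaryDirectory() as tmp:
        messages_path = os.path.join(tmp, "messages.jsonl")
        env = {**os.environ, "SKETCH_PIPES_MESSAGES": messages_path}
        subprocess.run([sys.executable, "-c", script], env=env, check=True)
        # Close-time handling: read whatever the process wrote before it exited.
        if not os.path.exists(messages_path):
            return []
        with open(messages_path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]
```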