You are viewing an unreleased or outdated version of the documentation

Source code for dagster_databricks.resources

from typing import Any, Optional

from dagster import (
    Config,
    ConfigurableResource,
    IAttachDifferentObjectToOpContext,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field, root_validator

from .databricks import DatabricksClient


class OauthCredentials(Config):
    """OAuth credentials for Databricks.

    See https://docs.databricks.com/dev-tools/api/latest/authentication.html#oauth-2-0.
    """

    client_id: str = Field(description="OAuth client ID")
    client_secret: str = Field(description="OAuth client secret")


[docs]class DatabricksClientResource(ConfigurableResource, IAttachDifferentObjectToOpContext): """Resource which provides a Python client for interacting with Databricks within an op or asset. """ host: str = Field(description="Databricks host, e.g. https://uksouth.azuredatabricks.com") token: Optional[str] = Field(default=None, description="Databricks access token") oauth_credentials: Optional[OauthCredentials] = Field( default=None, description=( "Databricks OAuth credentials for using a service principal. See" " https://docs.databricks.com/en/dev-tools/auth.html#oauth-2-0" ), ) workspace_id: Optional[str] = Field( default=None, description=( "DEPRECATED: The Databricks workspace ID, as described in" " https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids." " This is no longer used and will be removed in a 0.21." ), ) @root_validator() def has_token_or_oauth_credentials(cls, values): token = values.get("token") oauth_credentials = values.get("oauth_credentials") if not token and not oauth_credentials: raise ValueError("Must provide either token or oauth_credentials") if token and oauth_credentials: raise ValueError("Must provide either token or oauth_credentials, not both") return values @classmethod def _is_dagster_maintained(cls) -> bool: return True def get_client(self) -> DatabricksClient: if self.oauth_credentials: client_id = self.oauth_credentials.client_id client_secret = self.oauth_credentials.client_secret else: client_id = None client_secret = None return DatabricksClient( host=self.host, token=self.token, oauth_client_id=client_id, oauth_client_secret=client_secret, workspace_id=self.workspace_id, ) def get_object_to_set_on_execution_context(self) -> Any: return self.get_client()
[docs]@dagster_maintained_resource @resource(config_schema=DatabricksClientResource.to_config_schema()) def databricks_client(init_context) -> DatabricksClient: return DatabricksClientResource.from_resource_context(init_context).get_client()