Dagster Embedded ELT#

This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback.

This package includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.

We plan to add more embedded ELT tool integrations in the future.


Overview#

To get started with dagster-embedded-elt and Sling, first familiarize yourself with Sling's replication configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync. The dagster-embedded-elt integration uses this configuration to build the assets for both sources and destinations.

The typical pattern for building an ELT pipeline with Sling has three steps:

  1. First, define a replication.yaml file that specifies the source and target connections, as well as which streams to sync.

  2. Next, create a SlingResource and pass a list of SlingConnectionResource objects, one per connection, to the connections parameter. Give each connection resource the same name as the corresponding connection in the Sling configuration.

  3. Finally, use the @sling_assets decorator to define the assets for the Sling replication job, and yield from the SlingResource.replicate method inside the decorated function to run the sync.

Each step is explained in detail below:


Step 1: Set up a Sling replication configuration#

Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing replication.yaml file, or construct a dictionary that represents the configuration in Python.

This configuration is passed to the Sling CLI to run the replication job.

Here's an example of a replication.yaml file:

SOURCE: MY_POSTGRES
TARGET: MY_SNOWFLAKE

defaults:
  mode: full-refresh
  object: "{stream_schema}_{stream_table}"

streams:
  public.accounts:
  public.users:
  public.finance_departments:
    object: "departments"

Or as a Python dictionary (note that this variant targets MY_DUCKDB rather than MY_SNOWFLAKE):

replication_config = {
    "SOURCE": "MY_POSTGRES",
    "TARGET": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}
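
To make the role of defaults and per-stream overrides concrete, here is a minimal sketch in plain Python. It does not call Sling; resolve_streams is a hypothetical helper that assumes stream names have the form schema.table and that the {stream_schema} and {stream_table} placeholders expand to those two parts, which is how the configuration above appears intended to work.

```python
def resolve_streams(config):
    """Merge per-stream overrides onto the defaults and expand the object template.

    Hypothetical helper for illustration only; Sling performs its own resolution.
    """
    defaults = config.get("defaults", {})
    resolved = {}
    for stream, overrides in config["streams"].items():
        # A value of None means "use the defaults unchanged".
        settings = {**defaults, **(overrides or {})}
        # Assumption: stream names look like "schema.table".
        schema, table = stream.split(".", 1)
        settings["object"] = settings["object"].format(
            stream_schema=schema, stream_table=table
        )
        resolved[stream] = settings
    return resolved


replication_config = {
    "SOURCE": "MY_POSTGRES",
    "TARGET": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}

for stream, settings in resolve_streams(replication_config).items():
    print(f"{stream} -> {settings['object']} ({settings['mode']})")
```

Under these assumptions, public.accounts and public.users resolve to public_accounts and public_users, while public.finance_departments keeps its explicit departments override.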

Step 2: Create a Sling resource#

Next, you will need to create a SlingResource object that contains references to the connections specified in the replication configuration.

A SlingResource takes a connections parameter: a list of SlingConnectionResource objects, each representing a connection to a source or target. You may provide as many connections to the SlingResource as needed.

The name parameter of each SlingConnectionResource should match the value given for the SOURCE or TARGET key in the replication configuration.

You may pass a connection string or arbitrary keyword arguments to the SlingConnectionResource to specify the connection details. See the Sling connections reference for the specific connection types and parameters.

from dagster_embedded_elt.sling.resources import (
    SlingConnectionResource,
    SlingResource,
)

from dagster import EnvVar

sling_resource = SlingResource(
    connections=[
        # Using a connection string from an environment variable
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Using a hard-coded connection string
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
        # Using a keyword-argument constructor
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            role="REPORTING",
        ),
    ]
)
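
Because the SOURCE and TARGET values must match connection names exactly, a small pre-flight check can catch typos before a run. The following is a sketch in plain Python; check_connection_names is a hypothetical helper, not part of dagster-embedded-elt, and it works on the connection names as plain strings.

```python
def check_connection_names(replication_config, connection_names):
    """Return (key, value) pairs for SOURCE/TARGET entries with no matching connection.

    Hypothetical helper for illustration; not part of dagster-embedded-elt.
    """
    known = set(connection_names)
    missing = []
    for key in ("SOURCE", "TARGET"):
        name = replication_config.get(key)
        if name not in known:
            missing.append((key, name))
    return missing


# Names of the connections passed to the SlingResource above
names = ["MY_POSTGRES", "MY_DUCKDB", "MY_SNOWFLAKE"]

ok_config = {"SOURCE": "MY_POSTGRES", "TARGET": "MY_DUCKDB"}
typo_config = {"SOURCE": "MY_POSTGRES", "TARGET": "MY_DUCKBD"}  # misspelled target

print(check_connection_names(ok_config, names))    # no mismatches
print(check_connection_names(typo_config, names))  # reports the TARGET entry
```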

Step 3: Define the Sling assets#

Now you can define a Sling asset using the @sling_assets decorator. Dagster will read the replication configuration to produce assets.

Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom DagsterSlingTranslator object.

from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingResource,
    sling_assets,
)

from dagster import Definitions, file_relative_path

replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...])  # Add connections here


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )
    for row in sling.stream_raw_logs():
        context.log.info(row)


defs = Definitions(
    assets=[
        my_assets,
    ],
    resources={
        "sling": sling_resource,
    },
)

That's it! You should now be able to view your assets in the Dagster UI and run the replication job.
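
To illustrate the "two assets per stream" idea from Step 3, the sketch below enumerates one source entry and one target entry for every stream in a replication configuration. The naming scheme shown here (splitting the stream name on dots, and prefixing the target with "target") is an assumption made purely for illustration; the actual asset keys are decided by DagsterSlingTranslator and may differ.

```python
def expected_asset_pairs(replication_config, target_prefix="target"):
    """List a (source, target) name pair for each stream.

    Hypothetical naming scheme for illustration; the real keys come from
    DagsterSlingTranslator.
    """
    pairs = []
    for stream, overrides in replication_config["streams"].items():
        source_key = stream.split(".")
        # Use the explicit object override when present, else the stream name.
        target_name = (overrides or {}).get("object", stream)
        target_key = [target_prefix, *target_name.split(".")]
        pairs.append((source_key, target_key))
    return pairs


config = {
    "streams": {
        "public.accounts": None,
        "public.finance_departments": {"object": "departments"},
    }
}

for source_key, target_key in expected_asset_pairs(config):
    print("/".join(source_key), "->", "/".join(target_key))
```

Two streams produce four entries in total, which mirrors how each stream shows up as a source asset and a target asset in the Dagster UI.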


Examples#

Example 1: Database to Database#

This example shows how to set up a Sling sync between two databases, here Postgres and Snowflake:

from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

from dagster import EnvVar

source = SlingConnectionResource(
    name="MY_PG",
    type="postgres",
    host="localhost",
    port=5432,
    database="my_database",
    user="my_user",
    password=EnvVar("PG_PASS"),
)

target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=EnvVar("SF_PASSWORD"),
    role="role",
)

sling = SlingResource(
    connections=[
        source,
        target,
    ]
)
replication_config = {
    "SOURCE": "MY_PG",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )
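
Example 1 passes the Postgres details as keyword arguments. If you prefer the connection_string form shown in Step 2, the same details can be assembled into a URL. The sketch below uses the standard postgresql:// URI layout and only the Python standard library; postgres_url is a hypothetical helper, and whether Sling accepts this exact form for your setup should be checked against the Sling connections reference.

```python
from urllib.parse import quote


def postgres_url(user, password, host, port, database):
    """Build a postgresql:// URL, percent-encoding the credentials.

    Hypothetical helper for illustration; not part of dagster-embedded-elt.
    """
    # Encoding keeps characters such as "@" or "/" from breaking the URL.
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Placeholder values; in practice the password would come from the environment.
url = postgres_url("my_user", "p@ss/word", "localhost", 5432, "my_database")
print(url)
```

In a real deployment the resulting string would typically be stored in an environment variable and referenced with EnvVar rather than built inline.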

Example 2: File to Database#

This example shows how to set up a Sling sync from a file in an object store to a database, here from S3 to Snowflake:

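from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

from dagster import EnvVar

source = SlingConnectionResource(
    name="MY_S3",
    type="s3",
    bucket="sling-bucket",
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

# `target` is the MY_SF Snowflake connection defined in Example 1
sling = SlingResource(connections=[source, target])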

replication_config = {
    "SOURCE": "MY_S3",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        # The bucket in the stream URL matches the bucket of the MY_S3 connection
        "s3://sling-bucket/my_file.parquet": {
            "object": "marts.my_table",
            "primary_key": "id",
        },
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )

Relevant APIs#

Name                       Description
@sling_assets              The core Sling asset factory for building syncs
SlingResource              The Sling resource used for handing credentials to databases and object stores
DagsterSlingTranslator     A translator for specifying how to map between Sling and Dagster types
SlingConnectionResource    A Sling connection resource for specifying database and storage connection credentials