This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources.
This package currently includes a Sling integration which provides a simple way to sync data between databases and file systems.
Related documentation pages: embedded-elt.
Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This creates one asset for every Sling target stream.
A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.
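Because `replication_config` also accepts a mapping, a replication config can be written inline as a Python dictionary. The sketch below assumes Sling's top-level `source`, `target`, `defaults`, and `streams` keys and its runtime variables such as `{stream_schema}` and `{stream_table}`. It reuses the `MY_POSTGRES` and `MY_DUCKDB` connection names from the example on this page, and the stream and table names are placeholders.

```python
# A minimal Sling replication config expressed as a Python mapping (a sketch).
# The "source" and "target" names must match SlingConnectionResource names.
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {
        "mode": "full-refresh",
        # Assumed Sling runtime variables, filled in per stream by Sling.
        "object": "{target_schema}.{stream_schema}_{stream_table}",
    },
    "streams": {
        # A stream with no overrides uses the defaults above.
        "public.accounts": {},
        # A stream can override the defaults, e.g. with a custom SQL query.
        "public.users": {
            "sql": 'select all_user_id, name from public."all_Users"',
            "object": "public.all_users",
        },
    },
}

# Each entry under "streams" corresponds to one Dagster asset.
stream_names = list(replication_config["streams"])
print(stream_names)  # ['public.accounts', 'public.users']
```

The same dictionary could then be passed as `@sling_assets(replication_config=replication_config)` instead of a file path.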
replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.
dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.
name (Optional[str]) – The name of the op.
partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
op_tags (Optional[Mapping[str, Any]]) – The tags for this asset.
Examples
Running a sync by providing a path to a Sling Replication config:
from dagster import Definitions, EnvVar
from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"

@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    # replicate() yields MaterializeResult objects; yield them so Dagster records each materialization.
    yield from sling.replicate(
        replication_config=config_path,
        dagster_sling_translator=DagsterSlingTranslator(),
    )

# The resource key "sling" matches the `sling` parameter of my_assets.
defs = Definitions(assets=[my_assets], resources={"sling": sling_resource})
A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.
The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.
For example:
stream_definition = {
    "public.users": {
        "sql": 'select all_user_id, name from public."all_Users"',
        "object": "public.all_users",
    }
}
By default, this returns the class’s target_prefix parameter concatenated with the sanitized stream name. A stream named “public.accounts” will create an AssetKey named “target_public_accounts”.
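The default naming rule can be illustrated with a small standalone function. This is a hypothetical re-implementation for illustration only, not the library's code: `default_asset_key_name` is an invented name, and it assumes the sanitizing rule described for sanitize_stream_name (non-alphanumeric characters become underscores, double quotes are dropped) and a target_prefix of `"target"`.

```python
import re


def sanitize(stream_name: str) -> str:
    # Drop double quotes, then replace every character that is not a letter,
    # digit, or underscore with an underscore.
    return re.sub(r"[^A-Za-z0-9_]", "_", stream_name.replace('"', ""))


def default_asset_key_name(stream_name: str, target_prefix: str = "target") -> str:
    # Hypothetical: join the prefix and the sanitized stream name with "_".
    return f"{target_prefix}_{sanitize(stream_name)}"


print(default_asset_key_name("public.accounts"))  # target_public_accounts
```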
Override this function to customize how to map a Sling stream to a Dagster AssetKey.
Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:
public.users:
  meta:
    dagster:
      asset_key: "mydb_users"
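To show where that metadata lives, the sketch below reads `meta.dagster.asset_key` for one stream from a replication config held as a Python dictionary, assuming the stream sits under the config's `streams` key. The helper `meta_asset_key` is hypothetical and only illustrates the lookup path; it is not part of dagster-embedded-elt.

```python
from typing import Any, Mapping, Optional


def meta_asset_key(replication: Mapping[str, Any], stream_name: str) -> Optional[str]:
    # Hypothetical helper: return meta.dagster.asset_key for a stream, or None
    # when the stream is missing or has no such metadata.
    stream_config = replication.get("streams", {}).get(stream_name) or {}
    return stream_config.get("meta", {}).get("dagster", {}).get("asset_key")


replication = {
    "streams": {
        "public.users": {"meta": {"dagster": {"asset_key": "mydb_users"}}},
        "public.accounts": None,  # no metadata, so the default naming applies
    }
}

print(meta_asset_key(replication, "public.users"))  # mydb_users
print(meta_asset_key(replication, "public.accounts"))  # None
```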
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition
The Dagster AssetKey for the replication stream.
Examples
Using a custom mapping for streams:
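from dagster import AssetKey
from dagster_embedded_elt.sling import DagsterSlingTranslator
class CustomSlingTranslator(DagsterSlingTranslator):
    @classmethod
    def get_asset_key_for_target(cls, stream_definition) -> AssetKey:
        # Per the stream definition format above, the stream name is the dictionary's key.
        stream_name = next(iter(stream_definition))
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_name])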
A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.
By default, this returns the sanitized stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.
Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey of a stream's dependency as follows:
public.users:
  meta:
    dagster:
      deps: "sourcedb_users"
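The dependency rule can be sketched the same way. `deps_key_name` below is a hypothetical helper, not the library's implementation: it assumes a `meta.dagster.deps` value, when present, takes the place of the default, and otherwise falls back to the stream name with non-alphanumeric characters replaced by underscores, which matches the “public_accounts” example above.

```python
import re
from typing import Any, Mapping, Optional


def deps_key_name(stream_name: str, stream_config: Optional[Mapping[str, Any]]) -> Any:
    # Hypothetical: prefer an explicit meta.dagster.deps value if one is set.
    meta_deps = (stream_config or {}).get("meta", {}).get("dagster", {}).get("deps")
    if meta_deps:
        return meta_deps
    # Otherwise sanitize the stream name: drop quotes, replace other symbols with "_".
    return re.sub(r"[^A-Za-z0-9_]", "_", stream_name.replace('"', ""))


print(deps_key_name("public.accounts", None))  # public_accounts
print(deps_key_name("public.users", {"meta": {"dagster": {"deps": "sourcedb_users"}}}))  # sourcedb_users
```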
stream_name (str) – The name of the stream.
The Dagster AssetKey dependency for the replication stream.
Examples
Using a custom mapping for streams:
from dagster import AssetKey
from dagster_embedded_elt.sling import DagsterSlingTranslator
class CustomSlingTranslator(DagsterSlingTranslator):
    @classmethod
    def get_deps_asset_key(cls, stream_name: str) -> AssetKey:
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_name])
A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.
By default, this removes any double quotes from the stream name and replaces every remaining non-alphanumeric character with an underscore.
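The default sanitizing rule corresponds roughly to the regular expression below. This is a hypothetical standalone sketch for illustration, not the library's source; the exact character class the library uses may differ.

```python
import re


def sanitize_stream_name(stream_name: str) -> str:
    # Remove double quotes first, so quoted identifiers do not leave extra underscores.
    without_quotes = stream_name.replace('"', "")
    # Replace each character that is not a letter, digit, or underscore with "_".
    return re.sub(r"[^A-Za-z0-9_]", "_", without_quotes)


print(sanitize_stream_name('public."all_Users"'))  # public_all_Users
print(sanitize_stream_name("my-schema.orders"))  # my_schema_orders
```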
stream_name (str) – The name of the stream.
Examples
Using a custom stream name sanitizer:
from dagster_embedded_elt.sling import DagsterSlingTranslator
class CustomSlingTranslator(DagsterSlingTranslator):
    @classmethod
    def sanitize_stream_name(cls, stream_name: str) -> str:
        return stream_name.replace(".", "")
(Experimental) This API may break in future versions, even between dot releases.
Resource for interacting with the Sling package. This resource can be used to run Sling replications.
connections (List[SlingConnectionResource]) – A list of connections to use for the replication.
source_connection (Optional[SlingSourceConnection]) – Deprecated, use connections instead.
target_connection (Optional[SlingTargetConnection]) – Deprecated, use connections instead.
Examples
from dagster import EnvVar
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)
Runs a Sling replication from the given replication config.
replication_config – The Sling replication config to use for the replication.
dagster_sling_translator – The translator to use for the replication.
debug – Whether to run the replication in debug mode.
A generator of MaterializeResult
Optional[Generator[MaterializeResult, None, None]]
A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.
Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections
The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration. See https://docs.slingdata.io/sling-cli/run/configuration/replication for details. You may provide either a connection string or keyword arguments for the connection.
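Since the names must match exactly, a quick up-front check can catch typos. The sketch below is a hypothetical, stdlib-only helper (`missing_connections` is not part of dagster-embedded-elt) that compares the `source` and `target` names in a replication config mapping against the names given to SlingConnectionResource.

```python
from typing import Any, Iterable, List, Mapping


def missing_connections(replication: Mapping[str, Any], connection_names: Iterable[str]) -> List[str]:
    # Hypothetical check: list source/target names that have no matching connection.
    defined = set(connection_names)
    referenced = [replication.get("source"), replication.get("target")]
    return [name for name in referenced if name and name not in defined]


replication = {"source": "MY_POSTGRES", "target": "MY_SNOWFLAKE", "streams": {}}

print(missing_connections(replication, ["MY_POSTGRES", "MY_SNOWFLAKE"]))  # []
print(missing_connections(replication, ["MY_POSTGRES"]))  # ['MY_SNOWFLAKE']
```

In practice, the names passed in would be the `name` values of the SlingConnectionResource objects given to `SlingResource(connections=...)`.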
Examples
Creating a Sling Connection for a file, such as CSV or JSON:
source = SlingConnectionResource(name="MY_FILE", type="file")
Create Sling Connections for Postgres and MySQL databases, using connection strings:
postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")
Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:
(Deprecated) This API will be removed in version 0.23.0. Use @sling_assets instead.
Asset factory for using Sling to sync data from a source stream to a target object.
asset_spec (AssetSpec) – The AssetSpec to use to materialize this asset.
source_stream (str) – The source stream to sync from. This can be a table, a query, or a path.
target_object (str) – The target object to sync to. This can be a table, or a path.
mode (SlingMode, optional) – The sync mode to use when syncing. Defaults to SlingMode.FULL_REFRESH.
primary_key (Optional[Union[str, List[str]]], optional) – The optional primary key to use when syncing.
update_key (Optional[str], optional) – The optional update key to use when syncing.
source_options (Optional[Dict[str, Any]], optional) – Any optional Sling source options to use when syncing.
target_options (Optional[Dict[str, Any]], optional) – Any optional target options to use when syncing.
sling_resource_key (str, optional) – The resource key for the SlingResource. Defaults to “sling”.
Examples
Creating a Sling asset that syncs from a file to a table:
from dagster import AssetSpec
from dagster_embedded_elt.sling import SlingMode, build_sling_asset

asset_spec = AssetSpec(key=["main", "dest_tbl"])
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="file:///tmp/test.csv",
    target_object="main.dest_table",
    mode=SlingMode.INCREMENTAL,
    primary_key="id",
)
Creating a Sling asset that syncs from a table to a file with a full refresh:
asset_spec = AssetSpec(key="test.csv")
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="main.dest_table",
    target_object="file:///tmp/test.csv",
    mode=SlingMode.FULL_REFRESH,
)
(Deprecated) This API will be removed in version 0.23.0. SlingSourceConnection has been deprecated; use SlingConnectionResource for both source and target connections.
A Sling Source Connection defines the source connection used by SlingResource.
Examples
Creating a Sling Source for a file, such as CSV or JSON:
source = SlingSourceConnection(type="file")
Create a Sling Source for a Postgres database, using a connection string:
source = SlingSourceConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
source = SlingSourceConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")
Create a Sling Source for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres
source = SlingSourceConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))
(Deprecated) This API will be removed in version 0.23.0. SlingTargetConnection has been deprecated; use SlingConnectionResource for both source and target connections.
A Sling Target Connection defines the target connection used by SlingResource.
Examples
Creating a Sling Target for a file, such as CSV or JSON:
target = SlingTargetConnection(type="file")
Create a Sling Target for a Postgres database, using a connection string:
target = SlingTargetConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")
target = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
Create a Sling Target for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres