To use the Snowflake I/O manager, you'll need to gather the following information:
Snowflake account name: You can find this by logging into Snowflake and getting the account name from the URL.
Snowflake credentials: This includes a username and a password. The Snowflake I/O manager can read these values from environment variables. In this guide, we store the username and password as SNOWFLAKE_USER and SNOWFLAKE_PASSWORD, respectively.
The Snowflake I/O manager requires some configuration to connect to your Snowflake instance. The account, user, and password are required to connect to Snowflake. Additionally, you need to specify a database where all the tables should be stored.
You can also provide optional configuration to further customize the Snowflake I/O manager. You can specify a warehouse and schema where data should be stored, and a role for the I/O manager to use.
from dagster_snowflake_pandas import snowflake_pandas_io_manager
from dagster import Definitions
defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",  # required
                "user": {"env": "SNOWFLAKE_USER"},  # required
                "password": {"env": "SNOWFLAKE_PASSWORD"},  # required
                "database": "FLOWERS",  # required
                "role": "writer",  # optional, defaults to the default role for the account
                "warehouse": "PLANTS",  # optional, defaults to the default warehouse for the account
                "schema": "IRIS",  # optional, defaults to PUBLIC
            }
        )
    },
)
With this configuration, if you materialized an asset called iris_dataset, the Snowflake I/O manager would be permissioned with the role writer and would store the data in the FLOWERS.IRIS.IRIS_DATASET table using the PLANTS warehouse.
Finally, in the Definitions object, we assign the snowflake_pandas_io_manager to the io_manager key. io_manager is a reserved key to set the default I/O manager for your assets.
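If you don't want Snowflake to be the default I/O manager for every asset, you can register it under a different resource key and opt individual assets in with the io_manager_key argument to @asset. This is a minimal sketch; the resource key warehouse_io_manager and the iris_sample asset are hypothetical names chosen for illustration:

import pandas as pd

from dagster import Definitions, asset
from dagster_snowflake_pandas import snowflake_pandas_io_manager

# opt this asset in to the Snowflake I/O manager; assets without an
# io_manager_key keep whatever the default "io_manager" resolves to
@asset(io_manager_key="warehouse_io_manager")
def iris_sample() -> pd.DataFrame:
    return pd.DataFrame({"Species": ["Iris-setosa"]})

defs = Definitions(
    assets=[iris_sample],
    resources={
        # "warehouse_io_manager" is a hypothetical, non-reserved key
        "warehouse_io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
            }
        )
    },
)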
For more info about each of the configuration values, refer to the build_snowflake_io_manager API documentation.
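For additional context when reading those docs: snowflake_pandas_io_manager is, to our understanding, equivalent to building a Snowflake I/O manager with the Pandas type handler, roughly as in this sketch:

from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler

# an I/O manager that stores and loads Pandas DataFrames in Snowflake;
# it accepts the same configuration values shown above
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])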
Step 2: Create tables in Snowflake
The Snowflake I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing Snowflake tables available to Dagster.
To store data in Snowflake using the Snowflake I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the Snowflake I/O manager, as in Step 1: Configure the Snowflake I/O manager, and Dagster will handle storing and loading your assets in Snowflake.
import pandas as pd
from dagster import asset
@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )
In this example, we first define our asset. Here, we are fetching the Iris dataset as a Pandas DataFrame and renaming the columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.
When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the Snowflake I/O manager, the Snowflake I/O manager will create the table FLOWERS.IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.
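If you want to spot-check what was written, one option is to query the table directly with the snowflake-connector-python package. This is a minimal sketch, not part of the Dagster setup, assuming the same account, credentials, and warehouse from Step 1:

import os

import snowflake.connector

# connect with the same account and credentials the I/O manager uses
conn = snowflake.connector.connect(
    account="abc1234.us-east-1",
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    database="FLOWERS",
    schema="IRIS",
    warehouse="PLANTS",
)
cur = conn.cursor()
# count the rows written by the materialization
cur.execute("SELECT COUNT(*) FROM IRIS_DATASET")
print(cur.fetchone())
cur.close()
conn.close()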
You may already have tables in Snowflake that you want to make available to other Dagster assets. You can create source assets for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets.
from dagster import SourceAsset
iris_harvest_data = SourceAsset(key="iris_harvest_data")
In this example, we create a SourceAsset for a pre-existing table (perhaps created by an external data ingestion tool) that contains data about iris harvests. To make the data available to other Dagster assets, we need to tell the Snowflake I/O manager how to find the data.
Since we supply the database and the schema in the I/O manager configuration in Step 1: Configure the Snowflake I/O manager, we only need to provide the table name. We do this with the key parameter in SourceAsset. When the I/O manager needs to load the iris_harvest_data in a downstream asset, it will select the data in the FLOWERS.IRIS.IRIS_HARVEST_DATA table as a Pandas DataFrame and provide it to the downstream asset.
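For instance, a downstream asset can simply take iris_harvest_data as a parameter and receive the table's contents as a Pandas DataFrame. The iris_harvest_summary asset below is a hypothetical sketch of this pattern:

import pandas as pd

from dagster import asset

# the I/O manager loads FLOWERS.IRIS.IRIS_HARVEST_DATA and passes it in
@asset
def iris_harvest_summary(iris_harvest_data: pd.DataFrame) -> pd.DataFrame:
    return iris_harvest_data.describe()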
Step 3: Load Snowflake tables in downstream assets
Once you have created an asset or source asset that represents a table in Snowflake, you will likely want to create additional assets that work with the data. Dagster and the Snowflake I/O manager allow you to load the data stored in Snowflake tables into downstream assets.
import pandas as pd
from dagster import asset
# this example uses the iris_dataset asset from Step 2
@asset
def iris_cleaned(iris_dataset: pd.DataFrame):
    return iris_dataset.dropna().drop_duplicates()
In iris_cleaned, the iris_dataset parameter tells Dagster that the value for the iris_dataset asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.
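For reference, here is a sketch of the same dependency spelled out explicitly with AssetIn rather than inferred from the parameter name; the behavior is identical to the example above:

import pandas as pd

from dagster import AssetIn, asset

# map the local parameter name "iris_data" to the upstream iris_dataset asset
@asset(ins={"iris_data": AssetIn(key="iris_dataset")})
def iris_cleaned(iris_data: pd.DataFrame):
    return iris_data.dropna().drop_duplicates()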
When materializing these assets, Dagster will use the snowflake_pandas_io_manager to fetch the FLOWERS.IRIS.IRIS_DATASET table as a Pandas DataFrame and pass this DataFrame as the iris_dataset parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the snowflake_pandas_io_manager to store the DataFrame as the FLOWERS.IRIS.IRIS_CLEANED table in Snowflake.
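Putting the steps together, a complete definitions file for this guide would look roughly like the following sketch, using the same names and configuration as above:

import pandas as pd

from dagster import Definitions, SourceAsset, asset
from dagster_snowflake_pandas import snowflake_pandas_io_manager

# existing table created outside of Dagster (Step 2)
iris_harvest_data = SourceAsset(key="iris_harvest_data")

# fetched and stored as FLOWERS.IRIS.IRIS_DATASET (Step 2)
@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )

# loads IRIS_DATASET and stores the result as FLOWERS.IRIS.IRIS_CLEANED (Step 3)
@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()

defs = Definitions(
    assets=[iris_dataset, iris_harvest_data, iris_cleaned],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "role": "writer",
                "warehouse": "PLANTS",
                "schema": "IRIS",
            }
        )
    },
)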