Snowflake data pipeline with Kestra
Snowflake is one of the most popular cloud data warehouse technologies. This post demonstrates Kestra plugins for Snowflake data management, including event-driven triggers based on changes in your Snowflake data, file management, and queries.
With Snowflake, companies can build scalable data workloads and perform powerful analysis on structured, semi-structured, and unstructured data to derive valuable business insights and make data-driven decisions. Additionally, the Snowflake Data Marketplace gives customers access to numerous ready-to-query datasets, further reducing integration costs.
Snowflake can automatically manage its multi-cluster warehouses, dynamically scaling up and down to balance resource usage and costs.
The platform enables organizations to avoid large-scale licensing costs commonly associated with data warehousing tools, operating on a pay-as-you-go basis for storage and compute.
Data warehouse workloads are typically part of a larger technological stack. To streamline operations, orchestration and scheduling of data pipelines are crucial. This is where Kestra comes into play.
Kestra is designed to orchestrate and schedule scalable data workflows, thereby enhancing DataOps teams' productivity. It can construct, operate, manage, and monitor a variety of complex workflows sequentially or in parallel.
Kestra can execute workflows on event-based, time-based, or API-driven schedules, giving teams complete control over when pipelines run. Snowflake already offers many cost optimization features such as data compression and auto-scaling. Kestra complements them by making it simpler to download, upload, and query data through direct integration with Snowflake's storage and compute resources.
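As an illustration of time-based scheduling, the sketch below runs a Snowflake query every morning. This is a hypothetical flow: the flow `id`, `namespace`, cron expression, and connection values are placeholders, and the `Schedule` trigger type shown here follows the current plugin naming, which may differ across Kestra versions.

```yaml
# Hypothetical flow: run a Snowflake query every day at 6 AM.
id: scheduled_snowflake_query
namespace: company.team

tasks:
  - id: daily_select
    type: io.kestra.plugin.jdbc.snowflake.Query
    url: jdbc:snowflake://<account_identifier>.snowflakecomputing.com
    username: snowflake
    password: snowflake_passwd
    sql: select * from source
    fetch: true

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * *"
```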
Besides the Snowflake plugin, Kestra offers numerous JDBC plugin integrations, including ClickHouse, DuckDB, MySQL, Oracle, Apache Pinot, PostgreSQL, Redshift, Rockset, SQL Server, Trino, Vectorwise, and Vertica. These plugins can effectively process and transform tabular data within relational databases, reducing the processing cost of platforms like Snowflake.
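The JDBC plugins all share the same task shape, so switching databases is mostly a matter of changing the task type and connection URL. The sketch below shows the equivalent query against PostgreSQL; the connection values are placeholders.

```yaml
# Hypothetical task: the same Query pattern using the PostgreSQL JDBC plugin.
id: postgres_select
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/mydb
username: postgres
password: pg_passwd
sql: select * from source
fetch: true
```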
Kestra's Snowflake plugin provides an efficient solution for creating intricate data pipelines. You can perform the download, upload, and query tasks. Let's dive into the key functionalities provided by the plugin.
Kestra can query the Snowflake server using the Query task to insert, update, and delete data. The Query task offers numerous properties, including auto-committing SQL statements, different fetch operations, specifying access-control roles, and storing fetched results. When the `store` property is set to true, Kestra allows storage of large results as an output of the Query task.
The plugin allows multiple SQL statements in the same transaction as a full SQL script with isolation support. It also supports simple queries, fetching results with the `fetch` or `fetchOne` properties so teams can reuse the output in subsequent tasks with tools like Kafka Consume, Elasticsearch, Mongo Find, and more. Some Query task use cases are:
- Fetch a row from the database, and define multiple flows depending on the output
- Fetch the count from a store and iterate through the resulting list; if an item doesn't exist, perform a particular task.
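The first pattern above can be sketched as a flow that fetches a single row and reuses its output in a later task. This is a hypothetical example: the flow `id`, `namespace`, table, column name, and the `Log` task type are illustrative, and the `{{ outputs.<task_id>.row }}` expression reflects how a `fetchOne` result is typically exposed.

```yaml
# Hypothetical flow: fetch one row, then reuse its value in the next task.
id: fetch_and_reuse
namespace: company.team

tasks:
  - id: count
    type: io.kestra.plugin.jdbc.snowflake.Query
    url: jdbc:snowflake://<account_identifier>.snowflakecomputing.com
    username: snowflake
    password: snowflake_passwd
    sql: select count(*) as total from source
    fetchOne: true

  - id: log_total
    type: io.kestra.plugin.core.log.Log
    message: "Row count: {{ outputs.count.row.total }}"
```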
The following code snippet executes a query against a table and fetches the results so they are available as task output.
```yaml
id: select
type: io.kestra.plugin.jdbc.snowflake.Query
url: jdbc:snowflake://<account_identifier>.snowflakecomputing.com
username: snowflake
password: snowflake_passwd
sql: select * from source
fetch: true
```
The Download task downloads data from a Snowflake stage to Kestra's internal storage, which is based on the Amazon Ion format. The task outputs the URL of the downloaded file on the Kestra storage server. It also offers properties such as data compression and an access-control role to streamline the download process from the connected database.
The following code snippet downloads the file at the specified `fileName` location from the stage to the internal Kestra server.
```yaml
id: "download"
type: "io.kestra.plugin.jdbc.snowflake.Download"
stageName: MYSTAGE
fileName: prefix/destFile.csv
```
The Upload task uploads data to an internal Snowflake stage. Similar to the Download task, Upload can perform data compression and set an access-control role. Snowflake also supports data transformation while loading data, which simplifies the ETL process.
The following code snippet uploads data to the specified `stageName` under the given `prefix`.
```yaml
id: "upload"
type: "io.kestra.plugin.jdbc.snowflake.Upload"
stageName: MYSTAGE
prefix: testUploadStream
fileName: destFile.csv
```
With its rich set of features, Kestra's Snowflake plugin offers you the ability to build highly customizable and robust data pipelines:
- **Querying a Snowflake Table with Conditions**: In data analysis, you often need to fetch specific rows based on certain conditions. The Query task lets you filter data with a `WHERE` clause in its `sql` property. Here's how you can use it:
```yaml
id: conditional_select
type: io.kestra.plugin.jdbc.snowflake.Query
url: jdbc:snowflake://<account_identifier>.snowflakecomputing.com
username: snowflake
password: snowflake_passwd
sql: SELECT * FROM source WHERE id > 1000
fetch: true
```
- **Listening to Changes in a Snowflake Table with a Trigger**: Real-time monitoring of changes in your Snowflake table is a vital aspect of data pipeline management. The Snowflake Trigger plugin is designed precisely for this task. Here's an example of how to use it:
```yaml
id: trigger_on_update
type: io.kestra.plugin.jdbc.snowflake.Trigger
url: jdbc:snowflake://<account_identifier>.snowflakecomputing.com
username: snowflake
password: snowflake_passwd
sql: SELECT MAX(updated_at) FROM source
interval: PT1M
```
- **Downloading Specific Columns from a Snowflake Stage**: While managing data, you might not need all the information from your Snowflake stage. The Download task allows you to fetch only the specific columns you need. This is achievable by modifying the `sql` property in the task:
```yaml
id: "selective_download"
type: "io.kestra.plugin.jdbc.snowflake.Download"
stageName: MYSTAGE
sql: SELECT column1, column2 FROM source
fileName: destFile.csv
```
- **Uploading Data to a Snowflake Stage with Transformations**: Uploading data often involves a transformation process. The Upload task allows you to upload and transform your data in a single step, as shown below:
```yaml
id: "transformed_upload"
type: "io.kestra.plugin.jdbc.snowflake.Upload"
stageName: MYSTAGE
prefix: transformedData
fileName: destFile.csv
fileFormat: (type = 'CSV', field_delimiter = ',', skip_header = 1, NULL_IF = ('\\N'))
```
Kestra provides flexibility and control to data teams. It can orchestrate any kind of workflow with ease through a rich UI that monitors all flows.
Kestra's Snowflake plugin makes data warehousing simple even for non-developers, thanks to YAML. Your Snowflake storage pipeline can accommodate raw data from multiple sources and transform it using ETL operations. Alternatively, you can skip the transformation step and load data directly into the warehouse in an ELT pipeline. Kestra can manage both workflows simultaneously. In either case, Kestra ensures the data is readily available for analysis and for uncovering valuable patterns.