Design and Automate ETL Pipelines Using DuckDB and Kestra


Build ETL pipelines in Kestra using DuckDB, Python and Task Runners.

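This tutorial shows several ways to build an ETL pipeline in Kestra over the same orders and products datasets: with DuckDB, Python, an AWS Batch task runner, dbt, and Spark.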

Using DuckDB

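DuckDB lets you transform data directly with SQL queries. Its read_csv_auto function queries CSV files in place, without loading them into a database first.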

In the example below, we download two CSV files, join them with a DuckDB Query task, and store the result. We then convert the result to CSV and upload the detailed orders to S3, run a second DuckDB query that aggregates the detailed orders per product, and finally upload that aggregate to S3 as a CSV file.

id: etl_using_duckdb
namespace: company.team
tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv file
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
  - id: download_products_csv
    type: io.kestra.plugin.core.http.Download
    description: Download products.csv file
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv
  - id: get_detailed_orders
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Perform JOIN transformation using DuckDB
    inputFiles:
      orders.csv: "{{ outputs.download_orders_csv.uri }}"
      products.csv: "{{ outputs.download_products_csv.uri }}"
    sql: |
      SELECT
        o.order_id,
        o.customer_name,
        o.customer_email,
        o.product_id,
        o.price,
        o.quantity,
        o.total,
        p.product_name,
        p.product_category,
        p.brand
      FROM read_csv_auto('{{ workingDir }}/orders.csv', header=True) o
      JOIN read_csv_auto('{{ workingDir }}/products.csv', header=True) p
      ON o.product_id = p.product_id
      ORDER BY order_id ASC;
    store: true
  - id: ion_to_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Convert the result into CSV
    from: "{{ outputs.get_detailed_orders.uri }}"
  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the detailed orders CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.ion_to_csv.uri }}"
    bucket: "my-bucket"
    key: "orders/detailed_orders"
  - id: get_orders_per_product
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Perform aggregation using DuckDB
    inputFiles:
      detailed_orders.csv: "{{ outputs.ion_to_csv.uri }}"
    sql: |
      SELECT product_id,
        COUNT(product_id) AS order_count,
        SUM(quantity) AS product_count,
        CAST(SUM(total) AS DECIMAL(10,2)) AS order_total
      FROM read_csv_auto('{{ workingDir }}/detailed_orders.csv', header=True)
      GROUP BY product_id
      ORDER BY product_id ASC
    store: true
  - id: get_orders_per_product_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Convert the result into CSV
    from: "{{ outputs.get_orders_per_product.uri }}"
  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the orders per product CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.get_orders_per_product_csv.uri }}"
    bucket: "my-bucket"
    key: "orders/orders_per_product"
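To see what the two DuckDB queries compute without running Kestra, here is a minimal local sketch. It swaps in Python's built-in sqlite3 module for DuckDB (so it needs only the standard library), uses a few hypothetical sample rows instead of the Hugging Face CSV files, and uses ROUND as a stand-in for the DECIMAL cast. Column names follow the flow above.

```python
import sqlite3

# Hypothetical sample rows; the real flow reads orders.csv and products.csv.
ORDERS = [
    # (order_id, customer_name, customer_email, product_id, price, quantity, total)
    (1, "Ana", "ana@example.com", 10, 2.50, 2, 5.00),
    (2, "Ben", "ben@example.com", 20, 4.00, 1, 4.00),
    (3, "Cy", "cy@example.com", 10, 2.50, 3, 7.50),
]
PRODUCTS = [
    # (product_id, product_name, product_category, brand)
    (10, "Pen", "Office", "Acme"),
    (20, "Mug", "Kitchen", "Brew"),
]


def run_pipeline():
    """Run the join and the aggregation from the DuckDB flow, using sqlite3 instead."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE orders (order_id INT, customer_name TEXT, customer_email TEXT,"
                " product_id INT, price REAL, quantity INT, total REAL)")
    con.execute("CREATE TABLE products (product_id INT, product_name TEXT,"
                " product_category TEXT, brand TEXT)")
    con.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?, ?, ?)", ORDERS)
    con.executemany("INSERT INTO products VALUES (?, ?, ?, ?)", PRODUCTS)

    # Step 1: the same JOIN as the get_detailed_orders task, kept as a table here.
    con.execute("""
        CREATE TABLE detailed_orders AS
        SELECT o.order_id, o.customer_name, o.customer_email, o.product_id,
               o.price, o.quantity, o.total,
               p.product_name, p.product_category, p.brand
        FROM orders o
        JOIN products p ON o.product_id = p.product_id
        ORDER BY o.order_id ASC
    """)

    # Step 2: the same aggregation as get_orders_per_product.
    return con.execute("""
        SELECT product_id,
               COUNT(product_id) AS order_count,
               SUM(quantity) AS product_count,
               ROUND(SUM(total), 2) AS order_total
        FROM detailed_orders
        GROUP BY product_id
        ORDER BY product_id ASC
    """).fetchall()


print(run_pipeline())  # [(10, 2, 5, 12.5), (20, 1, 1, 4.0)]
```

Each output row is (product_id, order_count, product_count, order_total), which matches the columns the DuckDB aggregation writes to orders_per_product.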

You can run similar Query tasks against other databases, such as Snowflake or PostgreSQL, using the corresponding JDBC plugins.

Using Python

You can also perform the ETL in Python with pandas and run it as a Python script task.

The flow below reproduces the DuckDB pipeline above in Python. The script writes two CSV files, declares them under outputFiles, and the upload tasks read them from the script task's outputs.

id: python_etl
namespace: company.team
tasks:
  - id: etl
    type: io.kestra.plugin.scripts.python.Script
    description: Python ETL Script
    beforeCommands:
      - pip install requests pandas
    script: |
      import io
      import requests
      import pandas as pd

      def _extract(url):
          # Download a CSV file and load it into a DataFrame
          response = requests.get(url, timeout=60)
          response.raise_for_status()
          return pd.read_csv(io.StringIO(response.content.decode("utf-8")), header=0)

      def run_etl():
          orders_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv")
          products_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv")
          # perform join transformation
          detailed_orders = orders_data.merge(products_data, how="left", on="product_id")
          detailed_orders.to_csv("detailed_orders.csv", index=False)
          # perform aggregation
          orders_per_product = (
              detailed_orders.groupby("product_id")
              .agg(order_count=("product_id", "count"), product_count=("quantity", "sum"), order_total=("total", "sum"))
              .reset_index()
              .sort_values("product_id")
          )
          orders_per_product["order_total"] = orders_per_product["order_total"].round(2)
          orders_per_product.to_csv("orders_per_product.csv", index=False)

      if __name__ == "__main__":
          run_etl()
    outputFiles:
      - detailed_orders.csv
      - orders_per_product.csv
  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the detailed orders CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.etl.outputFiles['detailed_orders.csv'] }}"
    bucket: "my-bucket"
    key: "orders/detailed_orders"
  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the orders per product CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.etl.outputFiles['orders_per_product.csv'] }}"
    bucket: "my-bucket"
    key: "orders/orders_per_product"
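One behavioural difference is worth knowing: the pandas script merges with how="left", so orders whose product_id has no match in products.csv are kept with empty product fields, while the DuckDB query's plain JOIN is an inner join and drops them. The standard-library sketch below reproduces both behaviours on a few made-up rows so you can compare them; the data and the join helper are hypothetical and not part of the flow.

```python
import csv
import io

# Hypothetical inline CSV data; product 99 has no entry in PRODUCTS_CSV.
ORDERS_CSV = """order_id,product_id,quantity,total
1,10,2,5.00
2,99,1,3.00
3,10,3,7.50
"""
PRODUCTS_CSV = """product_id,product_name
10,Pen
"""


def read_rows(text):
    """Parse CSV text with a header row into a list of dicts (all values are strings)."""
    return list(csv.DictReader(io.StringIO(text)))


def join(orders, products, how="inner"):
    """Join orders to products on product_id; how='left' keeps unmatched orders."""
    by_id = {p["product_id"]: p for p in products}
    joined = []
    for order in orders:
        product = by_id.get(order["product_id"])
        if product is None and how == "inner":
            continue  # inner join: drop orders without a matching product
        row = dict(order)
        row["product_name"] = product["product_name"] if product else None
        joined.append(row)
    return joined


orders, products = read_rows(ORDERS_CSV), read_rows(PRODUCTS_CSV)
print(len(join(orders, products, how="inner")))  # 2
print(len(join(orders, products, how="left")))   # 3
```

If you want the Python flow to match the DuckDB results exactly, passing how="inner" to merge is the corresponding change.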

Using Batch Task Runners

When a Python script becomes more compute-intensive or memory-intensive, consider running it on remote batch compute resources using Batch Task Runners.

Kestra provides several Batch Task Runners. Here is an example of running the Python ETL script with the AWS Batch Task Runner, which is part of Kestra Enterprise Edition (note the ee in its type).

id: aws_batch_task_runner_etl
namespace: company.team
tasks:
  - id: python_etl_on_aws_task_runner
    type: io.kestra.plugin.scripts.python.Script
    description: Run python ETL script on AWS Batch Task Runner
    containerImage: python:3.11-slim
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      region: eu-central-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
      computeEnvironmentArn: "arn:aws:batch:eu-central-1:01234567890:compute-environment/kestraFargateEnvironment"
      jobQueueArn: "arn:aws:batch:eu-central-1:01234567890:job-queue/kestraJobQueue"
      executionRoleArn: "arn:aws:iam::01234567890:role/kestraEcsTaskExecutionRole"
      taskRoleArn: "arn:aws:iam::01234567890:role/ecsTaskRole"
      bucket: kestra-product-de
    beforeCommands:
      - pip install requests pandas
    script: |
      import io
      import requests
      import pandas as pd

      def _extract(url):
          # Download a CSV file and load it into a DataFrame
          response = requests.get(url, timeout=60)
          response.raise_for_status()
          return pd.read_csv(io.StringIO(response.content.decode("utf-8")), header=0)

      def run_etl():
          orders_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv")
          products_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv")
          # perform join transformation
          detailed_orders = orders_data.merge(products_data, how="left", on="product_id")
          detailed_orders.to_csv("detailed_orders.csv", index=False)
          # perform aggregation
          orders_per_product = (
              detailed_orders.groupby("product_id")
              .agg(order_count=("product_id", "count"), product_count=("quantity", "sum"), order_total=("total", "sum"))
              .reset_index()
              .sort_values("product_id")
          )
          orders_per_product["order_total"] = orders_per_product["order_total"].round(2)
          orders_per_product.to_csv("orders_per_product.csv", index=False)

      if __name__ == "__main__":
          run_etl()
    outputFiles:
      - detailed_orders.csv
      - orders_per_product.csv
  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the detailed orders CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.python_etl_on_aws_task_runner.outputFiles['detailed_orders.csv'] }}"
    bucket: "my-bucket"
    key: "orders/detailed_orders"
  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the orders per product CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.python_etl_on_aws_task_runner.outputFiles['orders_per_product.csv'] }}"
    bucket: "my-bucket"
    key: "orders/orders_per_product"
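In these flows, downstream tasks read earlier results through expressions such as {{ outputs.<task_id>.outputFiles['detailed_orders.csv'] }}, and such an expression only resolves when <task_id> is the id of a task that ran earlier in the flow (here, python_etl_on_aws_task_runner). The standard-library sketch below checks that rule on a simplified, hypothetical list of (task id, property values) pairs. find_bad_refs is an illustrative helper, not a Kestra feature, and the sample list deliberately contains one broken reference so you can see what it reports.

```python
import re

# Captures the task id in expressions like {{ outputs.some_task.uri }}.
OUTPUT_REF = re.compile(r"outputs\.([A-Za-z0-9_-]+)")


def find_bad_refs(tasks):
    """tasks: ordered list of (task_id, [templated property strings]).
    Returns (task_id, referenced_id) pairs that do not point at an earlier task."""
    seen, problems = set(), []
    for task_id, values in tasks:
        for value in values:
            for ref in OUTPUT_REF.findall(value):
                if ref not in seen:
                    problems.append((task_id, ref))
        seen.add(task_id)
    return problems


# Hypothetical, trimmed-down view of a flow's tasks (the last reference is wrong on purpose).
tasks = [
    ("python_etl_on_aws_task_runner", []),
    ("upload_detailed_orders_to_s3",
     ["{{ outputs.python_etl_on_aws_task_runner.outputFiles['detailed_orders.csv'] }}"]),
    ("upload_orders_per_product_to_s3",
     ["{{ outputs.python_etl.outputFiles['orders_per_product.csv'] }}"]),
]
print(find_bad_refs(tasks))  # [('upload_orders_per_product_to_s3', 'python_etl')]
```

The check walks tasks in declaration order, which matches how the sequential flows in this tutorial execute; flows with parallel branches would need a more careful notion of "earlier".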

Using dbt

You can also build the pipeline as ELT with dbt in Kestra, storing the dbt project (configuration and models) as namespace files.

This example uses dbt + BigQuery to perform the ELT process: it loads data from an HTTP request to Hugging Face into BigQuery tables, performs join and aggregate transformations using dbt, and then queries the resulting tables.

id: dbt_transformations
namespace: kestra.engineering.bigquery.dbt
tasks:
  - id: orders_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
  - id: products_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download products.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv
  - id: create_orders_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create orders table in BigQuery
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    dataset: ecommerce
    table: orders
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: order_id
            type: INT64
          - name: customer_name
            type: STRING
          - name: customer_email
            type: STRING
          - name: product_id
            type: INT64
          - name: price
            type: FLOAT64
          - name: quantity
            type: INT64
          - name: total
            type: FLOAT64
  - id: create_products_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create products table in BigQuery.
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    dataset: ecommerce
    table: products
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: product_id
            type: INT64
          - name: product_name
            type: STRING
          - name: product_category
            type: STRING
          - name: brand
            type: STRING
  - id: load_orders_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load orders table with data from orders.csv
    from: "{{ outputs.orders_http_download.uri }}"
    projectId: <gcp_project_id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp_project_id>.ecommerce.orders"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1
  - id: load_products_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load products table with data from products.csv
    from: "{{ outputs.products_http_download.uri }}"
    projectId: <gcp_project_id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp_project_id>.ecommerce.products"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1
  - id: dbt
    type: io.kestra.plugin.dbt.cli.DbtCLI
    description: Use dbt build to perform the dbt transformations
    inputFiles:
      sa.json: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/dbt-bigquery:latest
    namespaceFiles:
      enabled: true
    profiles: |
      bq_dbt_project:
        outputs:
          dev:
            type: bigquery
            method: service-account
            dataset: ecommerce
            project: <gcp_project_id>
            keyfile: sa.json
            location: US
            priority: interactive
            threads: 16
            timeout_seconds: 300
            fixed_retries: 1
        target: dev
    commands:
      - dbt deps
      - dbt build
  - id: query_detailed_orders
    type: io.kestra.plugin.gcp.bigquery.Query
    description: Query the newly generated detailed_orders BigQuery table
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    sql: |
      SELECT * FROM `<gcp_project_id>.ecommerce.detailed_orders`
    store: true
  - id: query_orders_per_product
    type: io.kestra.plugin.gcp.bigquery.Query
    description: Query the newly generated orders_per_product BigQuery table
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    sql: |
      SELECT * FROM `<gcp_project_id>.ecommerce.orders_per_product`
    store: true

Next, create the dbt project as namespace files in the Kestra editor, in the flow's namespace (kestra.engineering.bigquery.dbt).

First, create a dbt_project.yml file with the following contents:

name: 'bq_dbt_project'
version: '1.0.0'
config-version: 2
profile: 'bq_dbt_project'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
clean-targets:
  - "target"
  - "dbt_packages"
models:
  bq_dbt_project:
    example:
      +materialized: view

Next, create a models folder. All of the following files go inside it.

Create sources.yml, which defines the source tables referenced in other models.

version: 2
sources:
  - name: ecommerce
    database: <gcp_project_id>
    schema: ecommerce
    tables:
      - name: orders
      - name: products
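With dbt-bigquery, a source's database corresponds to the GCP project and its schema to the BigQuery dataset, so source('ecommerce', 'orders') in a model compiles to a reference to the <gcp_project_id>.ecommerce.orders table, rendered roughly as a backtick-quoted relation. The sketch below mimics that lookup with a plain dict mirroring the sources.yml above; resolve_source is a hypothetical helper for illustration, not a dbt API, and the exact quoting dbt emits may differ.

```python
# Hypothetical mirror of models/sources.yml; the project stays a placeholder.
SOURCES = {
    "ecommerce": {
        "database": "<gcp_project_id>",  # BigQuery project
        "schema": "ecommerce",           # BigQuery dataset
        "tables": ["orders", "products"],
    }
}


def resolve_source(source_name, table_name, sources=SOURCES):
    """Return the BigQuery relation a source() call points at (illustrative only)."""
    source = sources[source_name]
    if table_name not in source["tables"]:
        raise KeyError(f"{table_name!r} is not declared under source {source_name!r}")
    return f"`{source['database']}`.`{source['schema']}`.`{table_name}`"


print(resolve_source("ecommerce", "orders"))
# `<gcp_project_id>`.`ecommerce`.`orders`
```

Declaring tables under sources this way means a typo in a source() call fails when dbt compiles the project rather than when BigQuery runs the query.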

Next, create two files, stg_orders.sql and stg_products.sql, which materialize as views on top of the source tables:

stg_orders.sql

{{ config(materialized="view") }}
select order_id,
customer_name,
customer_email,
product_id,
price,
quantity,
total
from {{ source('ecommerce', 'orders') }}

stg_products.sql

{{ config(materialized="view") }}
select
product_id,
product_name,
product_category,
brand
from {{ source('ecommerce', 'products') }}

Next, create detailed_orders.sql, which creates the detailed_orders table by joining the stg_orders and stg_products views on product_id:

{{ config(materialized="table") }}
select
o.order_id,
o.customer_name,
o.customer_email,
o.product_id,
p.product_name,
p.product_category,
p.brand,
o.price,
o.quantity,
o.total
from {{ ref('stg_orders') }} o join {{ ref('stg_products') }} p
on o.product_id = p.product_id

Next, create orders_per_product.sql, which creates the orders_per_product table (the table the flow's last task queries) by aggregating the detailed_orders table:

{{ config(materialized="table") }}
select
product_id,
COUNT(product_id) as order_count,
SUM(quantity) as product_count,
SUM(total) AS order_total
from {{ ref('detailed_orders') }}
group by product_id
order by product_id asc
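dbt infers the build order from the ref() calls in each model, which is why dbt build creates the staging views before detailed_orders, and detailed_orders before orders_per_product. The sketch below imitates that idea with a regular expression over abbreviated copies of the models above and the standard library's graphlib.TopologicalSorter (Python 3.9+). It is a simplified illustration under those assumptions, not how dbt actually parses a project.

```python
import re
from graphlib import TopologicalSorter

# Abbreviated copies of the models created above (column lists trimmed).
MODELS = {
    "stg_orders": "select order_id, product_id from {{ source('ecommerce', 'orders') }}",
    "stg_products": "select product_id, brand from {{ source('ecommerce', 'products') }}",
    "detailed_orders": (
        "select o.order_id, p.brand from {{ ref('stg_orders') }} o "
        "join {{ ref('stg_products') }} p on o.product_id = p.product_id"
    ),
    "orders_per_product": (
        "select product_id, count(product_id) as order_count "
        "from {{ ref('detailed_orders') }} group by product_id"
    ),
}

# Matches {{ ref('name') }} or {{ ref("name") }} and captures the model name.
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")


def build_order(models):
    """Return model names ordered so that each model comes after the models it ref()s."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


print(build_order(MODELS))
```

The two staging models have no dependency on each other, so their relative order in the printed list is not significant; only the ordering constraints implied by ref() matter.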

All the dbt models are now in place, so you can execute the flow. The dbt task builds the detailed_orders and orders_per_product tables, and the last two tasks query them. To view the contents of these tables, open the Outputs of those two tasks.

Using Spark

You can also process the same data with Spark.

The flow below submits a PySpark script that downloads both CSV files, left-joins orders with products, and prints the first 10 rows of the result:

id: spark_python_submit
namespace: kestra.engineering.spark
tasks:
  - id: python_submit
    type: io.kestra.plugin.spark.PythonSubmit
    runner: DOCKER
    docker:
      networkMode: host
      user: root
    master: spark://localhost:7077
    args:
      - "10"
    mainScript: |
      from pyspark.sql import SparkSession
      from pyspark import SparkFiles

      orders_url = "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"
      products_url = "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv"

      if __name__ == "__main__":
          spark = SparkSession.builder.appName("EcommerceApp").getOrCreate()
          # Distribute the CSV files; this needs the SparkSession, so it runs after it is created
          spark.sparkContext.addFile(orders_url)
          spark.sparkContext.addFile(products_url)
          # Create orders dataframe based on orders.csv
          orders_df = spark.read.csv("file://" + SparkFiles.get("orders.csv"), inferSchema=True, header=True)
          # Create products dataframe based on products.csv
          products_df = spark.read.csv("file://" + SparkFiles.get("products.csv"), inferSchema=True, header=True)
          # Create detailed_orders by joining on the column name, which keeps a single product_id column
          detailed_orders_df = orders_df.join(products_df, on="product_id", how="left")
          # Print the first 10 rows of detailed_orders_df
          detailed_orders_df.show(10)
          spark.stop()
