## Source

```yaml
id: data-engineering-pipeline
namespace: tutorial

labels:
  name: Data Engineering Pipeline

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    description: Download the raw products JSON data.
    uri: https://dummyjson.com/products

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    description: Filter the product fields and write a cleaned JSON file.
    containerImage: python:3.11-alpine
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    script: |
      import json

      with open("data.json", "r") as file:
          data = json.load(file)

      filtered_data = [
          {column: product.get(column, "N/A") for column in {{ inputs.columns_to_keep }}}
          for product in data["products"]
      ]

      with open("products.json", "w") as file:
          json.dump(filtered_data, file, indent=4)

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Queries
    description: Aggregate the cleaned data with DuckDB and store the result.
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) AS avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    fetchType: STORE

description: |
  # Flow Description

  **Use case:** Compact data engineering pipeline that filters product data and aggregates it by brand.

  **Highlights:**
  - Download product JSON from a public API.
  - Run a Python transform that keeps only the requested columns and emits a cleaned file.
  - Query the cleaned data with DuckDB to compute average prices per brand and store the result for download in the UI.
```
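The core of the transform task, keeping only the requested columns and substituting `"N/A"` for missing fields, can be sketched in plain Python. Note that in the flow, `{{ inputs.columns_to_keep }}` is rendered by Kestra into the script before it runs; here the column list and the sample payload are hard-coded stand-ins for illustration:

```python
import json

# Mirrors the flow's `columns_to_keep` input default.
columns_to_keep = ["brand", "price"]

# Illustrative stand-in for the payload downloaded from https://dummyjson.com/products.
data = {
    "products": [
        {"id": 1, "title": "Phone", "brand": "Acme", "price": 499},
        {"id": 2, "title": "Laptop", "price": 1299},  # no "brand" key
    ]
}

# Keep only the requested columns; a missing field becomes "N/A",
# exactly as product.get(column, "N/A") does in the flow's script.
filtered_data = [
    {column: product.get(column, "N/A") for column in columns_to_keep}
    for product in data["products"]
]

print(json.dumps(filtered_data, indent=4))
```

Running this yields `[{"brand": "Acme", "price": 499}, {"brand": "N/A", "price": 1299}]`, which is the shape the DuckDB task then reads from `products.json`.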
## About this blueprint

Tags: Getting Started, Python, SQL, API
This flow is a simple example of a data engineering use case in Kestra. It downloads a JSON file, filters the data, and calculates the average price per brand.

The flow has three tasks:
- The first task downloads a JSON file.
- The second task filters the data and writes it to a new JSON file.
- The third task reads the filtered data, calculates the average price per brand using DuckDB, and stores the result as a Kestra output, which can be previewed and downloaded from the UI.
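For readers without DuckDB at hand, the aggregation in the third task (average price per brand, rounded to two decimals, sorted descending) can be approximated in plain Python over the cleaned records. The sample rows below are hypothetical, not real API data:

```python
from collections import defaultdict

# Hypothetical cleaned records, shaped like the transform task's products.json.
filtered_data = [
    {"brand": "Acme", "price": 499},
    {"brand": "Acme", "price": 501},
    {"brand": "Globex", "price": 1299},
]

# GROUP BY brand: collect all prices per brand.
prices_by_brand = defaultdict(list)
for row in filtered_data:
    prices_by_brand[row["brand"]].append(row["price"])

# avg(price) rounded to 2 decimals, then ORDER BY avg_price DESC.
avg_prices = sorted(
    ((brand, round(sum(prices) / len(prices), 2)) for brand, prices in prices_by_brand.items()),
    key=lambda pair: pair[1],
    reverse=True,
)

print(avg_prices)  # [('Globex', 1299.0), ('Acme', 500.0)]
```

In the flow itself this work happens inside DuckDB's `read_json_auto`, and `fetchType: STORE` persists the query result as a downloadable Kestra output rather than printing it.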