Source
```yaml
id: data-engineering-pipeline
namespace: tutorial
description: Data Engineering Pipelines

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-alpine
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    env:
      COLUMNS_TO_KEEP: "{{ inputs.columns_to_keep }}"
    script: |
      import json
      import os

      columns_to_keep_str = os.getenv("COLUMNS_TO_KEEP")
      columns_to_keep = json.loads(columns_to_keep_str)

      with open("data.json", "r") as file:
          data = json.load(file)

      filtered_data = [
          {column: product.get(column, "N/A") for column in columns_to_keep}
          for product in data["products"]
      ]

      with open("products.json", "w") as file:
          json.dump(filtered_data, file, indent=4)

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;

      SELECT brand, round(avg(price), 2) AS avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    fetchType: STORE
```
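The `columns_to_keep` input can be overridden per execution instead of relying on the defaults. As a minimal sketch, assuming a local Kestra instance on `localhost:8080` with the standard, unauthenticated executions API (the exact endpoint path and input encoding may vary by Kestra version and tenant setup), the flow could be triggered with an extra column like this:

```python
import requests

# Assumption: default Kestra executions endpoint; ARRAY inputs are
# submitted as a JSON-encoded string in a multipart form field.
resp = requests.post(
    "http://localhost:8080/api/v1/executions/tutorial/data-engineering-pipeline",
    files={"columns_to_keep": (None, '["brand", "price", "rating"]')},
)
resp.raise_for_status()
print(resp.json()["id"])  # id of the newly created execution
```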
About this blueprint
Tags: Getting Started, Docker, Python, Data, API, DuckDB
This flow is a simple example of a Kestra flow for a data engineering use case: it downloads a JSON file, filters the data, and calculates the average price per brand. The flow has three tasks:

1. `extract` downloads a JSON file over HTTP.
2. `transform` filters the data down to the requested columns and writes it to a new JSON file.
3. `query` reads the filtered data, calculates the average price per brand using DuckDB, and stores the result as a Kestra output that can be previewed and downloaded from the UI.
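To see what the pipeline computes without running Kestra, the three tasks can be mirrored in plain Python. This is a minimal local sketch, assuming the `requests` and `duckdb` packages are installed; it is not part of the blueprint itself:

```python
import json

import duckdb
import requests

# 1. Extract: download the sample products API response.
data = requests.get("https://dummyjson.com/products", timeout=30).json()

# 2. Transform: keep only the requested columns, defaulting to "N/A"
#    for products that lack one of them (same logic as the flow's script).
columns_to_keep = ["brand", "price"]
filtered = [
    {column: product.get(column, "N/A") for column in columns_to_keep}
    for product in data["products"]
]
with open("products.json", "w") as file:
    json.dump(filtered, file, indent=4)

# 3. Query: average price per brand with DuckDB.
result = duckdb.sql(
    """
    SELECT brand, round(avg(price), 2) AS avg_price
    FROM read_json_auto('products.json')
    GROUP BY brand
    ORDER BY avg_price DESC
    """
)
print(result)
```

The final `SELECT` matches the blueprint's query task; in the flow itself, DuckDB reads `products.json` from the task's working directory rather than the current directory.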