
Getting started with Kestra — a Data Engineering Pipeline example

Source

```yaml
id: data-engineering-pipeline
namespace: tutorial
labels:
  name: Data Engineering Pipeline

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    description: Download the raw products JSON data.
    uri: https://dummyjson.com/products

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    description: Filter the product fields and write a cleaned JSON file.
    containerImage: python:3.11-alpine
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    script: |
      import json

      with open("data.json", "r") as file:
          data = json.load(file)

      filtered_data = [
          {column: product.get(column, "N/A") for column in {{ inputs.columns_to_keep }}}
          for product in data["products"]
      ]

      with open("products.json", "w") as file:
          json.dump(filtered_data, file, indent=4)

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Queries
    description: Aggregate the cleaned data with DuckDB and store the result.
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    fetchType: STORE

description: |
  # Flow Description
  **Use case:** Compact data engineering pipeline that filters product data and aggregates it by brand.
  **Highlights:**
  - Download product JSON from a public API.
  - Run a Python transform that keeps only the requested columns and emits a cleaned file.
  - Query the cleaned data with DuckDB to compute average prices per brand and store the result for download in the UI.
```
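To trigger this flow programmatically rather than from the UI, you can call Kestra's executions API. Here is a minimal sketch, assuming a local open-source Kestra instance at `http://localhost:8080` with no authentication; the endpoint shape (POST `/api/v1/executions/{namespace}/{flowId}`, with inputs passed as multipart form fields) and the JSON encoding of the ARRAY input should be checked against your Kestra version:

```python
import json

import requests

# Assumed local Kestra instance; adjust host, port, and auth for your deployment.
URL = "http://localhost:8080/api/v1/executions/tutorial/data-engineering-pipeline"

# Override the columns_to_keep ARRAY input as a JSON-encoded form field;
# omit the field entirely to fall back on the flow's defaults.
response = requests.post(
    URL,
    files={"columns_to_keep": (None, json.dumps(["brand", "price", "rating"]))},
)
response.raise_for_status()
print(response.json()["id"])  # id of the newly created execution
```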

About this blueprint

Tags: Getting Started, Python, SQL, API

This flow is a simple example of using Kestra for a data engineering use case. It downloads a JSON file, filters the data down to the requested columns, and calculates the average price per brand.

The flow has three tasks (a plain-Python equivalent of the same steps is sketched after this list):

  1. The first task downloads a JSON file from a public API.
  2. The second task filters the data and writes it to a new JSON file.
  3. The third task reads the filtered data, calculates the average price per brand using DuckDB, and stores the result as a Kestra output, which can be previewed and downloaded from the UI.
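
To make the three steps concrete, here is a standalone sketch of the same pipeline in plain Python, run entirely outside Kestra. The API URL, the default column list, and the products.json filename come straight from the flow; the standard-library download and the hand-rolled aggregation standing in for DuckDB are illustrative only:

```python
import json
import urllib.request
from collections import defaultdict

# Mirrors the default value of the columns_to_keep input.
COLUMNS_TO_KEEP = ["brand", "price"]

# 1. Extract: download the raw products JSON (the Download task).
with urllib.request.urlopen("https://dummyjson.com/products") as response:
    data = json.load(response)

# 2. Transform: keep only the requested columns and write a cleaned file
#    (the Script task).
filtered_data = [
    {column: product.get(column, "N/A") for column in COLUMNS_TO_KEEP}
    for product in data["products"]
]
with open("products.json", "w") as file:
    json.dump(filtered_data, file, indent=4)

# 3. Query: average price per brand, highest first (what the DuckDB
#    Queries task computes in SQL).
totals = defaultdict(lambda: [0.0, 0])
for product in filtered_data:
    totals[product["brand"]][0] += product["price"]
    totals[product["brand"]][1] += 1
for brand, (total, count) in sorted(
    totals.items(), key=lambda kv: kv[1][0] / kv[1][1], reverse=True
):
    print(brand, round(total / count, 2))
```

In the real flow, each of these steps runs as an isolated task, and Kestra passes the intermediate files between tasks through its internal storage (`{{ outputs.extract.uri }}` and `outputFiles` in the source above).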

Plugins used in this blueprint: Download (io.kestra.plugin.core.http.Download), Script (io.kestra.plugin.scripts.python.Script), and Queries (io.kestra.plugin.jdbc.duckdb.Queries).
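
If you want to iterate on the SQL without running the whole flow, the same query runs locally with the duckdb Python package, assuming you have pip-installed duckdb and have a products.json file in the current directory (for example, one produced by the sketch above). Recent DuckDB versions load the json extension automatically, which is why the INSTALL/LOAD statements from the flow are dropped here:

```python
import duckdb

# The same aggregation as the flow's Queries task, pointed at a local file.
result = duckdb.sql(
    """
    SELECT brand, round(avg(price), 2) AS avg_price
    FROM read_json_auto('products.json')
    GROUP BY brand
    ORDER BY avg_price DESC
    """
)
print(result)
```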
