Getting started with Kestra — a Data Engineering Pipeline example

About this blueprint

Tags: API, Docker, DuckDB, Getting Started, Python

This flow is a simple Kestra example for a data engineering use case. It downloads a JSON file of products, keeps only the selected columns, and calculates the average price per brand.

The flow has three tasks:

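  1. The first task, extract, downloads a JSON file of products from https://dummyjson.com/products.
  2. The second task, transform, runs a Python script that keeps only the columns listed in the columns_to_keep input (brand and price by default) and writes the result to a new JSON file, products.json.
  3. The third task, query, reads the filtered data, calculates the average price per brand using DuckDB, and stores the result as a Kestra output that can be previewed and downloaded from the UI.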
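
To make the second step concrete, here is a minimal sketch of the column filter as a standalone function that runs outside Kestra. It assumes the column list arrives as a JSON array string, which is what the flow's script expects when it calls json.loads. The function name keep_columns and the sample records are hypothetical and used only for illustration.

```python
import json


def keep_columns(products, columns, default="N/A"):
    """Return one dict per product containing only the requested columns.

    Missing keys are filled with `default`, mirroring the flow's script.
    """
    return [
        {column: product.get(column, default) for column in columns}
        for product in products
    ]


# Assumption: the column list is passed as a JSON array string,
# which is what the flow's script parses with json.loads.
columns_to_keep = json.loads('["brand", "price"]')

# Hypothetical sample records, not taken from the real API response.
sample_products = [
    {"id": 1, "brand": "Acme", "price": 10.5, "stock": 3},
    {"id": 2, "price": 4.0, "stock": 7},  # no "brand" key
]

filtered = keep_columns(sample_products, columns_to_keep)
print(json.dumps(filtered, indent=4))
```

A record without a brand keeps its price but gets the brand "N/A", so in the third step it would be grouped under that placeholder value.
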

```yaml
id: data_engineering_pipeline
namespace: tutorial
description: Data Engineering Pipelines

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-alpine
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    env:
      COLUMNS_TO_KEEP: "{{ inputs.columns_to_keep }}"
    script: |
      import json
      import os

      columns_to_keep_str = os.getenv("COLUMNS_TO_KEEP")
      columns_to_keep = json.loads(columns_to_keep_str)

      with open("data.json", "r") as file:
          data = json.load(file)

      filtered_data = [
          {column: product.get(column, "N/A") for column in columns_to_keep}
          for product in data["products"]
      ]

      with open("products.json", "w") as file:
          json.dump(filtered_data, file, indent=4)

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```

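The aggregation in the query task can be tried locally without Kestra or DuckDB. The sketch below is a stand-in, not the flow's actual execution path: it uses Python's built-in sqlite3 module instead of DuckDB, loads a few made-up rows into an in-memory table instead of calling read_json_auto, and runs the same SELECT. The table name products and the sample rows are assumptions for illustration.

```python
import sqlite3

# Hypothetical filtered rows, shaped like the output of the transform task.
rows = [
    {"brand": "Acme", "price": 10.5},
    {"brand": "Acme", "price": 20.0},
    {"brand": "Globex", "price": 5.5},
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (brand TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products (brand, price) VALUES (:brand, :price)", rows
)

# Same aggregation as the flow's query, minus the DuckDB-specific
# INSTALL/LOAD statements and the read_json_auto() call.
result = conn.execute(
    """
    SELECT brand, round(avg(price), 2) AS avg_price
    FROM products
    GROUP BY brand
    ORDER BY avg_price DESC
    """
).fetchall()
conn.close()

for brand, avg_price in result:
    print(brand, avg_price)
```

Each result row is a (brand, avg_price) tuple, sorted from the highest average price to the lowest, which matches the shape of the table the query task stores as its output.
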