Orchestrate Data Pipelines

Data teams use orchestration platforms like Kestra to manage complex pipelines that ingest raw data, transform it, and deliver it to data warehouses, lakes, and user-facing applications. The orchestration engine ensures workflows run in the correct sequence, recover from failures, and scale dynamically.

What is Data Orchestration?

Data orchestration automates the execution of interconnected tasks (ETL, analytics, AI, and ML jobs) while governing their dependencies and business logic. It focuses on how data moves between systems, teams, and processes.

Kestra's data orchestration capabilities include:

  • Flexible workflow triggers — run data flows on a schedule, on external events (e.g., a new file in S3/SFTP), or via API calls (see the sketch after this list).
  • Powerful orchestration engine — control retries, parallel task runs, timeouts, SLAs, concurrency and error handling.
  • Dynamic resource allocation — provision containers on-demand (e.g., AWS Fargate, GCP Batch, Azure Batch, Kubernetes) for compute-heavy tasks.
  • Visibility — track logs, traces, metrics, inputs, outputs, and lineage across workflows and tasks.
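
For example, the flexible triggers and execution controls above can be combined in a few lines of YAML. The sketch below is a minimal illustration rather than a production flow: the namespace, URL, and cron expression are placeholders, and exact property names should be checked against the current plugin reference.

```yaml
id: scheduled_ingest
namespace: tutorial                       # placeholder namespace

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products   # placeholder endpoint
    timeout: PT5M                         # fail the task if it runs longer than 5 minutes
    retry:
      type: constant                      # retry up to 3 times, 30 seconds apart
      interval: PT30S
      maxAttempt: 3

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"                     # run daily at 09:00
```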

Why Use Kestra for Data Orchestration?

  1. Simple Declarative Syntax – Define each data pipeline in a self-contained, portable YAML configuration that includes tasks, triggers, dependencies and infrastructure requirements.
  2. Extensible Integrations – Connect to over 600 services via pre-built plugins and avoid writing custom code for boilerplate tasks like file downloads, SQL queries, or REST API calls.
  3. Execution Control – Set retries, timeouts, SLAs, and concurrency limits.
  4. Zero Code Changes – Run existing Python/R/SQL scripts as-is (no rewrites needed); specify dependencies via YAML configuration.
  5. State Management – Pass data of any size between tasks (files, variables, query results) via Kestra's internal storage, or share state across workflows using the KV Store (see the sketch after this list).
  6. Dynamic Scaling – Scale custom code with task runners: spin up containers on cloud services (AWS ECS Fargate, Google Batch, Kubernetes) dynamically at runtime, with no need for dedicated always-on workers. To scale on-premises deployments, you can use worker groups.
  7. Observability – Monitor flow execution states, durations, logs, inputs, outputs and resource usage in real time.
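
To make Execution Control (3) and State Management (5) concrete, here is a minimal sketch of a flow with a concurrency limit and a KV Store task that persists a value other workflows can read back. The namespace, key, and source URL are placeholders; treat the property names as assumptions to verify against the plugin documentation.

```yaml
id: load_products
namespace: tutorial                       # placeholder namespace

concurrency:
  limit: 1                                # at most one execution of this flow runs at a time

tasks:
  - id: load
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products   # placeholder source

  # Persist a checkpoint that other workflows can read back,
  # e.g. with the io.kestra.plugin.core.kv.Get task
  - id: checkpoint
    type: io.kestra.plugin.core.kv.Set
    key: last_products_sync
    value: "{{ execution.startDate }}"
```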

Example: Data Engineering Pipeline

The following flow triggers data syncs in Airbyte and Fivetran and a transformation job in dbt Cloud. It then downloads a JSON dataset via REST API, filters specific columns using Python, and calculates KPIs with DuckDB. Kestra dynamically provisions a Python container for the task running custom code and terminates it once the task completes:

```yaml
id: data_pipeline
namespace: tutorial
description: Process product data to calculate brand price averages

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  # Trigger upstream ingestion (Airbyte, Fivetran) and transformation (dbt Cloud) jobs
  - id: airbyte_sync
    type: io.kestra.plugin.airbyte.connections.Sync
    url: http://localhost:8080
    connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd
    wait: true

  - id: fivetran_sync
    type: io.kestra.plugin.fivetran.connectors.Sync
    apiKey: "{{ secret('FIVETRAN_API_KEY') }}"
    apiSecret: "{{ secret('FIVETRAN_API_SECRET') }}"
    connectorId: myConnectorId
    wait: true

  - id: dbt_job
    type: io.kestra.plugin.dbt.cloud.TriggerRun
    accountId: dbt_account
    token: "{{ secret('DBT_CLOUD_TOKEN') }}"
    jobId: abc12345
    wait: true

  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

  # Filter columns in a disposable Python container
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-slim
    taskRunner: # spins up container on-demand
      type: io.kestra.plugin.scripts.runner.docker.Docker
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"  # Input from previous task
    outputFiles:
      - "*.json"
    script: |
      import json
      filtered_data = [
          {col: product.get(col, "N/A") for col in {{ inputs.columns_to_keep }}}
          for product in json.load(open("data.json"))["products"]
      ]
      json.dump(filtered_data, open("products.json", "w"), indent=4)

  # Analyze filtered data with DuckDB
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    fetchType: STORE
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, ROUND(AVG(price), 2) AS avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
```

Getting Started with Data Orchestration

  1. Install Kestra – Follow the quick start guide or the full installation instructions for production environments.
  2. Write Your Workflows – Configure your flow in YAML, declaring inputs, tasks, and triggers. Tasks can be anything — scripts, queries, remote jobs or API calls. Add retry, timeout, concurrency or taskRunner settings to scale tasks dynamically and manage data orchestration logic.
  3. Add Triggers – Execute flows manually or via schedules, API calls, flow triggers, or event triggers (e.g., S3 file uploads); see the webhook sketch after this list.
  4. Observe and Manage – Use Kestra’s UI to inspect workflow outputs, logs, execution states, and dependencies.
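
As an example of step 3, the sketch below adds a webhook trigger so a flow can be started through Kestra's API; the key is a placeholder you would replace with your own secret value, and the task is illustrative only. Schedules, flow triggers, and event triggers such as S3 file uploads follow the same pattern: an extra entry in the flow's triggers list.

```yaml
id: webhook_pipeline
namespace: tutorial                       # placeholder namespace

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products   # placeholder endpoint

triggers:
  # The flow starts whenever the webhook URL exposed by the Kestra API is called
  - id: on_webhook
    type: io.kestra.plugin.core.trigger.Webhook
    key: replace_with_a_secret_key        # placeholder key used in the webhook URL
```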
