Orchestrate Python Workflows

Data teams and developers use Python for AI, ML, ETL, analytics, and a lot more. Kestra lets you schedule and orchestrate Python scripts at scale — whether they’re simple data transformations, API calls, or compute-heavy ML jobs — without rewriting code or managing infrastructure.

What is Workflow Orchestration for Python?

Workflow orchestration platforms like Kestra automate the execution and deployment of your Python code across environments, handling dependencies, error recovery, and resource scaling. With Kestra, you can:

  • Schedule scripts via cron, external events (e.g., new files in S3), or API calls.
  • Manage dependencies with pip and uv, using custom or pre-built container images.
  • Pass data between Python tasks and downstream steps (SQL queries, APIs, etc.).
  • Scale dynamically — run scripts in lightweight containers or cloud services like AWS ECS Fargate, Azure Batch, GCP Cloud Run, Modal or Kubernetes.
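
For example, the event-driven option above, where a flow runs whenever a new file lands in S3, could look roughly like the sketch below. It assumes the AWS S3 trigger plugin (io.kestra.plugin.aws.s3.Trigger) and uses placeholder bucket, prefix, and credential values:

yaml
id: process_new_files
namespace: analytics

tasks:
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("New file detected, processing...")

triggers:
  - id: wait_for_new_file
    type: io.kestra.plugin.aws.s3.Trigger
    region: us-east-1
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    bucket: my-input-bucket   # placeholder bucket name
    prefix: uploads/
    interval: PT1M            # polling interval
    action: MOVE              # move detected files so they are processed only once
    moveTo:
      key: archive/

Each detected object starts a new execution of the flow.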

Why Use Kestra for Python Scripts?

  1. Zero Code Changes – Run existing Python scripts as-is (no decorators needed); specify dependencies via YAML configuration or no-code forms.
  2. Dependency Management – Dynamically install the latest packages at runtime with pip, use custom Docker images, or leverage pre-built images.
  3. Dynamic Scaling – Task runners provision resources on demand (AWS ECS Fargate, Google Batch) for heavy workloads.
  4. Observability – Track logs, outputs, and custom metrics (e.g., row counts, durations) in real time.
  5. Integration – Combine Python with SQL, Spark, dbt, or microservices in a single flow.
  6. Failure Handling – Retry failed scripts with configurable retry policies and get alerts on errors.
  7. React to Events – Trigger Python scripts on file uploads from S3/SFTP, API calls, or custom events from Kafka, RabbitMQ, SQS, etc.
  8. Schedules and Backfills – Run scripts on a schedule or backfill historical data with custom parameters.
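
To illustrate the last point, a scheduled flow can read the logical date from its trigger, so a backfill re-runs the same script for each historical date. A minimal sketch (the flow id, namespace, and script body are illustrative):

yaml
id: daily_report
namespace: analytics

tasks:
  - id: report
    type: io.kestra.plugin.scripts.python.Script
    script: |
      # Kestra renders the expression below before the script runs;
      # during a backfill it resolves to each historical schedule date
      print('Processing sales for {{ trigger.date | date("yyyy-MM-dd") }}')

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"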

Example: Python Data Pipeline

This flow downloads a CSV of orders, transforms it with Pandas in a Python task that emits outputs and metrics, and sends a Slack notification with the results. Kestra dynamically provisions a container for the Python task and scales it down once complete:

yaml
id: sales_analysis
namespace: analytics
description: Analyze daily sales data

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest  # Pre-built image with Pandas
    inputFiles:
      data.csv: "{{ outputs.extract.uri }}"
    script: |
      import pandas as pd
      from kestra import Kestra

      df = pd.read_csv("data.csv")

      total_sales = float(df["total"].sum())
      product_quantity = df.groupby("product_id")["quantity"].sum().astype('int32')
      top_product_id = int(product_quantity.idxmax())

      Kestra.outputs({
          "total_sales": round(total_sales, 2),
          "top_product_id": top_product_id,
          "total_quantity_sold": int(product_quantity.max())
      })

      Kestra.counter("row_count", int(len(df)))
      Kestra.counter("unique_products", int(df['product_id'].nunique()))

  - id: notify
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "https://reqres.in/api/slack"
    payload: |
      {
        "text": "📊 *Daily Sales Report*
        • Total Sales: ${{ outputs.transform.vars.total_sales }}
        • Top Product ID: #{{ outputs.transform.vars.top_product_id }}
        • Units Sold of Top Product: {{ outputs.transform.vars.total_quantity_sold }}"
      }

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *" # Run every day at 9 AM

Adding the following pluginDefaults to that flow (or to your namespace) makes the Python task run on AWS ECS Fargate through the AWS Batch task runner:

yaml
pluginDefaults:
  - type: io.kestra.plugin.scripts.python
    values:
      taskRunner:
        type: io.kestra.plugin.ee.aws.runner.Batch
        region: us-east-1
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        computeEnvironmentArn: "arn:aws:batch:us-east-1:123456789:compute-environment/kestra"
        jobQueueArn: "arn:aws:batch:us-east-1:123456789:job-queue/kestra"
        executionRoleArn: "arn:aws:iam::123456789:role/ecsTaskExecutionRole"
        taskRoleArn: "arn:aws:iam::123456789:role/ecsTaskRole"
        bucket: kestra-us

You can set plugin defaults at the flow, namespace, or global level; applied at the namespace or global level, this ensures that every Python task in that environment runs on AWS ECS Fargate.


Kestra Features for Python Orchestration

Package Dependency Management

Install packages at runtime or use pre-built images:

yaml
- id: script
  type: io.kestra.plugin.scripts.python.Script
  beforeCommands:
    - pip install pandas requests
  script: |
    # Your code here
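
Alternatively, point the task at a container image that already bundles the libraries you need, as the pipeline example above does with the pre-built pydata image (swap in your own custom image if you maintain one):

yaml
- id: script
  type: io.kestra.plugin.scripts.python.Script
  containerImage: ghcr.io/kestra-io/pydata:latest  # pre-built image that ships with Pandas
  script: |
    import pandas as pd
    # Your code here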

Outputs and Metrics

Pass data between tasks using outputs and track metrics:

python
from kestra import Kestra
Kestra.outputs({"key": "value"})  # Pass to downstream tasks
Kestra.counter("rows_processed", 1000)  # Track metrics

Dynamic Scaling

Run heavy scripts on dynamically provisioned cloud infrastructure:

yaml
taskRunner:
  type: io.kestra.plugin.ee.aws.runner.Batch
  resources:
    cpu: 4
    memory: 8192
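
The taskRunner block sits directly on the Python task (or in pluginDefaults, as shown earlier). A trimmed sketch, omitting the connection properties from the Batch example above:

yaml
- id: heavy_transform
  type: io.kestra.plugin.scripts.python.Script
  containerImage: ghcr.io/kestra-io/pydata:latest
  taskRunner:
    type: io.kestra.plugin.ee.aws.runner.Batch
    region: us-east-1
    # computeEnvironmentArn, jobQueueArn, roles, and bucket as in the earlier example
  script: |
    # compute-heavy processing goes here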

Error Handling

Add configurable retry policies to automatically retry failed tasks:

yaml
retry:
  type: constant
  interval: PT1M
  maxAttempts: 3
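
The retry policy is set on the task itself, so a script that calls a flaky API can be declared like this (a sketch; the task id, URL, and script body are illustrative):

yaml
- id: flaky_extract
  type: io.kestra.plugin.scripts.python.Script
  retry:
    type: constant
    interval: PT1M
    maxAttempts: 3
  beforeCommands:
    - pip install requests
  script: |
    import requests
    # a transient failure here fails the task and triggers up to 3 retries, one minute apart
    requests.get("https://example.com/api", timeout=10).raise_for_status()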

Alert on failures via email, Slack, and other notification plugins:

yaml
errors:
  - id: send_alert
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    executionId: "{{ execution.id }}"

Getting Started Orchestrating Python Workflows

  1. Install Kestra – Follow the quick start guide or production setup.
  2. Write Your Flow – Define Python tasks in YAML. Use Script for inline code or Commands for .py files:
    yaml
    - id: py
      type: io.kestra.plugin.scripts.python.Commands
      namespaceFiles:
        enabled: true
      commands:
        - python scripts/transform.py
    
  3. Add Triggers – Run flows on a schedule, via API (see the webhook sketch after this list), or on events (e.g., new files in S3).
  4. Observe – Monitor execution logs, outputs, and metrics in Kestra’s UI.
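
For the API option in step 3, one approach is a Webhook trigger: calling its URL starts the flow. A minimal sketch (the key is a placeholder you choose yourself):

yaml
triggers:
  - id: api_trigger
    type: io.kestra.plugin.core.trigger.Webhook
    key: my-webhook-key   # placeholder; forms part of the URL that starts the flow

Kestra then exposes a webhook endpoint for that namespace, flow, and key, which you can call from any HTTP client or external system.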
