Data teams and developers use Python for AI, ML, ETL, analytics, and a lot more. Kestra lets you schedule and orchestrate Python scripts at scale — whether they’re simple data transformations, API calls, or compute-heavy ML jobs — without rewriting code or managing infrastructure.
## What is Workflow Orchestration for Python?
Workflow orchestration platforms like Kestra automate the execution and deployment of your Python code across environments, handling dependencies, error recovery, and resource scaling (a minimal example follows the list below). With Kestra, you can:
- Schedule scripts via cron, external events (e.g., new files in S3), or API calls.
- Manage dependencies with `pip` and `uv`, using custom or pre-built container images.
- Pass data between Python tasks and downstream steps (SQL queries, APIs, etc.).
- Scale dynamically — run scripts in lightweight containers or cloud services like AWS ECS Fargate, Azure Batch, GCP Cloud Run, Modal, or Kubernetes.
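For example, a minimal flow that runs an inline Python script needs nothing more than a single `io.kestra.plugin.scripts.python.Script` task (the flow id and namespace here are illustrative):

```yaml
id: hello_python
namespace: tutorial

tasks:
  - id: hello
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Hello from Kestra!")
```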
## Why Use Kestra for Python Scripts?
- Zero Code Changes – Run existing Python scripts as-is (no decorators needed); specify dependencies via YAML configuration or no-code forms.
- Dependency Management – Dynamically install the latest packages at runtime with `pip`, use custom Docker images, or leverage pre-built images.
- Dynamic Scaling – Task runners provision resources on-demand (AWS ECS Fargate, Google Batch) for heavy workloads.
- Observability – Track logs, outputs, and custom metrics (e.g., row counts, durations) in real time.
- Integration – Combine Python with SQL, Spark, dbt, or microservices in a single flow.
- Failure Handling – Retry failed scripts with configurable retry policies and get alerts on errors.
- React to Events – Trigger Python scripts on file uploads from S3/SFTP, API calls, or custom events from Kafka, RabbitMQ, SQS, etc. (see the trigger sketch after this list).
- Schedules and Backfills – Run scripts on a schedule or backfill historical data with custom parameters.
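As an illustration of the event-driven option, here is a sketch of an S3 trigger, assuming Kestra's AWS plugin is installed; the bucket, prefix, and credentials are placeholders, and property names follow `io.kestra.plugin.aws.s3.Trigger`:

```yaml
triggers:
  - id: wait_for_new_file
    type: io.kestra.plugin.aws.s3.Trigger
    interval: PT1M                 # poll the bucket every minute
    region: us-east-1
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    bucket: my-bucket              # placeholder bucket name
    prefix: uploads/               # only react to files under this prefix
    action: MOVE                   # move processed files out of the inbox
    moveTo:
      key: archive/
```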
## Example: Python Data Pipeline
This flow downloads a CSV file, processes it with Pandas in a Python task, and sends the results to Slack. Kestra dynamically provisions a container for the Python task and scales it down once complete:
```yaml
id: sales_analysis
namespace: analytics
description: Analyze daily sales data

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest # pre-built image with Pandas
    inputFiles:
      data.csv: "{{ outputs.extract.uri }}"
    script: |
      import pandas as pd
      from kestra import Kestra

      df = pd.read_csv("data.csv")
      total_sales = float(df["total"].sum())
      product_quantity = df.groupby("product_id")["quantity"].sum().astype("int32")
      top_product_id = int(product_quantity.idxmax())

      Kestra.outputs({
          "total_sales": round(total_sales, 2),
          "top_product_id": top_product_id,
          "total_quantity_sold": int(product_quantity.max())
      })
      Kestra.counter("row_count", int(len(df)))
      Kestra.counter("unique_products", int(df["product_id"].nunique()))

  - id: notify
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "https://reqres.in/api/slack"
    payload: |
      {
        "text": "📊 *Daily Sales Report*\n• Total Sales: ${{ outputs.transform.vars.total_sales }}\n• Top Product ID: #{{ outputs.transform.vars.top_product_id }}\n• Units Sold of Top Product: {{ outputs.transform.vars.total_quantity_sold }}"
      }

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *" # run every day at 9 AM
```
Adding the following `pluginDefaults` to that flow (or to your namespace) will scale the Python task to run on AWS ECS Fargate:
```yaml
pluginDefaults:
  - type: io.kestra.plugin.scripts.python
    values:
      taskRunner:
        type: io.kestra.plugin.ee.aws.runner.Batch
        region: us-east-1
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        computeEnvironmentArn: "arn:aws:batch:us-east-1:123456789:compute-environment/kestra"
        jobQueueArn: "arn:aws:batch:us-east-1:123456789:job-queue/kestra"
        executionRoleArn: "arn:aws:iam::123456789:role/ecsTaskExecutionRole"
        taskRoleArn: "arn:aws:iam::123456789:role/ecsTaskRole"
        bucket: kestra-us
```
You can set plugin defaults at the flow, namespace, or global level; they apply to all tasks of that type, ensuring that every Python task in a given environment runs on AWS ECS Fargate.
## Kestra Features for Python Orchestration
### Package Dependency Management
Install packages at runtime or use pre-built images:
```yaml
- id: script
  type: io.kestra.plugin.scripts.python.Script
  beforeCommands:
    - pip install pandas requests
  script: |
    # Your code here
```
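The same pattern works with `uv` (mentioned earlier); a sketch, assuming a container image that ships with uv (the image tag below is illustrative):

```yaml
- id: script
  type: io.kestra.plugin.scripts.python.Script
  containerImage: ghcr.io/astral-sh/uv:python3.12-bookworm-slim # illustrative image with uv preinstalled
  beforeCommands:
    - uv pip install --system pandas requests
  script: |
    # Your code here
```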
### Outputs and Metrics
Pass data between tasks using outputs and track metrics:
```python
from kestra import Kestra

Kestra.outputs({"key": "value"})        # pass values to downstream tasks
Kestra.counter("rows_processed", 1000)  # track custom metrics
```
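Downstream tasks can then read those outputs through Kestra's templating, just as the notify task in the earlier flow does; a minimal sketch, assuming the Python task above has the id `script`:

```yaml
- id: log_result
  type: io.kestra.plugin.core.log.Log
  message: "The Python task returned {{ outputs.script.vars.key }}"
```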
### Dynamic Scaling
Run heavy scripts on dynamically provisioned cloud infrastructure:
```yaml
taskRunner:
  type: io.kestra.plugin.ee.aws.runner.Batch
  resources:
    cpu: 4
    memory: 8192
```
### Error Handling
Add configurable `retry` policies to automatically retry failed tasks:
```yaml
retry:
  type: constant
  interval: PT1M
  maxAttempts: 3
```
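The policy attaches directly to a task definition, so a flaky Python task is retried automatically; a sketch with an illustrative HTTP call:

```yaml
- id: flaky_task
  type: io.kestra.plugin.scripts.python.Script
  retry:
    type: constant
    interval: PT1M
    maxAttempts: 3
  beforeCommands:
    - pip install requests
  script: |
    import requests

    # Raise on non-2xx responses so transient failures surface
    # and Kestra's retry policy can kick in (the endpoint is illustrative)
    requests.get("https://example.com/health", timeout=10).raise_for_status()
```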
Alert on failures via email, Slack, and other notification plugins:
```yaml
errors:
  - id: send_alert
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    executionId: "{{ execution.id }}"
```
## Getting Started Orchestrating Python Workflows
- Install Kestra – Follow the quick start guide or production setup.
- Write Your Flow – Define Python tasks in YAML. Use `Script` for inline code or `Commands` for `.py` files:

  ```yaml
  - id: py
    type: io.kestra.plugin.scripts.python.Commands
    namespaceFiles:
      enabled: true
    commands:
      - python scripts/transform.py
  ```
- Add Triggers – Run flows on a schedule, via API calls, or on events (e.g., new files in S3); see the API sketch after this list.
- Observe – Monitor execution logs, outputs, and metrics in Kestra’s UI.
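For the API option, a minimal sketch of creating an execution over Kestra's REST API; the host, port, and lack of authentication are assumptions, and the ids match the earlier sales_analysis example:

```python
import requests

# Create a new execution of the sales_analysis flow in the analytics
# namespace; http://localhost:8080 without authentication is an assumption.
response = requests.post(
    "http://localhost:8080/api/v1/executions/analytics/sales_analysis"
)
response.raise_for_status()
print(response.json()["id"])  # id of the newly created execution
```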
## Next Steps
- Explore Python plugins
- Manage package dependencies with Docker or `pip`.
- Explore video tutorials on our YouTube channel.
- Join Slack to ask questions, contribute code or share feature requests.
- Book a demo to discuss how Kestra can help orchestrate your Python workflows.