Kestra Python SDK
Release: 1.0.0
Interact with Kestra's API via Python SDK.
Install the Python SDK
This guide demonstrates how to use the Kestra Python SDK to create and execute flows programmatically.
Before starting, make sure your Kestra instance is running and accessible via the KESTRA_HOST environment variable.
You can store credentials in an .env file:
KESTRA_HOST=http://localhost:8080
[email protected]
KESTRA_PASSWORD=Admin1234
Set up your environment
Create a virtual environment and install the Kestra Python SDK.kestrapy is the core package.
uv venv
source .venv/bin/activate
uv pip install kestrapy
uv pip install python-dotenv # Optional: for loading .env variables automatically
Tip: Using python-dotenv allows you to store credentials securely and load them automatically when your script runs.
Configure the client
Import and initialize the client with your Kestra credentials:
from kestrapy import Configuration, KestraClient
configuration = Configuration(
host="http://localhost:8080",
username="[email protected]",
password="Root!1234"
)
kestra_client = KestraClient(configuration)
Notes:
- Use
.envor environment variables for credentials (avoid hardcoding). - Configure either basic or token-based authentication.
- Reuse a single
KestraClientinstance throughout your application.
Create a flow
Use the following Python script to create a simple flow with a Sleep task.
This example uses the create_flow method.
def create_flow():
tenant = "main"
body = """
id: my_flow
namespace: my_namespace
tasks:
- id: hello
type: io.kestra.plugin.core.flow.Sleep
duration: PT1S
"""
created = kestra_client.flows.create_flow(tenant=tenant, body=body)
print(f"Flow created: {created.id}")
Notes:
bodymust be valid YAML for a Kestra flow.- If a flow with the same
id,namespace, andtenantalready exists, useupdate_flowinstead. - The response contains metadata for the created flow.
Update a flow
Use the following Python script to update an existing flow.
def update_flow():
tenant = "main"
body = """
id: my_flow
namespace: my_namespace
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "Updated message!"
"""
updated = kestra_client.flows.update_flow(
id="my_flow",
namespace="my_namespace",
tenant=tenant,
body=body
)
print(f"Flow updated: {updated.id}")
Notes:
- You must provide the same
id,namespace, andtenantas the target flow. - Updating requires sending the full YAML, including all inputs, tasks, and metadata.
- Invalid YAML or missing fields will return a
4xxerror.
Execute a flow
Execute flows programmatically using the create_execution method.
def create_execution():
tenant = "main"
execution = kestra_client.executions.create_execution(
tenant=tenant,
namespace="my_namespace",
flow_id="my_flow",
wait=True,
inputs={"input_id": "value"}
)
print(f"Execution started: {execution.id}")
Notes:
wait=Trueblocks the call until the execution completes. Usewait=Falsefor asynchronous runs.inputscorrespond to the flow’s defined input parameters.- The response includes execution details and the unique execution ID.
Follow an execution
You can stream live execution updates using the follow_execution method.
def follow_execution():
tenant = "main"
execution = kestra_client.executions.create_execution(
namespace="my_namespace",
id="my_flow",
wait=False,
tenant=tenant
)
for event in kestra_client.executions.follow_execution(
execution_id=execution.id,
tenant=tenant
):
print(event.state.current)
Notes:
- Use
follow_executionto monitor running flows in real-time. - The stream yields execution state updates (e.g., RUNNING, SUCCESS, FAILED).
- Use this method in CI/CD, CLI tools, or real-time dashboards.
Best practices
- Reuse your client: Initialize one
KestraClientper application and share it. - Avoid hardcoding credentials: Use
.envor environment variables. - Validate YAML before submission: Invalid syntax causes
422responses. - Automate your workflows: Combine
create_flowandcreate_executionfor full CI/CD automation.
Was this page helpful?