Set Up the Kestra Python SDK and Execute Workflows

For the complete documentation index, see llms.txt. For a full content snapshot, see llms-full.txt. Append .md to any kestra.io/docs/* URL for plain Markdown.

Use the Kestra Python SDK (kestrapy) to interact with the Kestra API from Python applications.

Install the Python SDK

Before starting, make sure your Kestra instance is running. Store credentials in an .env file:

KESTRA_HOST=http://localhost:8080
KESTRA_USERNAME=root@root.com
KESTRA_PASSWORD=Root!1234

Create a virtual environment and install the Kestra Python SDK.

uv venv
source .venv/bin/activate
uv pip install kestrapy
uv pip install python-dotenv # optional: loads .env automatically

Configure the client

Import and initialize the client with your Kestra credentials. Construct KestraClient once and reuse it throughout your application.

from kestrapy import Configuration, KestraClient
configuration = Configuration(
host="http://localhost:8080",
username="root@root.com",
password="Root!1234"
)
kestra_client = KestraClient(configuration)

Create a flow

Pass the flow definition as a YAML string to create_flow.

def create_flow():
tenant = "main"
body = """
id: my_flow
namespace: my_namespace
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "Hello World!"
"""
created = kestra_client.flows.create_flow(tenant=tenant, body=body)
print(f"Flow created: {created.id}")

Update a flow

Send the full YAML — including the same id and namespace — to replace an existing flow.

def update_flow():
tenant = "main"
body = """
id: my_flow
namespace: my_namespace
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "Updated message!"
"""
updated = kestra_client.flows.update_flow(
id="my_flow",
namespace="my_namespace",
tenant=tenant,
body=body
)
print(f"Flow updated: {updated.id}")

Delete a flow

Remove a flow by its namespace, id, and tenant.

def delete_flow():
tenant = "main"
kestra_client.flows.delete_flow(
namespace="my_namespace",
id="my_flow",
tenant=tenant
)
print("Flow deleted")

Execute a flow

Trigger an execution using create_execution.

The first three positional arguments are namespace, id (the flow ID), and wait.

def create_execution():
tenant = "main"
execution = kestra_client.executions.create_execution(
namespace="my_namespace",
id="my_flow",
wait=True,
tenant=tenant
)
print(f"Execution started: {execution.id}")

To pass inputs, use additional_form_datas with a dictionary keyed by input ID:

def create_execution_with_inputs():
tenant = "main"
execution = kestra_client.executions.create_execution(
namespace="my_namespace",
id="my_flow",
wait=True,
tenant=tenant,
additional_form_datas={"input_id": "value"}
)
print(f"Execution started: {execution.id}")

Follow an execution

Stream live execution updates using follow_execution.

def follow_execution():
tenant = "main"
execution = kestra_client.executions.create_execution(
namespace="my_namespace",
id="my_flow",
wait=False,
tenant=tenant
)
for event in kestra_client.executions.follow_execution(
execution_id=execution.id,
tenant=tenant
):
print(event.state.current)

KV Store

The KV Store lets you read and write key-value pairs scoped to a namespace.

List keys

Use list_all_keys to get a paged list of keys across the tenant:

def list_kv_keys():
tenant = "main"
result = kestra_client.kv.list_all_keys(
page=1,
size=50,
tenant=tenant
)
for entry in result.results:
print(f"Key: {entry.key}")

Get a value

def get_kv_value():
tenant = "main"
result = kestra_client.kv.key_value(
namespace="my_namespace",
key="my_key",
tenant=tenant
)
print(f"Value: {result.value}")

Set a value

def set_kv_value():
tenant = "main"
kestra_client.kv.set_key_value(
namespace="my_namespace",
key="my_key",
tenant=tenant,
body="my_value"
)
print("Key set")

Delete a key

def delete_kv_key():
tenant = "main"
kestra_client.kv.delete_key_value(
namespace="my_namespace",
key="my_key",
tenant=tenant
)
print("Key deleted")

Manage triggers

Search, enable or disable, unlock, and restart triggers for flows.

Search triggers

search_triggers is paginated and requires page and size:

def search_triggers():
tenant = "main"
result = kestra_client.triggers.search_triggers(
page=1,
size=50,
tenant=tenant
)
for t in result.results:
print(f"{t.trigger_context.trigger_id}: disabled={t.trigger_context.disabled}")

Disable or enable a trigger

import datetime
from kestrapy.models import TriggerControllerSetDisabledRequest, Trigger
def disable_trigger():
tenant = "main"
request = TriggerControllerSetDisabledRequest(
triggers=[
Trigger(
namespace="my_namespace",
flow_id="my_flow",
trigger_id="my_schedule",
var_date=datetime.datetime.now(datetime.timezone.utc)
)
],
disabled=True # pass False to re-enable
)
kestra_client.triggers.disabled_triggers_by_ids(
tenant=tenant,
trigger_controller_set_disabled_request=request
)
print("Trigger disabled")

Unlock a trigger

Use unlock_trigger to unlock a trigger that is stuck in a locked state:

def unlock_trigger():
tenant = "main"
kestra_client.triggers.unlock_trigger(
namespace="my_namespace",
flow_id="my_flow",
trigger_id="my_schedule",
tenant=tenant
)
print("Trigger unlocked")

Restart a trigger

def restart_trigger():
tenant = "main"
kestra_client.triggers.restart_trigger(
namespace="my_namespace",
flow_id="my_flow",
trigger_id="my_schedule",
tenant=tenant
)
print("Trigger restarted")

Best practices

  • Reuse your client: construct one KestraClient per application and share it.
  • Avoid hardcoding credentials: use environment variables or a secrets manager.
  • Validate YAML before submission: invalid syntax causes 422 responses.
  • Combine create_flow and create_execution for end-to-end CI/CD automation.

Was this page helpful?