# Flows

Flows are used to implement your workload. They define all the tasks you want to perform and the order in which they will run.

You define a flow using a declarative model in YAML.

A flow must have an identifier (`id`), a `namespace`, and a list of `tasks`.

A flow can also have `inputs`, `listeners`, error handlers, and `triggers`.
## Flow sample

Here is a sample flow definition. It uses tasks available in Kestra core for testing purposes.
```yaml
id: samples
namespace: io.kestra.tests
description: "Some flow **documentation** in *Markdown*"

labels:
  env: prd
  country: FR

inputs:
  - name: my-value
    type: STRING
    required: false
    defaults: "default value"
    description: This is a non-required my-value

variables:
  first: "1"
  second: "{{vars.first}} > 2"

tasks:
  - id: date
    type: io.kestra.core.tasks.debugs.Return
    description: "Some task **documentation** in *Markdown*"
    format: "A log line content with a contextual date variable {{taskrun.startDate}}"

taskDefaults:
  - type: io.kestra.core.tasks.log.Log
    values:
      level: ERROR
```
## Labels

You can add arbitrary labels to your flows to sort them on multiple dimensions. When you execute such a flow, the labels will be propagated to the created execution. It is also possible to override labels and define new ones when starting a flow execution.
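Building on the sample above, here is a minimal sketch of a flow carrying labels; the flow id and label keys are illustrative:

```yaml
id: labeled_flow
namespace: io.kestra.tests

labels:          # propagated to every execution created from this flow
  env: prd
  team: data

tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Return
    format: "labels travel with the execution"
```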
## Task Defaults

You can also define `taskDefaults` inside your flow. This is a list of default task properties that will be applied to each task of a certain type inside your flow. Task defaults can be handy to avoid repeating the same value for a task property when the same task type is used multiple times in the same flow.
## Variables

You can set flow variables that will be accessible by each task using `{{ vars.key }}`. Flow variables are a map of key/value pairs.
## List of tasks

The most important part of a flow is the list of tasks that will be run sequentially when the flow is executed.
## Flow Properties

The following flow properties can be set.

| Field | Description |
| --- | --- |
| `id` | The flow identifier, which must be unique inside a namespace. |
| `namespace` | Each flow lives in one namespace; this is useful for flow organization and is mandatory. |
| `revision` | The flow version, handled internally by Kestra and incremented on each modification. You should not set it manually. |
| `description` | The description of the flow, more details here. |
| `labels` | The list of labels, which are string key/value pairs. |
| `inputs` | The list of inputs, more details here. |
| `variables` | The list of variables (such as an API key, table name, URL, etc.) that can be reached inside tasks with `{{ vars.name }}`. |
| `tasks` | The list of tasks; all tasks will be run sequentially. |
| `errors` | The list of error tasks; all listed tasks will be run sequentially only if there is an error on the current execution. More details here. |
| `listeners` | The list of listeners, more details here. |
| `triggers` | The list of triggers, which are external events (such as a date schedule or message presence in a broker) that will launch this flow, more details here. |
| `taskDefaults` | The list of default task values; this avoids repeating the same properties on each task. |
| `taskDefaults.[].type` | The task type, a fully qualified Java class name. |
| `taskDefaults.[].forced` | If set to `forced: true`, the task default will take precedence over properties defined in the task (default `false`). |
| `taskDefaults.[].values.xxx` | The task property that you want to set as a default. |
| `disabled` | Set it to `true` to disable execution of the flow. |
| `concurrency` | Use it to define flow-level concurrency control. By default, flow execution concurrency is not limited. |
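As a sketch of flow-level concurrency control, the `concurrency` property might look like the following; the `limit` and `behavior` fields are assumptions here and should be checked against the Kestra reference for your version:

```yaml
id: concurrency_demo
namespace: io.kestra.tests

concurrency:
  limit: 1          # assumed field: maximum number of parallel executions
  behavior: QUEUE   # assumed field: queue extra executions instead of failing them

tasks:
  - id: work
    type: io.kestra.core.tasks.debugs.Return
    format: "only one execution runs at a time"
```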
## taskDefaults

You can add task defaults to avoid repeating task properties across multiple occurrences of the same task type, using the `taskDefaults` property. For example:
```yaml
id: api_python_sql
namespace: dev

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/products

  - id: hello
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    script: |
      print("Hello World!")

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")

  - id: sql_query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

taskDefaults:
  - type: io.kestra.plugin.scripts.python.Script
    values:
      runner: DOCKER
      docker:
        image: python:slim
        pullPolicy: ALWAYS # NEVER to use a local image
```
Here, we avoid repeating the Docker and Python configurations in each task by setting them directly within the `taskDefaults` property. This approach helps streamline the configuration process and reduces the chance of errors caused by inconsistent settings across different tasks.

Note that when you move some required task attributes into the `taskDefaults` property, the code editor within the UI will complain that the required task argument is missing. The editor shows this message because `taskDefaults` are resolved at runtime, and the editor is not aware of those default attributes until you run your flow. As long as `taskDefaults` contains the relevant arguments, you can save the flow and ignore the warning displayed in the editor.
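The properties table above also mentions `taskDefaults.[].forced`. A sketch of how it could be used, based on that description (the `WARN` level and task ids are illustrative):

```yaml
id: forced_defaults_demo
namespace: io.kestra.tests

tasks:
  - id: log_something
    type: io.kestra.core.tasks.log.Log
    level: INFO   # overridden by the forced default below
    message: "Hello"

taskDefaults:
  - type: io.kestra.core.tasks.log.Log
    forced: true    # the default takes precedence over the task's own `level`
    values:
      level: WARN
```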
## Document your flow

You can add documentation to flows, tasks, and other flow elements to explain the goal of the current element.

For this, Kestra provides a `description` property where you can write documentation of the current element.

The description must be written using Markdown syntax.

All Markdown descriptions will be rendered in the UI.
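A sketch of a documented flow using multi-line Markdown descriptions at both the flow and task level (the ids and `message` value are illustrative):

```yaml
id: documented_flow
namespace: io.kestra.tests
description: |
  ## Purpose
  This flow demonstrates **Markdown** documentation rendered in the UI.

tasks:
  - id: hello
    type: io.kestra.core.tasks.log.Log
    description: "Logs a *greeting* message"
    message: Hello
```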
## Enable or Disable a Flow
By default, all flows are active and will execute whether or not a trigger has been set.
You have the option to disable a Flow, which is particularly useful when you wish to prevent its execution.
Enabling a previously disabled Flow will prompt it to execute any missed triggers from the period when it was disabled.
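Based on the `disabled` property from the table above, a minimal sketch of a disabled flow (the flow id is illustrative):

```yaml
id: maintenance_flow
namespace: io.kestra.tests
disabled: true   # the flow will not execute, even if its triggers fire

tasks:
  - id: noop
    type: io.kestra.core.tasks.debugs.Return
    format: "skipped while the flow is disabled"
```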