
Flows are used to implement your workload. They define all the tasks you want to perform and the order in which they will be run.

You define a flow using a declarative model in YAML.

A flow must have an identifier (id), a namespace, and a list of tasks.

A flow can also have inputs, listeners, error handlers, and triggers.

Flow sample

Here is a sample flow definition. It uses tasks available in Kestra core for testing purposes.

```yaml
id: samples
namespace: io.kestra.tests
description: "Some flow **documentation** in *Markdown*"

labels:
  env: prd
  country: FR

inputs:
  - name: my-value
    type: STRING
    required: false
    defaults: "default value"
    description: This is an optional my-value

variables:
  first: "1"
  second: "{{vars.first}} > 2"

tasks:
  - id: date
    type: io.kestra.core.tasks.debugs.Return
    description: "Some tasks **documentation** in *Markdown*"
    format: "A log line content with a contextual date variable {{taskrun.startDate}}"

taskDefaults:
  - type: io.kestra.core.tasks.log.Log
    values:
      level: ERROR
```

Labels

You can add arbitrary labels to your flows to sort them on multiple dimensions. When you execute such a flow, the labels will be propagated to the created execution. It is also possible to override labels and define new ones when starting a flow execution.
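A minimal sketch of a labeled flow (the label keys and values here are illustrative):

```yaml
id: labeled_flow
namespace: io.kestra.tests

labels:
  env: prd
  team: data # illustrative label; any string key/value pair works

tasks:
  - id: hello
    type: io.kestra.core.tasks.log.Log
    message: Hello from a labeled flow
```

Every execution of this flow will carry the `env` and `team` labels, so you can filter executions by them in the UI.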

Task Defaults

You can also define taskDefaults inside your flow. This is a list of default task properties that will be applied to each task of a certain type inside your flow. Task defaults can be handy to avoid repeating the same value for a task property in case the same task type is used multiple times in the same flow.

Variables

You can set flow variables that will be accessible by each task using {{ vars.key }}. Flow variables are a map of key/value pairs.
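For example, a variable declared at the flow level can be referenced from any task (the variable name and value below are illustrative):

```yaml
id: variables_demo
namespace: io.kestra.tests

variables:
  api_url: https://dummyjson.com/products # example value

tasks:
  - id: print_url
    type: io.kestra.core.tasks.log.Log
    message: "Calling {{ vars.api_url }}"
```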

List of tasks

The most important part of a flow is the list of tasks that will be run sequentially when the flow is executed.
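Sequential execution can be sketched with two logging tasks; the second only starts once the first has finished (task ids and messages are illustrative):

```yaml
tasks:
  - id: first
    type: io.kestra.core.tasks.log.Log
    message: runs first

  - id: second
    type: io.kestra.core.tasks.log.Log
    message: runs after first completes
```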

Flow Properties

The following flow properties can be set.

| Field | Description |
|-------|-------------|
| id | The flow identifier; must be unique inside a namespace. |
| namespace | Each flow lives in one namespace; this is useful for flow organization and is mandatory. |
| revision | The flow version, handled internally by Kestra and incremented on each modification. You should not set it manually. |
| description | The description of the flow, more details here. |
| labels | The list of labels, which are string key/value pairs. |
| inputs | The list of inputs, more details here. |
| variables | The list of variables (such as an API key, table name, URL, etc.) that can be reached inside tasks with {{ vars.name }}. |
| tasks | The list of tasks; all tasks will be run sequentially. |
| errors | The list of error tasks; all listed tasks will be run sequentially only if there is an error on the current execution. More details here. |
| listeners | The list of listeners, more details here. |
| triggers | The list of triggers, which are external events (such as a date schedule or message presence in a broker) that will launch this flow, more details here. |
| taskDefaults | The list of default task values; this avoids repeating the same properties on each task. |
| taskDefaults.[].type | The task type is a fully qualified Java class name. |
| taskDefaults.[].forced | If set to forced: true, the taskDefault will take precedence over properties defined in the task (default false). |
| taskDefaults.[].values.xxx | The task property that you want to be set as default. |
| disabled | Set it to true to disable execution of the flow. |
| concurrency | Use it to define flow-level concurrency control. By default, flow execution concurrency is not limited. |
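The errors and concurrency properties from the table above can be sketched as follows; the concurrency limit and task ids are illustrative values, not defaults:

```yaml
id: limited_flow
namespace: io.kestra.tests

concurrency:
  limit: 2 # assumption: at most two executions of this flow run at once

tasks:
  - id: work
    type: io.kestra.core.tasks.log.Log
    message: doing work

errors:
  - id: alert
    type: io.kestra.core.tasks.log.Log
    level: ERROR
    message: "Failure in {{ flow.id }}"
```

The tasks under errors only run when the main execution fails, which makes them a convenient place for alerting or cleanup logic.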

taskDefaults

You can add task defaults to avoid repeating task properties across multiple occurrences of the same task type in the taskDefaults property. For example:

```yaml
id: api_python_sql
namespace: dev

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/products

  - id: hello
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    script: |
      print("Hello World!")

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")

  - id: sql_query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

taskDefaults:
  - type: io.kestra.plugin.scripts.python.Script
    values:
      runner: DOCKER
      docker:
        image: python:slim
        pullPolicy: ALWAYS # NEVER to use a local image
```

Here, we avoid repeating Docker and Python configurations in each task by directly setting those within the taskDefaults property. This approach helps to streamline the configuration process and reduce the chances of errors caused by inconsistent settings across different tasks.

Note that when you move some required task attributes into the taskDefaults property, the code editor within the UI will complain that the required task argument is missing. The editor shows this message because taskDefaults are resolved at runtime and the editor is not aware of those default attributes until you run your flow. As long as taskDefaults contains the relevant arguments, you can save the flow and ignore the warning displayed in the editor.


Document your flow

You can add documentation to flows, tasks, and other elements to explain the purpose of the current element.

For this, Kestra allows adding a description property where you can write documentation of the current element. The description must be written using the Markdown syntax.
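A minimal sketch of a documented flow and task (the ids and Markdown content are illustrative):

```yaml
id: documented_flow
namespace: io.kestra.tests
description: |
  ## Orders pipeline
  This flow *loads* the orders data.

tasks:
  - id: load
    type: io.kestra.core.tasks.log.Log
    description: Logs a **status** line in *Markdown*-documented task
    message: loading orders
```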

You can add a description property on flows, tasks, and other elements that support it.

All Markdown descriptions will be rendered in the UI.


Enable or Disable a Flow

By default, all flows are active and will execute whether or not a trigger has been set.

You have the option to disable a Flow, which is particularly useful when you wish to prevent its execution.

Enabling a previously disabled Flow will prompt it to execute any missed triggers from the period when it was disabled.
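Disabling a flow is a single property at the top level of the flow definition; a minimal sketch:

```yaml
id: paused_flow
namespace: io.kestra.tests
disabled: true # the flow will not execute until this is removed or set to false

tasks:
  - id: noop
    type: io.kestra.core.tasks.log.Log
    message: will not run while the flow is disabled
```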
