Flowable Tasks in Kestra – Control Orchestration Logic
Control your orchestration logic.
Control orchestration with flowable tasks
Flowable tasks control orchestration logic — running tasks or subflows in parallel, creating loops, and handling conditional branching. They do not run heavy operations; those are handled by workers.
Flowable tasks use expressions from the execution context to determine which tasks run next. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
Sequential
This task runs tasks sequentially and is typically used to group them.
id: sequential
namespace: company.team

tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

You can access the output of a sibling task using the syntax {{ outputs.sibling.value }}.
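The sibling-output syntax can be sketched with a short flow. The flow id and the task ids first and second are hypothetical names chosen for illustration; the sketch assumes the Return task exposes its rendered format as the value output.

```yaml
id: sequential_sibling_output   # hypothetical flow id
namespace: company.team

tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      # Produces an output named "value" containing the rendered format
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "hello from {{ task.id }}"

      # Runs after "first", so its output is already available
      - id: second
        type: io.kestra.plugin.core.log.Log
        message: "Sibling said: {{ outputs.first.value }}"
```

Because the child tasks run in order, second can safely read the output of first.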
For more details on capabilities, check out the Sequential Task documentation.
Parallel
This task runs tasks in parallel, making it convenient to process many tasks simultaneously.
id: parallel
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

You cannot access the output of a sibling task because the tasks run in parallel.
For more task details, refer to the Parallel Task documentation.
Switch
This task conditionally runs tasks based on the value of a contextual variable.
In the following example, an input is used to decide which task to run next.
id: switch
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"

For more plugin details, refer to the Switch Task documentation.
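A Switch can also branch on the output of a previous task, as mentioned in the introduction. The sketch below is illustrative only: the flow id, the task ids, and the prod and staging case names are hypothetical, and the upstream task is a simple Return that stands in for any task producing a value output.

```yaml
id: switch_on_output   # hypothetical flow id
namespace: company.team

tasks:
  # Stand-in for any upstream task; Return exposes its result as "value"
  - id: environment
    type: io.kestra.plugin.core.debug.Return
    format: "prod"

  # Branch on the upstream output instead of a flow input
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.environment.value }}"
    cases:
      prod:
        - id: prod_branch
          type: io.kestra.plugin.core.log.Log
          message: "Running the production branch"
      staging:
        - id: staging_branch
          type: io.kestra.plugin.core.log.Log
          message: "Running the staging branch"
```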
If
This task runs one set of tasks or another depending on a condition.
The condition must evaluate to a boolean. Values such as 0, -0, null, and '' evaluate to false; all other values evaluate to true.
The else branch is optional.
In the following example, an input is used to decide which task to run next.
id: if_condition
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"

For more details, check out the If Task documentation.
ForEach
This task executes a group of tasks for each value in the list.
In the following example, the variable is static, but it could also be generated from a previous task output, starting any number of subtasks.
id: foreach_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["value 1", "value 2", "value 3"]
    tasks:
      - id: before_if
        type: io.kestra.plugin.core.debug.Return
        format: "Before if {{ taskrun.value }}"
      - id: if
        type: io.kestra.plugin.core.flow.If
        condition: '{{ taskrun.value == "value 2" }}'
        then:
          - id: after_if
            type: io.kestra.plugin.core.debug.Return
            format: "After if {{ parent.taskrun.value }}"

In this execution, you can access:
- The iteration value, i.e., the index of the loop (starting at 0), using the syntax {{ taskrun.iteration }}
- The output of a sibling task using the syntax {{ outputs.sibling[taskrun.value].value }}
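The two expressions listed above can be combined in a single loop. The following is a minimal sketch; the flow id and the task ids first and second are hypothetical, and the values are arbitrary.

```yaml
id: foreach_sibling_output   # hypothetical flow id
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["a", "b", "c"]
    tasks:
      # One task run per value; each exposes its own "value" output
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "processed {{ taskrun.value }}"

      # Reads the sibling output for the current value and logs the loop index
      - id: second
        type: io.kestra.plugin.core.log.Log
        message: "Iteration {{ taskrun.iteration }}: {{ outputs.first[taskrun.value].value }}"
```

The sibling output is indexed by taskrun.value because each iteration produces its own task run of first.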
The following example shows how to run tasks in parallel for each value in the list. All child tasks of the Parallel task run in parallel. However, because the concurrencyLimit property is set to 2, only two Parallel task groups run at any given time.
id: parallel_tasks_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    concurrencyLimit: 2
    tasks:
      - id: parallel
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: log
            type: io.kestra.plugin.core.log.Log
            message: Processing {{ parent.taskrun.value }}
          - id: shell
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - sleep {{ parent.taskrun.value }}

For more information on handling outputs generated from ForEach, check out this dedicated loop how-to guide.
For processing items, or forwarding processing to a subflow, ForEachItem is better suited.
For more details, refer to the ForEach Task documentation.
ForEachItem
This task iterates over a list of items and runs a subflow for each item, or for each batch of items.
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}" # could also be an output variable {{ outputs.extract.uri }}
    inputs:
      file: "{{ taskrun.items }}" # items of the batch
    batch:
      rows: 4
    namespace: company.team
    flowId: subflow
    revision: 1 # optional (default: latest)
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    labels: # optional labels to pass to the subflow to be executed
      key: value

This executes the subflow company.team.subflow for each batch of items.
To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
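On the receiving side, the subflow needs to declare the matching input. The sketch below shows what company.team.subflow might look like; the task id log_batch is hypothetical, and the sketch assumes the batch file is small enough to be rendered into a log message with the read() function.

```yaml
id: subflow
namespace: company.team

inputs:
  # Receives the internal storage URI of one batch of items
  - id: file
    type: FILE

tasks:
  # Illustrative only: log the content of the batch file
  - id: log_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"
```

In a real subflow, the log task would typically be replaced by whatever processing each batch requires.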
The next example shows you how to access the outputs from each subflow executed. The ForEachItem automatically merges the URIs of the outputs from each subflow into a single file. The URI of this file is available through the subflowOutputs output.
id: for_each_item
namespace: company.team

tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true
    flowId: my_subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"

  - id: for_each_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.for_each_item_merge.subflowOutputs }}" # Log the URI of the file containing the URIs of the outputs from each subflow

For more details, refer to the ForEachItem Task documentation.
ForEach vs ForEachItem
ForEach and ForEachItem are similar, but each suits different use cases:
- ForEach generates a lot of Task Runs, which can impact performance.
- ForEachItem generates separate executions using Subflows for the group of tasks. This scales better for larger datasets.
Read more about performance optimization in our best practices guides.
LoopUntil
LoopUntil runs a group of tasks repeatedly until a boolean condition evaluates to true. After each iteration, the task evaluates the condition expression; if it evaluates to false, the block is executed again after the configured interval.
Typical use cases include polling an external API, waiting for a long-running job to transition to a terminal state, or checking for the presence of downstream resources.
Key properties:
- condition: expression evaluated after each iteration; has access to the child task outputs from the most recent run (e.g. {{ outputs.checkStatus.code }}).
- tasks: the list of child tasks to run before re-evaluating the condition.
- checkFrequency: optional guardrails that define interval, maxIterations, and/or maxDuration between repeats. (See the LoopUntil migration note for default values.)
Example: poll an API until it returns HTTP 200, checking every 30 seconds and stopping after 50 attempts if it never succeeds.
id: loop_until
namespace: company.team

tasks:
  - id: loop
    type: io.kestra.plugin.core.flow.LoopUntil
    condition: "{{ outputs.ping.code == 200 }}"
    checkFrequency:
      interval: PT30S
      maxIterations: 50
    tasks:
      - id: ping
        type: io.kestra.plugin.core.http.Request
        method: GET
        uri: https://kestra.io/api/mock

For more details, refer to the LoopUntil Task documentation.
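The loop can also be bounded by elapsed time rather than by attempt count, using the maxDuration guardrail listed under Key properties. The following variant is a sketch only: the flow id, the task id wait_for_service, and the one-minute and one-hour durations are arbitrary assumptions.

```yaml
id: loop_until_max_duration   # hypothetical flow id
namespace: company.team

tasks:
  - id: wait_for_service
    type: io.kestra.plugin.core.flow.LoopUntil
    condition: "{{ outputs.ping.code == 200 }}"
    checkFrequency:
      interval: PT1M      # wait one minute between iterations
      maxDuration: PT1H   # stop looping after one hour in total
    tasks:
      - id: ping
        type: io.kestra.plugin.core.http.Request
        method: GET
        uri: https://kestra.io/api/mock
```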
AllowFailure
This task allows child tasks to fail.
If any child task fails:
- The AllowFailure task is marked with status WARNING.
- All child tasks inside AllowFailure stop immediately.
- The execution continues for all other tasks.
- At the end, the execution as a whole is marked as status WARNING.
In the following example:
- allow_failure will be labelled as WARNING.
- ko will be labelled as FAILED.
- next will not be run.
- end will be run and labelled SUCCESS.
id: each
namespace: company.team

tasks:
  - id: allow_failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.core.execution.Fail
      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

For more details, refer to the AllowFailure Task documentation.
Fail
This task fails the flow; it can be used with or without conditions.
Without conditions, it can be used, for example, to fail on some switch value.
id: fail_on_switch
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default

With conditions, it can be used, for example, to validate inputs.
id: fail_on_condition
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{ inputs.param == 'fail' }}"
  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"

For more information, refer to the Fail Task documentation.
Subflow
This task triggers another flow. This enables you to decouple the first flow from the second and monitor each flow individually.
You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).
id: subflow
namespace: company.team

tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12

For more details, refer to the Subflow Task documentation.
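Since the inputs passed by the parent must be declared in the subflow, the called flow could look like the sketch below. The input types FILE and INT and the task id log_store are assumptions made for illustration; the source only shows the parent side.

```yaml
id: my-subflow
namespace: company.team

inputs:
  # Must match the keys passed under "inputs" in the parent Subflow task
  - id: file
    type: FILE   # assumed type for the file passed by the parent
  - id: store
    type: INT    # assumed type for the value 12

tasks:
  - id: log_store
    type: io.kestra.plugin.core.log.Log
    message: "Processing store {{ inputs.store }}"
```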
WorkingDirectory
By default, Kestra launches each task in a new working directory, possibly on different workers if multiple ones exist.
The example below runs all tasks nested under the WorkingDirectory task sequentially in the same directory, allowing downstream tasks to reuse output files from previous ones. In order to share a working directory, all tasks nested under the WorkingDirectory task are launched on the same worker.
This task can be particularly useful for compute-intensive file system operations.
id: working_dir_flow
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'

      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'

This task can also cache files inside the working directory, for example, to cache script dependencies like the node_modules of a node Script task.
id: node_with_cache
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.node.Script
        beforeCommands:
          - npm install colors
        script: |
          const colors = require("colors");
          console.log(colors.red("Hello"));

This task can also fetch files from namespace files and make them available to all child tasks.
id: node_with_cache
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
        - dir1/*.*
      exclude:
        - dir2/*.*
    tasks:
      - id: shell
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - cat dir1/file1.txt

Pause
Kestra flows run until all tasks complete, but sometimes you need to:
- Add a manual validation before continuing the execution
- Wait for some duration before continuing the execution
For this, you can use the Pause task.
In the following example, the validation task pauses until it is manually resumed, while the wait task pauses for 5 minutes.
id: pause
namespace: company.team

tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'

  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'

A Pause task without delay waits indefinitely until the task state is changed to Running. To resume it, go to the Gantt tab of the Execution page, click on the task, select Change status in the contextual menu, and select Mark as RUNNING in the form. The task then runs to completion.

For more details, refer to the Pause Task documentation.
DAG
This task allows defining dependencies between tasks by creating a directed acyclic graph (DAG). Instead of an explicit DAG structure, this task defines dependencies for each task using the dependsOn property. This way, you can set dependencies more implicitly for each task, and Kestra figures out the overall flow structure.
id: dag
namespace: company.team

tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3

For more details, refer to the Dag Task documentation.
Template (deprecated)
Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.
The following example uses the Template task to call a template from a flow.
id: template
namespace: company.team

tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template