Flowable Tasks
Control your orchestration logic.
Flowable tasks control the orchestration logic of a flow: they run tasks or subflows in parallel, create loops, and handle conditional branching.
Flowable tasks don't run heavy operations; those are handled by workers.
Flowable tasks use expressions from the execution context to define the next tasks to run. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
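As a minimal sketch of the Switch case mentioned above, the flow below routes on the value returned by an earlier task. The flow id and the task ids (previous, decision, case_a, case_b) are hypothetical names chosen for illustration; the task types are the same ones used elsewhere on this page.

```yaml
id: switch_on_output
namespace: company.team
tasks:
  # Hypothetical upstream task: Return exposes its rendered format as outputs.previous.value
  - id: previous
    type: io.kestra.plugin.core.debug.Return
    format: "a"
  # The Switch evaluates the expression and runs the matching case
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.previous.value }}"
    cases:
      a:
        - id: case_a
          type: io.kestra.plugin.core.log.Log
          message: "Previous task returned a"
      b:
        - id: case_b
          type: io.kestra.plugin.core.log.Log
          message: "Previous task returned b"
```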
Sequential
This task processes tasks one after another sequentially. It is used to group tasks.
id: sequential
namespace: company.team
tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"
  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You can access the output of a sibling task using the syntax {{ outputs.sibling.value }}, where sibling is the id of that task.
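The sketch below illustrates the sibling-output syntax inside a Sequential task. The flow id and the task ids first and second are hypothetical; the second task simply logs whatever the first one returned.

```yaml
id: sequential_sibling_output
namespace: company.team
tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      # Runs first and exposes its result as outputs.first.value
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      # Runs after "first" and reads the sibling's output
      - id: second
        type: io.kestra.plugin.core.log.Log
        message: "{{ outputs.first.value }}"
```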
Parallel
This task processes tasks in parallel. It makes it convenient to process many tasks at once.
id: parallel
namespace: company.team
tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"
  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You cannot access the output of a sibling task as tasks will be run in parallel.
Switch
This task processes a set of tasks conditionally depending on a contextual variable's value.
In the following example, an input will be used to decide which task to run next.
id: switch
namespace: company.team
inputs:
  - id: param
    type: BOOLEAN
tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"
If
This task processes a set of tasks conditionally depending on a condition.
The condition must coerce to a boolean. Under boolean coercion, 0, -0, null and '' coerce to false; all other values coerce to true.
The else branch is optional.
In the following example, an input will be used to decide which task to run next.
id: if_condition
namespace: company.team
inputs:
  - id: param
    type: BOOLEAN
tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"
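The condition does not have to be a BOOLEAN input; any expression that renders to a value coercible to a boolean should work. The sketch below assumes a hypothetical INT input named count and omits the optional else branch, so nothing runs when the comparison is false.

```yaml
id: if_expression
namespace: company.team
inputs:
  # Hypothetical input used only for this illustration
  - id: count
    type: INT
tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    # Renders to true or false depending on the comparison
    condition: "{{ inputs.count > 0 }}"
    then:
      - id: has_items
        type: io.kestra.plugin.core.log.Log
        message: "count is {{ inputs.count }}"
```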
EachSequential
This task generates tasks at runtime depending on the value of a variable. The subtasks run one after another, once for each value.
In the following example, the variable is static, but it can also be generated from a previous task output and start an arbitrary number of subtasks.
id: each_example
namespace: company.team
tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"
  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You can access the output of a sibling task using the syntax {{ outputs.sibling[taskrun.value].value }}.
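As an illustration of that syntax, the sketch below logs, for each iteration, the value returned by a sibling task in the same iteration. The flow id and the task ids first and second are hypothetical.

```yaml
id: each_sibling_output
namespace: company.team
tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["value 1", "value 2"]
    tasks:
      # One output per iteration, keyed by the current taskrun.value
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }}"
      # Reads the output that "first" produced for the same iteration value
      - id: second
        type: io.kestra.plugin.core.log.Log
        message: "{{ outputs.first[taskrun.value].value }}"
```

Indexing by taskrun.value is what distinguishes the outputs of one iteration from those of another.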
EachParallel
This task is the same as EachSequential, but each subtask will run in parallel.
id: each_parallel
namespace: company.team
tasks:
  - id: 1_each
    type: io.kestra.plugin.core.flow.EachParallel
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: 1_1
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ task.id }} > $(date +"%T.%N")"'
          - 'sleep 1'
      - id: 1_2
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ task.id }} > $(date +"%T.%N")"'
          - 'sleep 1'
  - id: 2_end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You cannot access the output of a sibling task as tasks will be run in parallel.
In the next example, we create a file for each value in parallel, then process all files in the next task. Note how the inputFiles property uses a jq expression with a map function to extract the paths of all files processed in parallel and pass them into the next task's working directory.
id: parallel_script
namespace: io.kestra.tests
tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 9) }}"
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.shell.Script
        outputFiles:
          - "out/*.txt"
        script: |
          mkdir out
          echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt
  - id: process_all_files
    type: io.kestra.plugin.scripts.shell.Script
    inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
    script: |
      ls -h out/
ForEachItem
This task allows you to iterate over a list of items and run a subflow for each item, or for each batch containing multiple items.
Syntax:
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ inputs.file }}" # could also be an output variable, e.g. {{ outputs.extract.uri }}
  inputs:
    file: "{{ taskrun.items }}" # items of the batch
  batch:
    rows: 4
  namespace: company.team
  flowId: subflow
  revision: 1 # optional (default: latest)
  wait: true # wait for the subflow execution
  transmitFailed: true # fail the task run if the subflow execution fails
  labels: # optional labels to pass to the subflow to be executed
    key: value
This will execute the subflow company.team.subflow for each batch of items.
To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
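On the receiving side, the subflow has to declare that FILE input. The sketch below shows one possible shape for company.team.subflow; the task id log_batch is hypothetical, and the read() function is used here on the assumption that each batch is small enough to log in full.

```yaml
id: subflow
namespace: company.team
inputs:
  # Receives the internal storage URI of the file holding one batch of items
  - id: file
    type: FILE
tasks:
  # Hypothetical task: logs the content of the batch file
  - id: log_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"
```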
The next example shows how to access the outputs of each executed subflow. The ForEachItem task automatically merges the URIs of the outputs from each subflow into a single file. The URI of this file is available through the subflowOutputs output.
id: for_each_item
namespace: company.team
tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true
    flowId: my_subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"
  - id: for_each_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.for_each_item_merge.subflowOutputs }}" # Log the URI of the file containing the URIs of the outputs from each subflow
AllowFailure
This task will allow child tasks to fail. If any child task fails:
- The AllowFailure task will be marked with status WARNING.
- All child tasks inside the AllowFailure task will be stopped immediately.
- The Execution will continue for all other tasks.
- At the end, the execution as a whole will also be marked with status WARNING.
In the following example:
- allow_failure will be labelled as WARNING.
- ko will be labelled as FAILED.
- next will not be run.
- end will be run and labelled SUCCESS.
id: each
namespace: company.team
tasks:
  - id: allow_failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.core.execution.Fail
      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
Fail
This task will fail the flow; it can be used with or without conditions.
Without conditions, it can be used, for example, to fail on some switch value.
id: fail_on_switch
namespace: company.team
inputs:
  - id: param
    type: STRING
    required: true
tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default
With conditions, it can be used, for example, to validate inputs.
id: fail_on_condition
namespace: company.team
inputs:
  - id: param
    type: STRING
    required: true
tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{ inputs.param == 'fail' }}"
  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"
Subflow
This task will trigger another flow. This allows you to decouple the first flow from the second and monitor each flow individually.
You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).
id: subflow
namespace: company.team
tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12
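Because the inputs must be declared in the triggered flow, the child flow needs matching input definitions. The sketch below is one possible shape for my-subflow; the input types (FILE and INT) and the task id log_store are assumptions made for illustration, inferred from the values passed by the parent.

```yaml
id: my-subflow
namespace: company.team
inputs:
  # Assumed types: the parent passes a file and the number 12
  - id: file
    type: FILE
  - id: store
    type: INT
tasks:
  # Hypothetical task showing that the passed value is available in the child
  - id: log_store
    type: io.kestra.plugin.core.log.Log
    message: "store = {{ inputs.store }}"
```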
Worker
The Worker task is deprecated in favor of the WorkingDirectory task. The next section explains how you can use the WorkingDirectory task to allow multiple tasks to share a file system during the flow's Execution.
WorkingDirectory
By default, Kestra will launch each task in a new working directory, possibly on different workers if multiple ones exist.
The example below will run all tasks nested under the WorkingDirectory task sequentially. All those tasks will be executed in the same working directory, allowing the reuse of the previous tasks' output files in the downstream tasks. In order to share a working directory, all tasks nested under the WorkingDirectory task will be launched on the same worker.
This task can be particularly useful for compute-intensive file system operations.
id: working_dir_flow
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'
      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'
This task can also cache files inside the working directory, for example, to cache script dependencies like the node_modules of a node Script task.
id: node_with_cache
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.node.Script
        beforeCommands:
          - npm install colors
        script: |
          const colors = require("colors");
          console.log(colors.red("Hello"));
This task can also fetch files from namespace files and make them available to all child tasks.
id: namespace_files_example
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
        - dir1/*.*
      exclude:
        - dir2/*.*
    tasks:
      - id: shell
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - cat dir1/file1.txt
Pause
Kestra flows run until all tasks complete, but sometimes you need to:
- Add a manual validation before continuing the execution.
- Wait for some duration before continuing the execution.
For this, you can use the Pause task.
In the following example, the validation task pauses until its state is changed manually, and the wait task pauses for 5 minutes.
id: pause
namespace: company.team
tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'
  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'
A Pause task without a delay waits indefinitely until its state is changed to Running. To resume it, go to the Gantt tab of the Execution page, click on the task, select Change status in the contextual menu, and select Mark as RUNNING in the form. The task will then run to completion.
DAG
This task lets you define dependencies between tasks by creating a directed acyclic graph (DAG). Instead of declaring an explicit DAG structure, you only define the upstream dependencies of each task using the dependsOn property, and Kestra works out the overall flow structure.
id: dag
namespace: company.team
tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3
Template (deprecated)
Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.
The following example uses the Template task to use a template.
id: template
namespace: company.team
tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template