Flowable Tasks

Control your orchestration logic.

Flowable Tasks control the orchestration logic: they group tasks, run tasks or subflows in parallel, create loops, and handle conditional branching.

Flowable Tasks don't run heavy operations; those are handled by workers.

Flowable Tasks use expressions from the execution context to define the next tasks to run. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
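
As a minimal sketch of that last point, the flow below routes a Switch on the output of an upstream task. The flow id, the task ids, and the "small"/"large" values are hypothetical and chosen only for illustration; the sketch assumes the Return task exposes its rendered format as outputs.<task id>.value.

```yaml
id: switch_on_output
namespace: company.team

tasks:
  # Hypothetical upstream task: its rendered `format` is assumed to be
  # available downstream as outputs.check.value
  - id: check
    type: io.kestra.plugin.core.debug.Return
    format: "small"

  # The Switch value is an expression evaluated against the execution context
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.check.value }}"
    cases:
      small:
        - id: handle_small
          type: io.kestra.plugin.core.log.Log
          message: "Small batch"
      large:
        - id: handle_large
          type: io.kestra.plugin.core.log.Log
          message: "Large batch"
```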

Sequential

This task runs its child tasks one after another. It is mainly used to group tasks.

yaml
id: sequential
namespace: company.team

tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.id}}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

Parallel

This task processes tasks in parallel. It makes it convenient to process many tasks at once.

yaml
id: parallel
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.id}}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

Switch

This task processes a set of tasks conditionally depending on a contextual variable's value.

In the following example, an input will be used to decide which task to run next.

yaml
id: switch
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"

If

This task processes a set of tasks conditionally depending on a condition. The condition must coerce to a boolean: 0, -0, null, and '' coerce to false, and all other values coerce to true. The else branch is optional.

In the following example, an input will be used to decide which task to run next.

yaml
id: if-condition
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when-true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when-false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"
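
Because the else branch is optional, an If task can also guard a single branch. The sketch below is illustrative only: the flow id, the task ids, and the 'run' value are hypothetical, and the comparison follows the same expression style used for conditions elsewhere on this page.

```yaml
id: if-without-else
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    # The expression renders to true or false before being coerced
    condition: "{{ inputs.param == 'run' }}"
    then:
      - id: when-true
        type: io.kestra.plugin.core.log.Log
        message: "Condition was true"
    # No else branch: nothing runs here when the condition is false

  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "Runs after the If task in both cases"
```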

EachSequential

This task will generate many tasks at runtime depending on the value of a variable. Each subtask will run after the others sequentially.

In the following example, the variable is static, but it could also be generated from a previous task's output and start an arbitrary number of subtasks.

yaml
id: each_example
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"
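
To illustrate the dynamic case, here is a hedged sketch in which the list comes from a previous task's output. It assumes that EachSequential accepts a string containing a JSON array as its value, and that the Return task exposes its rendered format as outputs.generate.value. The flow id and task ids are hypothetical.

```yaml
id: each_from_output
namespace: company.team

tasks:
  # Hypothetical upstream task producing a JSON array as a string
  - id: generate
    type: io.kestra.plugin.core.debug.Return
    format: '["value 1", "value 2", "value 3"]'

  - id: each
    type: io.kestra.plugin.core.flow.EachSequential
    # Assumption: the rendered string is parsed as a JSON array
    value: "{{ outputs.generate.value }}"
    tasks:
      - id: show
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }}"
```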

EachParallel

This task is the same as EachSequential, but each subtask will run in parallel.

yaml
id: each-parallel
namespace: company.team

tasks:
  - id: 1_each
    type: io.kestra.plugin.core.flow.EachParallel
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: 1-1
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{task.id}} > $(date +"%T.%N")"'
          - 'sleep 1'

      - id: 1-2
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{task.id}} > $(date +"%T.%N")"'
          - 'sleep 1'

  - id: 2_end
    type: io.kestra.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

In the next example, we can create a file for each value in parallel, then process all files in the next task. Note how the inputFiles property uses a jq expression with a map function to extract the paths of all files processed in parallel and pass them into the next task's working directory.

yaml
id: parallel_script
namespace: io.kestra.tests

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 9) }}"
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.shell.Script
        outputFiles:
          - "out/*.txt"
        script: |
          mkdir -p out
          echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt

  - id: process_all_files 
    type: io.kestra.plugin.scripts.shell.Script
    inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
    script: | 
      ls -h out/

ForEachItem

This task allows you to iterate over a list of items and run a subflow for each item, or for each batch containing multiple items.

Syntax:

yaml
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}" # could also be an output variable, e.g. {{ outputs.extract.uri }}
    inputs:
      file: "{{ taskrun.items }}" # items of the batch
    batch:
      rows: 4
    namespace: company.team
    flowId: subflow
    revision: 1 # optional (default: latest)
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    labels: # optional labels to pass to the subflow to be executed
      key: value

This will execute the subflow company.team.subflow for each batch of items. To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
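
For reference, a subflow receiving those batches might look like the following sketch. The task id and the log message are hypothetical; the only requirement taken from the text above is a FILE input named file. The sketch assumes the read() function is available to load the content of an internal storage file.

```yaml
id: subflow
namespace: company.team

inputs:
  # Must match the `file` key passed in the ForEachItem `inputs` property
  - id: file
    type: FILE

tasks:
  # Hypothetical task: logs the content of the batch file it received
  - id: log_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"
```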

The next example shows you how you can access the outputs from each subflow executed. The ForEachItem automatically merges the URIs of the outputs from each subflow into a single file. The URI of this file is available through the subflowOutputs output.

yaml
id: ForEachItem
namespace: company.team

tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data

  - id: forEachItem
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true  
    flowId: my-subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"

  - id: forEachOutputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.forEachItem_merge.subflowOutputs }}" # Log the URI of the file containing the URIs of the outputs from each subflow

AllowFailure

This task will allow child tasks to fail. If any child task fails:

  • The AllowFailure task will be marked with the status WARNING.
  • All child tasks inside the AllowFailure task will be stopped immediately.
  • The execution will continue for all other tasks.
  • At the end, the execution as a whole will also be marked with the status WARNING.

In the following example:

  • allow-failure will be labelled as WARNING.
  • ko will be labelled as FAILED.
  • next will not be run.
  • end will be run and labelled SUCCESS.

yaml
id: each
namespace: company.team

tasks:
  - id: allow-failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - exit 1
      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"

  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

Fail

This task will fail the flow; it can be used with or without conditions.

Without conditions, it can be used, for example, to fail on some switch value.

yaml
id: fail-on-switch
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{inputs.param}}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default

With conditions, it can be used, for example, to validate inputs.

yaml
id: fail-on-condition
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{inputs.param == 'fail'}}"
  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"

Subflow

This task will trigger another flow. This allows you to decouple the first flow from the second and monitor each flow individually.

You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).

yaml
id: subflow
namespace: company.team

tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12
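
Since the inputs must be declared in the subflow, the called flow could be sketched as follows. The input types (FILE and INT) are assumptions inferred from the values passed above, and the task is hypothetical.

```yaml
id: my-subflow
namespace: company.team

inputs:
  - id: file
    type: FILE   # assumed type: the parent passes a file input
  - id: store
    type: INT    # assumed type: the parent passes the number 12

tasks:
  # Hypothetical task showing that both inputs are available in the subflow
  - id: log_inputs
    type: io.kestra.plugin.core.log.Log
    message: "store={{ inputs.store }}, file={{ inputs.file }}"
```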

Worker

The Worker task is deprecated in favor of the WorkingDirectory task. The next section explains how to use the WorkingDirectory task to let multiple tasks share a file system during the flow's execution.

WorkingDirectory

By default, Kestra will launch each task in a new working directory, possibly on different workers if multiple ones exist.

The example below will run all tasks nested under the WorkingDirectory task sequentially. All those tasks will be executed in the same working directory, allowing the reuse of the previous tasks' output files in the downstream tasks. In order to share a working directory, all tasks nested under the WorkingDirectory task will be launched on the same worker.

This task can be particularly useful for compute-intensive file system operations.

yaml
id: working-dir
namespace: company.team

tasks:
  - id: working-dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'

      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'

This task can also cache files inside the working directory, for example, to cache script dependencies like the node_modules of a node Script task.

yaml
id: node-with-cache
namespace: company.team

tasks:
  - id: working-dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
    - id: script
      type: io.kestra.plugin.scripts.node.Script
      beforeCommands:
        - npm install colors
      script: |
        const colors = require("colors");
        console.log(colors.red("Hello"));

This task can also fetch files from namespace files and make them available to all child tasks.

yaml
id: working-dir-namespace-files
namespace: company.team

tasks:
  - id: working-dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
      - dir1/*.*
      exclude:
      - dir2/*.*
    tasks:
    - id: shell
      type: io.kestra.plugin.scripts.shell.Commands
      commands:
      - cat dir1/file1.txt

Pause

Kestra flows run until all tasks complete, but sometimes you need to:

  • Add a manual validation before continuing the execution.
  • Wait for some duration before continuing the execution.

For this, you can use the Pause task.

In the following example, the validation task pauses until the task is manually resumed, and the wait task pauses for 5 minutes.

yaml
id: pause
namespace: company.team

tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'

  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'

DAG

This task allows defining dependencies between tasks by creating a directed acyclic graph (DAG). Instead of an explicit DAG structure, this task allows you to only define upstream dependencies for each task using the dependsOn property. This way, you can set dependencies more implicitly for each task, and Kestra will figure out the overall flow structure.

yaml
id: dag
namespace: company.team
tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3

Template (deprecated)

Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.

The following example calls a template using the Template task.

yaml
id: template
namespace: company.team

tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template
