Flowable Tasks

Run tasks or subflows in parallel, create loops and conditional branching.

Add conditional logic using Flowable tasks

In addition to running independent processes in parallel, you will often want specific tasks to run only when certain conditions are met, with a different action taken otherwise. Conditional logic can be applied to inputs, outputs, variables, and many other components of your flow, even other flows.

For example, you can use the If task to specify your conditions and the action to take depending on whether those conditions are met.

The flow below shows the If task in action. Returning to the dummyjson API, it makes a request based on the product category input, either beauty or notebooks (one of which does not exist).

The If task has the condition "{{ json(outputs.api.body).products | length > 0 }}", which checks that the API response body contains at least one product. The log message then depends on whether the selected product category exists.

yaml
id: getting_started
namespace: company.team

inputs:
  - id: category
    type: SELECT
    displayName: Select a category
    values: ['beauty', 'notebooks']
    defaults: 'beauty'

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: "https://dummyjson.com/products/category/{{inputs.category}}"
    method: GET

  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ json(outputs.api.body).products | length > 0 }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This category has products"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "The category has no products."
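
Conditions are not limited to task outputs. As a minimal sketch of applying the same If task directly to an input, the flow below branches on the selected category without calling the API. The flow id, task ids, and log messages are hypothetical names chosen for illustration, and the comparison assumes the input is available as a plain string.

```yaml
# Hypothetical flow: branch on an input value instead of a task output
id: if_on_input
namespace: company.team

inputs:
  - id: category
    type: SELECT
    displayName: Select a category
    values: ['beauty', 'notebooks']
    defaults: 'beauty'

tasks:
  - id: check_category
    type: io.kestra.plugin.core.flow.If
    # The expression compares the input to a fixed string
    condition: "{{ inputs.category == 'beauty' }}"
    then:
      - id: selected_beauty
        type: io.kestra.plugin.core.log.Log
        message: "The beauty category was selected."
    else:
      - id: selected_other
        type: io.kestra.plugin.core.log.Log
        message: "Another category was selected: {{ inputs.category }}"
```

Branching on the input avoids the HTTP request entirely, which can be useful when the decision does not depend on the API response.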

Add a loop to a flow using Flowable tasks

Orchestration often involves a set of values that you want to operate on. In Kestra, the ForEach flowable task executes a group of tasks for each value in a list. ForEach can implement complex looping operations, possibly combined with conditional flowable tasks or subtasks; you can see more examples in the ForEach documentation.

As an introduction to the feature, the example below uses ForEach to call the OpenLibrary API and get a list of associated titles for each author in the list. The values are defined as a JSON string or an array, e.g., a list of string values ["value1", "value2"] or a list of key-value pairs [{"key": "value1"}, {"key": "value2"}].

You can access the current iteration value using the variable {{ taskrun.value }}:

yaml
id: for_loop_example
namespace: tutorial

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["pynchon", "dostoyevsky", "hedayat"]
    tasks:
      - id: api
        type: io.kestra.plugin.core.http.Request
        uri: "https://openlibrary.org/search.json?author={{ taskrun.value }}&sort=new"

After executing, the Gantt view shows separate runs for each of the three listed authors in our task.
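
Loops can also be combined with conditional flowable tasks. The sketch below nests an If task inside ForEach so that only one author produces a log message. It rests on an assumption worth verifying against the ForEach documentation: the If task, as a direct child of ForEach, reads the current value as {{ taskrun.value }}, while tasks nested one level deeper read it as {{ parent.taskrun.value }}. The flow id and task ids are hypothetical.

```yaml
# Hypothetical flow: conditional logic inside a loop
id: for_each_with_if
namespace: tutorial

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["pynchon", "dostoyevsky", "hedayat"]
    tasks:
      - id: check_author
        type: io.kestra.plugin.core.flow.If
        # Assumption: a direct child of ForEach sees the iteration value here
        condition: "{{ taskrun.value == 'pynchon' }}"
        then:
          - id: log_match
            type: io.kestra.plugin.core.log.Log
            # Assumption: one level deeper, the value comes from the parent task run
            message: "Matched author {{ parent.taskrun.value }}"
```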


Add parallelism using Flowable tasks

One of the most common orchestration requirements is to execute independent processes in parallel. For example, you can process data for each partition in parallel. This can significantly speed up the processing time.

The flow below uses the ForEach flowable task to execute a list of tasks in parallel.

  1. The concurrencyLimit property with value 0 removes the concurrency limit, so the tasks for all items execute in parallel.
  2. The values property defines the list of items to iterate over.
  3. The tasks property defines the list of tasks to execute for each item in the list. You can access the iteration value using the {{ taskrun.value }} variable.
yaml
id: python_partitions
namespace: company.team

description: Process partitions in parallel

tasks:
  - id: getPartitions
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra
      partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
      Kestra.outputs({'partitions': partitions})

  - id: processPartitions
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 0
    values: '{{ outputs.getPartitions.vars.partitions }}'
    tasks:
      - id: partition
        type: io.kestra.plugin.scripts.python.Script
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra

          filename = '{{ taskrun.value }}'
          print(f"Reading and processing partition {filename}")
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          time.sleep(processing_time)
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})
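
When unlimited parallelism would put too much load on a downstream system, the same property can cap the number of iterations running at once. The sketch below is a simplified variant of the flow above, assuming a concurrencyLimit of 2 means at most two iterations run concurrently. The flow id, task ids, and hard-coded file names are hypothetical, and a Log task stands in for the Python script.

```yaml
# Hypothetical flow: cap parallel iterations instead of removing the limit
id: limited_concurrency
namespace: company.team

tasks:
  - id: processPartitions
    type: io.kestra.plugin.core.flow.ForEach
    # Assumption: at most two values are processed at the same time
    concurrencyLimit: 2
    values: ["file_1.parquet", "file_2.parquet", "file_3.parquet", "file_4.parquet"]
    tasks:
      - id: partition
        type: io.kestra.plugin.core.log.Log
        message: "Processing partition {{ taskrun.value }}"
```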

While simple, these examples demonstrate the flexibility that flowable tasks provide for workflows of any complexity. To learn more about flowable tasks and see more examples, check out the full flowable tasks documentation.
