Run tasks or subflows in parallel, create loops and conditional branching.
Add conditional logic using Flowable tasks
In addition to running independent processes in parallel, you may want specific tasks to run only if certain conditions are met; otherwise, another action occurs.
For example, you can use the If task to specify your conditions and define what action to take based on whether those conditions are met.
The following flow is an example of the If task in action. Calling back to dummyjson, an API request is made based on the product category input of either beauty
or notebook
(one does not exist).
The if
task has a condition
of "{{ json(outputs.api.body).products | length > 0 }}"
(i.e., checking whether the API body is not empty and contains at least one product). The log message then depends on whether the actual product category exists or not.
id: getting_started
namespace: company.team
inputs:
- id: category
type: SELECT
displayName: Select a category
values: ['beauty', 'notebooks']
defaults: 'beauty'
tasks:
- id: api
type: io.kestra.plugin.core.http.Request
uri: "https://dummyjson.com/products/category/{{inputs.category}}"
method: GET
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{ json(outputs.api.body).products | length > 0 }}"
then:
- id: when_true
type: io.kestra.plugin.core.log.Log
message: "This category has products"
else:
- id: when_false
type: io.kestra.plugin.core.log.Log
message: "The category has no products."
Add a loop to a flow using Flowable tasks
It is a common process in orchestration to have a set of values you want to operate on. In Kestra, the ForEach flowable task executes a group of tasks for each value in the list. There are many ways to implement ForEach for complex looping operations, possibly incorporating conditional flowable tasks or subtasks. See more examples in the ForEach documentation.
As an introduction to the feature, the below example demonstrates using ForEach to make an API call to OpenLibrary to get a list of associated titles for each Author in the list. The values are defined as a JSON string or an array, i.e., a list of string values ["value1", "value2"]
or a list of key-value pairs [{"key": "value1"}, {"key": "value2"}]
.
You can access the current iteration value using the variable :
id: for_loop_example
namespace: tutorial
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values: ["pynchon", "dostoyevsky", "hedayat"]
tasks:
- id: api
type: io.kestra.plugin.core.http.Request
uri: "https://openlibrary.org/search.json?author={{ taskrun.value }}&sort=new"
After execution, the Gantt view shows separate runs for each of the three listed authors in the task.
Add parallelism using Flowable tasks
A common orchestration requirement is executing independent processes in parallel. For example, you can process data for each partition in parallel. This can significantly speed up the processing time.
The flow below uses the ForEach
flowable task to execute a list of tasks
in parallel.
- The
concurrencyLimit
property with value0
makes the list oftasks
to execute in parallel. - The
values
property defines the list of items to iterate over. - The
tasks
property defines the list of tasks to execute for each item in the list. You can access the iteration value using the{{ taskrun.value }}
variable.
id: python_partitions
namespace: company.team
description: Process partitions in parallel
tasks:
- id: getPartitions
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: ghcr.io/kestra-io/pydata:latest
script: |
from kestra import Kestra
partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
Kestra.outputs({'partitions': partitions})
- id: processPartitions
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: '{{ outputs.getPartitions.vars.partitions }}'
tasks:
- id: partition
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: ghcr.io/kestra-io/pydata:latest
script: |
import random
import time
from kestra import Kestra
filename = '{{ taskrun.value }}'
print(f"Reading and processing partition {filename}")
nr_rows = random.randint(1, 1000)
processing_time = random.randint(1, 20)
time.sleep(processing_time)
Kestra.counter('nr_rows', nr_rows, {'partition': filename})
Kestra.timer('processing_time', processing_time, {'partition': filename})
These examples, while simple, demonstrate the flexibility of flowable tasks in both simple and complex workflows.
To learn more about flowable tasks and see more examples, check out the full flowable tasks documentation.
Was this page helpful?