EachParallel
⚠ This feature is deprecated and will be removed in the future. We encourage you to migrate to the new plugin, but you can still use the deprecated version.
type: "io.kestra.plugin.core.flow.EachParallel"
For each value in the list, execute one or more tasks in parallel.
This task is deprecated; please use the io.kestra.plugin.core.flow.ForEach task instead.
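As a rough migration sketch, the first example on this page might look as follows with ForEach. The property names values and concurrencyLimit, and the convention that a limit of 0 removes the concurrency cap, are assumptions drawn from the ForEach task rather than anything stated on this page, so check them against your Kestra version.

```yaml
id: for_each_migration   # hypothetical flow id
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    # assumed: ForEach takes `values` where EachParallel takes `value`
    values: ["value 1", "value 2", "value 3"]
    # assumed: 0 means no limit, mirroring `concurrent: 0` on EachParallel
    concurrencyLimit: 0
    tasks:
      - id: each_value
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} with current value '{{ taskrun.value }}'"
```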
The list of tasks will be executed for each item in parallel. The value must be a valid JSON string representing an array, e.g. a list of strings ["value1", "value2"] or a list of dictionaries [{"key": "value1"}, {"key": "value2"}].
You can access the current iteration value using the variable {{ taskrun.value }}.
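When the items are dictionaries, the iteration value likely arrives as a JSON string rather than a parsed object, so reading a single key may need an explicit parse. The sketch below assumes a fromJson function is available in your Kestra version (older releases may expose it as json). Both the function name and the string behaviour are assumptions that this page does not confirm.

```yaml
id: each_parallel_objects   # hypothetical flow id
namespace: company.team

tasks:
  - id: each_parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: '[{"key": "value1"}, {"key": "value2"}]'
    tasks:
      - id: read_key
        type: io.kestra.plugin.core.debug.Return
        # assumed: taskrun.value is a JSON string, parsed here with fromJson
        format: "{{ fromJson(taskrun.value).key }}"
```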
The task list will be executed in parallel for each item. For example, if you have a list with 3 elements and 2 tasks defined in the list of tasks, all 6 resulting task runs will execute in parallel without any order guarantee.
If you want to execute a group of sequential tasks for each value in parallel, you can wrap the list of tasks with the Sequential task.
If your list of values is large, you can limit the number of concurrent tasks using the concurrent property.
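A minimal sketch of such a limit, reusing the range expression and Return task from the examples on this page. The flow id is hypothetical and the value 2 is an arbitrary choice for illustration.

```yaml
id: each_parallel_limited   # hypothetical flow id
namespace: company.team

tasks:
  - id: each_parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 9) }}"
    concurrent: 2   # at most two child task runs at any point in time
    tasks:
      - id: each_value
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} with current value '{{ taskrun.value }}'"
```

Leaving concurrent at its default of 0 removes the cap, so every iteration would start at once.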
We highly recommend triggering a subflow for each value (e.g. using the ForEachItem task) instead of specifying many tasks wrapped in a Sequential task. This allows better scalability and modularity. Check the flow best practices documentation for more details.
Examples
id: each_parallel
namespace: company.team
tasks:
  - id: each_parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: '["value 1", "value 2", "value 3"]'
    tasks:
      - id: each_value
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} with current value '{{ taskrun.value }}'"
Create a file for each value in parallel, then process all files in the next task. Note how the inputFiles property uses a jq expression with a map function to extract the paths of all files processed in parallel and pass them into the next task's working directory.
id: parallel_script
namespace: company.team
tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 9) }}"
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.shell.Script
        outputFiles:
          - "out/*.txt"
        script: |
          mkdir out
          echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt
  - id: process_all_files
    type: io.kestra.plugin.scripts.shell.Script
    inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
    script: |
      ls -h out/
Run a group of tasks for each value in parallel.
id: parallel_task_groups
namespace: company.team
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.EachParallel
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: group
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: task1
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - echo "{{task.id}} > {{ parents[0].taskrun.value }}"
              - sleep 1
          - id: task2
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - echo "{{task.id}} > {{ parents[0].taskrun.value }}"
              - sleep 1
Properties
concurrent
- Type: integer
- Dynamic: ❌
- Required: ✔️
- Default: 0
Number of concurrent parallel tasks that can be running at any point in time.
If the value is 0, no limit exists and all the tasks will start at the same time.
tasks
- Type: array
- SubType: Task
- Dynamic: ❌
- Required: ✔️
- Min items: 1
value
- Type: string, array
- Dynamic: ✔️
- Required: ✔️
The list of values for this task.
The value can be passed as a string, a list of strings, or a list of objects.
errors
- Type: array
- SubType: Task
- Dynamic: ❌
- Required: ❌
List of tasks to run if any task fails within this FlowableTask.
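A minimal sketch of the errors property, assuming the io.kestra.plugin.core.log.Log task with a message property is available in your installation. The flow id, task ids, and message text are hypothetical. The Return child task used here will not normally fail, so the handler is shown only to illustrate where it is declared.

```yaml
id: each_parallel_errors   # hypothetical flow id
namespace: company.team

tasks:
  - id: each_parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: '["value 1", "value 2", "value 3"]'
    tasks:
      - id: each_value
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"
    # runs only if a child task of this EachParallel fails
    errors:
      - id: on_failure
        type: io.kestra.plugin.core.log.Log   # assumed to be available
        message: "A child task of each_parallel failed"
```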