EachParallel

yaml
type: "io.kestra.plugin.core.flow.EachParallel"

For each value in the list, execute one or more tasks in parallel.

The list of tasks is executed in parallel for each item. The value can be a valid JSON string representing an array, e.g. a list of strings ["value1", "value2"] or a list of dictionaries [{"key": "value1"}, {"key": "value2"}]; it can also be passed directly as a YAML list. You can access the current iteration value using the variable {{ taskrun.value }}.

The task list is executed in parallel for each item: if you have a list with 3 elements and 2 tasks defined in the list of tasks, all 6 task runs will be executed in parallel without any order guarantee.
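
If the items are dictionaries, the current item is available as a JSON string. The following is a minimal sketch, assuming the json() expression function is used to parse it and read a key; the flow and task ids are illustrative only.

yaml
id: each_parallel_objects
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: '[{"key": "value1"}, {"key": "value2"}]'
    tasks:
      - id: print_key
        type: io.kestra.plugin.core.debug.Return
        format: "{{ json(taskrun.value).key }}"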

If you want to execute a group of sequential tasks for each value in parallel, you can wrap the list of tasks with the Sequential task. If your list of values is large, you can limit the number of concurrent tasks using the concurrent property.
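
As a sketch of the latter, the concurrent property below limits execution to two task runs at a time; the values and the child task are illustrative only.

yaml
id: each_parallel_limited
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 6) }}"
    concurrent: 2
    tasks:
      - id: wait
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - sleep 1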

We highly recommend triggering a subflow for each value (e.g. using the ForEachItem task) instead of specifying many tasks wrapped in a Sequential task. This allows better scalability and modularity. Check the flow best practices documentation for more details.
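
As an illustration of this recommendation, here is a minimal sketch of the ForEachItem pattern; the process_item subflow, its items input, and the file input are hypothetical.

yaml
id: for_each_item_example
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}"
    namespace: company.team
    flowId: process_item # hypothetical subflow
    wait: true
    inputs:
      items: "{{ taskrun.items }}" # hypothetical input expected by the subflow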

Examples

yaml
id: each-parallel
namespace: company.team

tasks:
  - id: each-parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: '["value 1", "value 2", "value 3"]'
    tasks:
      - id: each-value
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} with current value '{{ taskrun.value }}'"

Create a file for each value in parallel, then process all files in the next task. Note how the inputFiles property uses a jq expression with a map function to extract the paths of all files processed in parallel and pass them into the next task's working directory.

yaml
id: parallel_script
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ range(1, 9) }}"
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.shell.Script
        outputFiles:
          - "out/*.txt"
        script: |
          mkdir out
          echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt

  - id: process_all_files
    type: io.kestra.plugin.scripts.shell.Script
    inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
    script: |
      ls -h out/

Run a group of tasks for each value in parallel.

yaml
id: parallel_task_groups
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.EachParallel
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: group
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: task1
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - echo "{{task.id}} > {{ parents[0].taskrun.value }}"
              - sleep 1

          - id: task2
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - echo "{{task.id}} > {{ parents[0].taskrun.value }}"
              - sleep 1

Properties

concurrent

  • Type: integer
  • Dynamic:
  • Required: ✔️
  • Default: 0

Number of concurrent parallel tasks that can be running at any point in time.

If the value is 0, no limit exists and all the tasks will start at the same time.

tasks

  • Type: array
  • SubType: Task
  • Dynamic:
  • Required: ✔️
  • Min items: 1

value

  • Type:
    • string
    • array
  • Dynamic: ✔️
  • Required: ✔️

The list of values for this task.

The value can be passed as a string, a list of strings, or a list of objects.
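
For illustration, these fragments show the three accepted forms; the surrounding flow definition is omitted.

yaml
# As a JSON string representing an array
value: '["value 1", "value 2"]'

# As a list of strings
value:
  - "value 1"
  - "value 2"

# As a list of objects
value:
  - key: "value 1"
  - key: "value 2"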

errors

  • Type: array
  • SubType: Task
  • Dynamic:
  • Required:

List of tasks to run if any task fails on this FlowableTask.
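
A minimal sketch of an error branch on this task; the failing command and the log message are illustrative only.

yaml
id: each_parallel_errors
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.EachParallel
    value: '["value 1", "value 2"]'
    tasks:
      - id: may_fail
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - exit 1
    errors:
      - id: on_error
        type: io.kestra.plugin.core.log.Log
        message: "One of the parallel task runs failed"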

Outputs

Definitions
