Best Practices for ForEach and ForEachItem

Use ForEach and ForEachItem for different scaling and orchestration patterns.

Choose the right loop primitive

Both tasks iterate over multiple items, but they do it in different ways:

  • ForEach creates child task runs inside the same execution.
  • ForEachItem creates one subflow execution per batch of items.

That design difference affects performance, restart behavior, and how you access outputs.

Decision guide

Use ForEach when:

  • You already have a small list in memory, such as an input, a small JSON array, or a small fetched result.
  • The work for each item is lightweight.
  • You want to share outputs between sibling tasks inside the loop.
  • You want a simple loop without introducing a subflow.

Use ForEachItem when:

  • You need to process a large dataset or file.
  • You want to split data into batches and scale processing through subflows.
  • You need better isolation, troubleshooting, and restart behavior for individual batches.
  • The data already lives in Kestra internal storage, or can be written there first.

Subflow vs ForEachItem

Subflow and ForEachItem both create child executions, but they solve different orchestration problems.

Use Subflow when:

  • You want to trigger one child flow once.
  • You already know the exact inputs to pass to that child flow.
  • You want execution isolation without batching or iteration.
  • You are decomposing a large workflow into smaller reusable modules.

Use ForEachItem when:

  • You want to start many child flow executions from one dataset or file.
  • You need batching by rows, partitions, or bytes.
  • You want to process file-backed items incrementally at scale.
  • You want Kestra to merge outputs from multiple child executions.

Rule of thumb:

  • Subflow is one child execution for one unit of work.
  • ForEachItem is many child executions for many units of work.

For example, if you need to process one uploaded file in a dedicated child flow, use Subflow. If you need to split that file into many batches and process each batch in its own child flow execution, use ForEachItem.
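The one-file-to-one-child case can be sketched as a parent flow that hands the whole upload to a single child execution. This is a minimal sketch: the child flow id process_file and its FILE input named file are hypothetical names chosen for illustration.

```yaml
id: parent_subflow
namespace: company.team

inputs:
  - id: uploaded_file
    type: FILE

tasks:
  # One child execution for one unit of work: the entire uploaded file.
  - id: process_file_once
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: process_file          # hypothetical child flow with a FILE input named `file`
    wait: true                    # block until the child execution finishes
    transmitFailed: true          # fail this task if the child execution fails
    inputs:
      file: "{{ inputs.uploaded_file }}"
```

If this parent instead needed one child execution per batch of the file, the task would switch to ForEachItem, as shown later in this page.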

Understand the main difference

ForEach iterates over a list of values and exposes:

  • {{ taskrun.value }} for the current value
  • {{ taskrun.iteration }} for the zero-based loop index

ForEachItem iterates over batches of file-backed items and exposes:

  • {{ taskrun.items }} for the current batch file URI
  • {{ taskrun.iteration }} for the zero-based batch index

In practice:

  • ForEach is best when the iteration value itself is the thing you want to work with.
  • ForEachItem is best when each iteration should receive a file or batch and hand it off to a subflow.
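The contrast can be sketched as two task fragments side by side. This is illustrative only: the upstream task id some_extract, the subflow id handle_batch, and its batch input are assumed names, not part of any real flow.

```yaml
# ForEach: the value itself is the unit of work.
- id: per_region
  type: io.kestra.plugin.core.flow.ForEach
  values: ["us", "eu"]
  tasks:
    - id: log_region
      type: io.kestra.plugin.core.log.Log
      message: "{{ taskrun.value }} (iteration {{ taskrun.iteration }})"

# ForEachItem: each iteration receives a batch file URI and hands it to a subflow.
- id: per_batch
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.some_extract.uri }}"   # assumed upstream task writing ION to internal storage
  namespace: company.team
  flowId: handle_batch                      # hypothetical subflow with a FILE input named `batch`
  inputs:
    batch: "{{ taskrun.items }}"
```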

Best practices for ForEach

  • Keep the values list small to moderate in size.
  • Use concurrencyLimit deliberately rather than leaving fan-out unbounded.
  • If each iteration needs multiple tasks in parallel, put a Parallel task inside the loop instead of expecting child tasks to run concurrently by default.
  • If iterating over JSON objects, remember that taskrun.value is a JSON string. Use fromJson(taskrun.value) to access properties.
  • When referencing outputs from sibling tasks inside the same loop iteration, use outputs.task_id[taskrun.value].

Example: use sibling outputs correctly inside ForEach

id: foreach_outputs
namespace: company.team
tasks:
  - id: enrich_regions
    type: io.kestra.plugin.core.flow.ForEach
    values: ["north", "south", "west"]
    concurrencyLimit: 2
    tasks:
      - id: metadata
        type: io.kestra.plugin.core.output.OutputValues
        values:
          region: "{{ taskrun.value }}"
          bucket: "landing-{{ taskrun.value }}"
      - id: build_message
        type: io.kestra.plugin.core.debug.Return
        format: "Load {{ outputs.metadata[taskrun.value].values.region }} into {{ outputs.metadata[taskrun.value].values.bucket }}"
  - id: log_one_result
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.build_message['north'].value }}"

Why this pattern works:

  • Inside the loop, outputs.metadata[taskrun.value] reads the output from the current iteration.
  • Outside the loop, outputs.build_message['north'].value reads the output for one specific loop value.

Example: iterate over JSON objects safely

id: foreach_json
namespace: company.team
tasks:
  - id: process_users
    type: io.kestra.plugin.core.flow.ForEach
    values:
      - {"id": 101, "email": "a@example.com"}
      - {"id": 102, "email": "b@example.com"}
    tasks:
      - id: log_user
        type: io.kestra.plugin.core.log.Log
        message: "User {{ fromJson(taskrun.value).id }} -> {{ fromJson(taskrun.value).email }}"

Best practices for ForEachItem

  • Store the dataset in internal storage first and pass its URI to items.
  • If your source file is CSV, JSON, Excel, or another external format, convert it to ION before passing it to ForEachItem.
  • Batch by rows, partitions, or bytes based on how the downstream subflow processes data.
  • Design the subflow so it can be rerun independently for one batch.
  • Prefer passing taskrun.items to a FILE input in the subflow.
  • If the parent flow must depend on child results, keep wait: true.
  • If a child failure should fail the parent task, keep transmitFailed: true.

Example: process a file in batches with ForEachItem

This pattern is recommended when each batch should run in its own execution.

id: parent_foreachitem
namespace: company.team
tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
  - id: orders_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.download_orders_csv.uri }}"
  - id: process_batches
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.orders_to_ion.uri }}"
    batch:
      rows: 2
    namespace: company.team
    flowId: process_order_batch
    wait: true
    transmitFailed: true
    inputs:
      orders_file: "{{ taskrun.items }}"
  - id: log_merged_outputs_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.process_batches_merge.subflowOutputs }}"
  - id: preview_merged_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.process_batches_merge.subflowOutputs) }}"

And the subflow:

id: process_order_batch
namespace: company.team
inputs:
  - id: orders_file
    type: FILE
tasks:
  - id: inspect_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.orders_file) }}"
outputs:
  - id: batch_summary
    type: STRING
    value: "{{ 'Processed batch content: ' ~ read(inputs.orders_file) }}"

Here, orders_file is a batch file generated from the ION output of CsvToIon. Each subflow execution receives one batch file through {{ taskrun.items }}.

Use ForEachItem outputs correctly

ForEachItem is best consumed through its internal helper task outputs:

  • {{ outputs.task_id_split.splits }} contains the file listing generated batch URIs.
  • {{ outputs.task_id_merge.subflowOutputs }} contains a file with the merged outputs from the child subflows.

If your ForEachItem task id is process_batches, those become:

  • {{ outputs.process_batches_split.splits }}
  • {{ outputs.process_batches_merge.subflowOutputs }}

This is different from ForEach, where you typically access outputs by loop value, such as outputs.build_message['north'].value.

Example: consume merged subflow outputs

If the subflow defines typed flow outputs, ForEachItem merges them into a file exposed by the internal merge task. In the example above, each child execution returns a batch_summary string, and the merge task gathers those subflow outputs into a single file.

id: parent_read_merged_outputs
namespace: company.team
tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
  - id: orders_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.download_orders_csv.uri }}"
  - id: process_batches
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.orders_to_ion.uri }}"
    batch:
      rows: 2
    namespace: company.team
    flowId: process_order_batch
    wait: true
    transmitFailed: true
    inputs:
      orders_file: "{{ taskrun.items }}"
  - id: log_merged_outputs_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.process_batches_merge.subflowOutputs }}"
  - id: preview_merged_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.process_batches_merge.subflowOutputs) }}"

Use {{ outputs.process_batches_merge.subflowOutputs }} when a downstream task needs the collected outputs from all child subflows. If you want to inspect the merged file content directly, use read(outputs.process_batches_merge.subflowOutputs).

Common mistakes to avoid

  • Do not use ForEach for very large datasets just because the input started as a JSON array.
  • Do not pass a non-storage path or raw inline content to ForEachItem.items; it must be a Kestra internal storage URI.
  • Do not assume sibling task outputs in ForEach use the plain outputs.task_id.value syntax; inside the loop, use outputs.task_id[taskrun.value].
  • Do not expect ForEach child tasks to run in parallel unless you either set loop concurrency or add a Parallel task inside the loop.
  • Do not forget that taskrun.iteration starts at 0 for both ForEach and ForEachItem.
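The last point can be verified with a minimal sketch: the first iteration logs index 0, not 1.

```yaml
id: iteration_index
namespace: company.team

tasks:
  - id: loop
    type: io.kestra.plugin.core.flow.ForEach
    values: ["a", "b", "c"]
    tasks:
      - id: show_index
        type: io.kestra.plugin.core.log.Log
        message: "value {{ taskrun.value }} -> iteration {{ taskrun.iteration }}"   # iterations are 0, 1, 2
```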

Use ForEach for orchestration over a relatively small list of values.

Use ForEachItem for data processing over file-backed items or batches, especially when you need scale, restartability, or subflow isolation.

For API details, see the ForEach plugin documentation, the ForEachItem plugin documentation, and the Outputs documentation.
