Best Practices for ForEach and ForEachItem
Use ForEach and ForEachItem for different scaling and orchestration patterns.
Choose the right loop primitive
Both tasks iterate over multiple items, but they do it in different ways:
- `ForEach` creates child task runs inside the same execution.
- `ForEachItem` creates one subflow execution per batch of items.
That design difference affects performance, restart behavior, and how you access outputs.
Decision guide
Use ForEach when:
- You already have a small list in memory, such as an input, a small JSON array, or a small fetched result.
- The work for each item is lightweight.
- You want to share outputs between sibling tasks inside the loop.
- You want a simple loop without introducing a subflow.
Use ForEachItem when:
- You need to process a large dataset or file.
- You want to split data into batches and scale processing through subflows.
- You need better isolation, troubleshooting, and restart behavior for individual batches.
- The data already lives in Kestra internal storage, or can be written there first.
ForEach can generate many task runs in a single execution. For large fan-out or nested loops, prefer ForEachItem or a Subflow-based design to avoid oversized execution contexts and slower orchestration.
`ForEachItem` expects `items` to be a Kestra internal storage URI, for example `{{ outputs.extract.uri }}` or a `FILE` input. If your source data is a regular JSON array, Excel file, Parquet file, or another non line-oriented format, convert it first.
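As a sketch of that conversion step, a JSON array fetched over HTTP can be turned into ION with the serdes plugin before being split. The download URL and the `process_batch` subflow are placeholders, and `newLine: false` is assumed here because the source is a plain JSON array rather than JSON-Lines:

```yaml
id: convert_then_split
namespace: company.team

tasks:
  # Fetch the raw JSON array (placeholder URL)
  - id: download_json
    type: io.kestra.plugin.core.http.Download
    uri: https://example.com/items.json

  # Convert the JSON array to ION so ForEachItem can batch it by rows
  - id: to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download_json.uri }}"
    newLine: false # the source is one JSON array, not JSON-Lines

  - id: split
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.to_ion.uri }}"
    batch:
      rows: 100
    namespace: company.team
    flowId: process_batch # hypothetical subflow
    inputs:
      batch_file: "{{ taskrun.items }}"
```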
Subflow vs ForEachItem
Subflow and ForEachItem both create child executions, but they solve different orchestration problems.
Use Subflow when:
- You want to trigger one child flow once.
- You already know the exact inputs to pass to that child flow.
- You want execution isolation without batching or iteration.
- You are decomposing a large workflow into smaller reusable modules.
Use ForEachItem when:
- You want to start many child flow executions from one dataset or file.
- You need batching by `rows`, `partitions`, or `bytes`.
- You want to process file-backed items incrementally at scale.
- You want Kestra to merge outputs from multiple child executions.
Rule of thumb:
`Subflow` is one child execution for one unit of work. `ForEachItem` is many child executions for many units of work.
For example, if you need to process one uploaded file in a dedicated child flow, use Subflow. If you need to split that file into many batches and process each batch in its own child flow execution, use ForEachItem.
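The Subflow half of that example can be sketched as follows; the `process_file` child flow and its `file_name` input are hypothetical:

```yaml
id: parent_with_subflow
namespace: company.team

tasks:
  # One child execution for one unit of work
  - id: run_child
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: process_file # hypothetical child flow
    wait: true
    inputs:
      file_name: "report.csv"
```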
Understand the main difference
ForEach iterates over a list of values and exposes:
- `{{ taskrun.value }}` for the current value
- `{{ taskrun.iteration }}` for the zero-based loop index
ForEachItem iterates over batches of file-backed items and exposes:
- `{{ taskrun.items }}` for the current batch file URI
- `{{ taskrun.iteration }}` for the zero-based batch index
In practice:
`ForEach` is best when the iteration value itself is the thing you want to work with. `ForEachItem` is best when each iteration should receive a file or batch and hand it off to a subflow.
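A minimal flow showing both `ForEach` variables side by side:

```yaml
id: foreach_variables
namespace: company.team

tasks:
  - id: loop
    type: io.kestra.plugin.core.flow.ForEach
    values: ["alpha", "beta"]
    tasks:
      # Logs "Iteration 0 -> alpha", then "Iteration 1 -> beta"
      - id: show
        type: io.kestra.plugin.core.log.Log
        message: "Iteration {{ taskrun.iteration }} -> {{ taskrun.value }}"
```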
Best practices for ForEach
- Keep the `values` list small to moderate in size.
- Use `concurrencyLimit` deliberately rather than leaving fan-out unbounded.
- If each iteration needs multiple tasks in parallel, put a `Parallel` task inside the loop instead of expecting child tasks to run concurrently by default.
- If iterating over JSON objects, remember that `taskrun.value` is a JSON string. Use `fromJson(taskrun.value)` to access properties.
- When referencing outputs from sibling tasks inside the same loop iteration, use `outputs.task_id[taskrun.value]`.
Example: use sibling outputs correctly inside ForEach
```yaml
id: foreach_outputs
namespace: company.team

tasks:
  - id: enrich_regions
    type: io.kestra.plugin.core.flow.ForEach
    values: ["north", "south", "west"]
    concurrencyLimit: 2
    tasks:
      - id: metadata
        type: io.kestra.plugin.core.output.OutputValues
        values:
          region: "{{ taskrun.value }}"
          bucket: "landing-{{ taskrun.value }}"

      - id: build_message
        type: io.kestra.plugin.core.debug.Return
        format: "Load {{ outputs.metadata[taskrun.value].values.region }} into {{ outputs.metadata[taskrun.value].values.bucket }}"

  - id: log_one_result
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.build_message['north'].value }}"
```

Why this pattern works:
- Inside the loop, `outputs.metadata[taskrun.value]` reads the output from the current iteration.
- Outside the loop, `outputs.build_message['north'].value` reads the output for one specific loop value.
Example: iterate over JSON objects safely
```yaml
id: foreach_json
namespace: company.team

tasks:
  - id: process_users
    type: io.kestra.plugin.core.flow.ForEach
    values:
      - {"id": 101, "email": "a@example.com"}
      - {"id": 102, "email": "b@example.com"}
    tasks:
      - id: log_user
        type: io.kestra.plugin.core.log.Log
        message: "User {{ fromJson(taskrun.value).id }} -> {{ fromJson(taskrun.value).email }}"
```

Best practices for ForEachItem
- Store the dataset in internal storage first and pass its URI to `items`.
- If your source file is CSV, JSON, Excel, or another external format, convert it to ION before passing it to `ForEachItem`.
- Batch by `rows`, `partitions`, or `bytes` based on how the downstream subflow processes data.
- Design the subflow so it can be rerun independently for one batch.
- Prefer passing `taskrun.items` to a `FILE` input in the subflow.
- If the parent flow must depend on child results, keep `wait: true`.
- If a child failure should fail the parent task, keep `transmitFailed: true`.
Example: process a file in batches with ForEachItem
This pattern is recommended when each batch should run in its own execution.
```yaml
id: parent_foreachitem
namespace: company.team

tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: orders_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.download_orders_csv.uri }}"

  - id: process_batches
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.orders_to_ion.uri }}"
    batch:
      rows: 2
    namespace: company.team
    flowId: process_order_batch
    wait: true
    transmitFailed: true
    inputs:
      orders_file: "{{ taskrun.items }}"

  - id: log_merged_outputs_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.process_batches_merge.subflowOutputs }}"

  - id: preview_merged_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.process_batches_merge.subflowOutputs) }}"
```

And the subflow:
```yaml
id: process_order_batch
namespace: company.team

inputs:
  - id: orders_file
    type: FILE

tasks:
  - id: inspect_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.orders_file) }}"

outputs:
  - id: batch_summary
    type: STRING
    value: "{{ 'Processed batch content: ' ~ read(inputs.orders_file) }}"
```

Here, `orders_file` is a batch file generated from the ION output of `CsvToIon`. Each subflow execution receives one batch file through `{{ taskrun.items }}`.
Use ForEachItem outputs correctly
ForEachItem is best consumed through its internal helper task outputs:
- `{{ outputs.task_id_split.splits }}` contains the file listing the generated batch URIs.
- `{{ outputs.task_id_merge.subflowOutputs }}` contains a file with the merged outputs from the child subflows.
If your ForEachItem task id is process_batches, those become:
- `{{ outputs.process_batches_split.splits }}`
- `{{ outputs.process_batches_merge.subflowOutputs }}`
This is different from `ForEach`, where you typically access outputs by loop value, such as `outputs.inner['north'].value`.
Example: consume merged subflow outputs
If the subflow defines typed flow outputs, ForEachItem merges them into a file exposed by the internal merge task. In the example above, each child execution returns a batch_summary string, and the merge task gathers those subflow outputs into a single file.
```yaml
id: parent_read_merged_outputs
namespace: company.team

tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: orders_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.download_orders_csv.uri }}"

  - id: process_batches
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.orders_to_ion.uri }}"
    batch:
      rows: 2
    namespace: company.team
    flowId: process_order_batch
    wait: true
    transmitFailed: true
    inputs:
      orders_file: "{{ taskrun.items }}"

  - id: log_merged_outputs_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.process_batches_merge.subflowOutputs }}"

  - id: preview_merged_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.process_batches_merge.subflowOutputs) }}"
```

Use `{{ outputs.process_batches_merge.subflowOutputs }}` when a downstream task needs the collected outputs from all child subflows.
If you want to inspect the merged file content directly, use `read(outputs.process_batches_merge.subflowOutputs)`.
Common mistakes to avoid
- Do not use `ForEach` for very large datasets just because the input started as a JSON array.
- Do not pass a non-storage path or raw inline content to `ForEachItem.items`; it must be a Kestra internal storage URI.
- Do not assume sibling task outputs in `ForEach` use the plain `outputs.task_id.value` syntax; inside the loop, use `outputs.task_id[taskrun.value]`.
- Do not expect `ForEach` child tasks to run in parallel unless you either set loop concurrency or add a `Parallel` task inside the loop.
- Do not forget that `taskrun.iteration` starts at `0` for both `ForEach` and `ForEachItem`.
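The parallelism point above can be sketched as follows. This assumes `concurrencyLimit: 0` lifts the limit on concurrent iterations (the default is sequential); the task ids are illustrative:

```yaml
id: foreach_parallel
namespace: company.team

tasks:
  - id: regions
    type: io.kestra.plugin.core.flow.ForEach
    values: ["north", "south"]
    concurrencyLimit: 0 # assumption: 0 means no limit on concurrent iterations
    tasks:
      # Without this Parallel wrapper, task_a and task_b would run sequentially
      - id: per_region
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: task_a
            type: io.kestra.plugin.core.log.Log
            message: "A for {{ taskrun.value }}"
          - id: task_b
            type: io.kestra.plugin.core.log.Log
            message: "B for {{ taskrun.value }}"
```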
Recommended rule of thumb
Use ForEach for orchestration over a relatively small list of values.
Use ForEachItem for data processing over file-backed items or batches, especially when you need scale, restartability, or subflow isolation.
For API details, see the ForEach plugin documentation, the ForEachItem plugin documentation, and the Outputs documentation.