ForEachItem
ForEachItem
yaml
type: "io.kestra.plugin.core.flow.ForEachItem"Examples
yaml
id: orders_parallel
namespace: company.team
tasks:
- id: extract
type: io.kestra.plugin.jdbc.duckdb.Query
sql: |
INSTALL httpfs;
LOAD httpfs;
SELECT *
FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
store: true
- id: each
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.extract.uri }}"
batch:
rows: 1
namespace: company.team
flowId: orders
wait: true # wait for the subflow execution
transmitFailed: true # fail the task run if the subflow execution fails
inputs:
order: "{{ taskrun.items }}" # special variable that contains the items of the batch
yaml
id: iterate_over_json
namespace: company.team
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: "https://api.restful-api.dev/objects"
contentType: application/json
method: GET
failOnEmptyResponse: true
timeout: PT15S
- id: json_to_ion
type: io.kestra.plugin.serdes.json.JsonToIon
from: "{{ outputs.download.uri }}"
newLine: false # regular json
- id: ion_to_jsonl
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.json_to_ion.uri }}"
newLine: true # JSON-L
- id: for_each_item
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.ion_to_jsonl.uri }}"
batch:
rows: 1
namespace: company.team
flowId: mysubflow
wait: true
transmitFailed: true
inputs:
json: "{{ json(read(taskrun.items)) }}"
yaml
id: process_files
namespace: company.team
tasks:
- id: loop_over_files
type: io.kestra.plugin.core.flow.ForEach
values: "{{ trigger.objects | jq('.[].uri') }}"
tasks:
- id: subflow_per_batch
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ trigger.uris[parent.taskrun.value] }}"
batch:
rows: 1
flowId: process_batch
namespace: company.team
wait: true
transmitFailed: true
inputs:
data: "{{ taskrun.items }}"
triggers:
- id: s3
type: io.kestra.plugin.aws.s3.Trigger
interval: "PT1S"
accessKeyId: "<access-key>"
secretKeyId: "<secret-key>"
region: "us-east-1"
bucket: "my_bucket"
prefix: "sub-dir"
action: NONE
Properties
flowId*Requiredstring
Min length
1items*Requiredstring
Min length
1namespace*Requiredstring
Min length
1batchNon-dynamic
Default
{
"rows": "1",
"separator": "\n"
} Definitions
io.kestra.plugin.core.flow.ForEachItem-Batch
bytesstring
partitionsintegerstring
regexPatternstring
rowsintegerstring
Default
1separatorstring
Default
\nerrorsNon-dynamicarray
finallyNon-dynamicarray
inheritLabelsNon-dynamicboolean
Default
falseinputsobject
labelsarrayobject
restartBehaviorNon-dynamicstring
Default
RETRY_FAILEDPossible Values
NEW_EXECUTIONRETRY_FAILEDrevisionNon-dynamicinteger
scheduleDatestring
Format
date-timetransmitFailedNon-dynamicboolean
Default
truewaitNon-dynamicboolean
Default
true