
ForEachItem

```yaml
type: "io.kestra.core.tasks.flows.ForEachItem"
```

Execute a subflow for each batch of items

Execute a subflow for each batch of items. The items value must be an internal storage URI, e.g., an output file from a previous task or a file from an input of FILE type.

Examples

Execute a subflow for each batch of items. The subflow orders is called from the parent flow orders_parallel using the ForEachItem task to start one subflow execution for each batch of items.

```yaml
id: orders
namespace: prod

inputs:
  - name: order
    type: STRING

tasks:
  - id: read_file
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS
    commands:
      - cat "{{ inputs.order }}"

  - id: read_file_content
    type: io.kestra.core.tasks.log.Log
    message: "{{ read(inputs.order) }}"
```

```yaml
id: orders_parallel
namespace: prod

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://raw.githubusercontent.com/kestra-io/datasets/main/csv/orders.csv', header=True);
    store: true

  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1
    namespace: prod
    flowId: orders
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}" # special variable that contains the items of the batch
```

Properties

batch

  • Type: Batch
  • Dynamic:
  • Required: ✔️
  • Default: {rows=1, separator=\n}

How to split the items into batches.

flowId

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The identifier of the subflow to be executed

items

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI. This can be either the output of a previous task or a FILE-type input parameter.

This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format using the conversion tasks available in the io.kestra.plugin.serdes module, which transform files from their original format to ION.
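For example, a CSV file with a header row can be converted to ION before batching. The CsvReader task name and its properties below are assumptions based on the io.kestra.plugin.serdes module; check that plugin's documentation for the exact schema:

```yaml
  - id: to_ion
    type: io.kestra.plugin.serdes.csv.CsvReader # assumed serdes task name
    from: "{{ outputs.extract.uri }}" # assumes a prior task with id `extract`
    header: true

  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.to_ion.uri }}" # one ION record per line
    batch:
      rows: 10
    namespace: prod
    flowId: orders
    inputs:
      order: "{{ taskrun.items }}"
```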

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The namespace of the subflow to be executed

inheritLabels

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: false

Whether the subflow execution should inherit labels from the parent execution that triggered it.

By default, labels are not passed to the subflow execution. If you set this option to true, the child flow execution will inherit all labels from the parent execution.

inputs

  • Type: object
  • Dynamic: ✔️
  • Required:

The inputs to pass to the subflow to be executed

labels

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

The labels to pass to the subflow to be executed
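As a sketch, labels can be attached to every subflow execution started by the task; the label keys and values below are illustrative:

```yaml
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}" # assumes a prior task with id `extract`
    namespace: prod
    flowId: orders
    labels: # illustrative label keys and values
      environment: production
      triggeredBy: "{{ flow.id }}"
```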

revision

  • Type: integer
  • Dynamic:
  • Required:

The revision of the subflow to be executed

By default, the last, i.e. the most recent, revision of the subflow is executed.

transmitFailed

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: true

Whether to fail the current execution if the subflow execution fails or is killed.

Note that this option works only if wait is set to true.

wait

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: true

Whether to wait for the subflow executions to finish before continuing the current execution.

Outputs

iterations

  • Type: object
  • SubType: integer

The counter of iterations for each subflow execution state.

This output will be updated in real-time based on the state of subflow executions. It will contain one counter per subflow execution state, as well as a max counter that represents the maximum number of iterations (i.e. the total number of batches).
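For example, once a ForEachItem task with id each has finished, a downstream task could log the counters. The max key follows the description above; the per-state keys (such as SUCCESS) are assumed to match Kestra's execution state names:

```yaml
  - id: log_iterations
    type: io.kestra.core.tasks.log.Log
    message: |
      Total batches: {{ outputs.each.iterations.max }}
      Successful subflow executions: {{ outputs.each.iterations.SUCCESS }}
```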

Definitions

Batch

bytes

  • Type: string
  • Dynamic: ✔️
  • Required:

Split a large file into multiple chunks with a maximum file size of bytes.

Can be provided as a string in the format "10MB" or "200KB", or as a number of bytes. This allows you to process large files: split them into smaller chunks by lines and process those chunks in parallel.

For example, MySQL by default limits query size to 16MB per query (the max_allowed_packet setting), so a bulk insert query with input data larger than 16MB will fail. Splitting the input data into chunks smaller than max_allowed_packet (e.g., 10MB) lets you insert the data in multiple smaller queries. This avoids hitting the query size limit and is also more manageable in terms of memory utilization, especially for very large datasets. In short, by splitting the file by bytes, you can bulk-insert smaller chunks of e.g. 10MB in parallel to circumvent this limitation.
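A minimal sketch of the MySQL scenario above, with illustrative property values and a hypothetical subflow id:

```yaml
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}" # assumes a prior task with id `extract`
    batch:
      bytes: "10MB" # each subflow execution receives a chunk of at most 10MB
    namespace: prod
    flowId: bulk_insert # hypothetical subflow performing the bulk insert
    inputs:
      file: "{{ taskrun.items }}"
```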

partitions

  • Type: integer
  • Dynamic: ✔️
  • Required:

Split a file into a fixed number of partitioned files. For example, if you have a file with 1000 lines and you set partitions to 10, the file will be split into 10 files with 100 lines each.
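For instance, to fan a file out into a fixed number of subflow executions regardless of its size (values are illustrative):

```yaml
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}" # assumes a prior task with id `extract`
    batch:
      partitions: 10 # a 1000-line file yields 10 chunks of 100 lines each
    namespace: prod
    flowId: orders
    inputs:
      order: "{{ taskrun.items }}"
```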

rows

  • Type: integer
  • Dynamic: ✔️
  • Required:
  • Default: 1

The number of rows per batch. The file will be split into chunks containing at most that many rows.

separator

  • Type: string
  • Dynamic:
  • Required:
  • Default: \n

The separator used to split a file into chunks. By default, it's a newline \n character. If you are on Windows, you might want to use \r\n instead.
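For example, to split a file produced on Windows (illustrative configuration):

```yaml
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}" # assumes a prior task with id `extract`
    batch:
      rows: 1
      separator: "\r\n" # split on Windows-style line endings
    namespace: prod
    flowId: orders
```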