Subflows allow you to build modular and reusable workflow components.

They work similarly to calling functions. A subflow execution is created when you call a flow from another flow.

Why use a subflow?

Subflows allow you to build modular and reusable components that you can use across multiple flows. For example, you might have a subflow dedicated to alerting errors to Slack and email. By using a Subflow, you can reuse these two tasks together for all flows that you want to send error notifications, instead of having to copy the individual tasks for every flow.

How to declare a subflow

To call a flow from another flow, use the io.kestra.plugin.core.flow.Subflow task and in that task, specify the flowId and namespace of the subflow that you want to execute. Optionally, you can also specify custom input values, in the same way as you would pass arguments in a function call.

The optional properties wait and transmitFailed control the execution behavior. By default, if wait is not set or set to false, the parent flow continues execution without waiting for the subflow's completion. The transmitFailed property determines whether a failure in the subflow execution should cause the parent flow to fail.

Practical Example

Consider a subflow that encapsulates critical business logic. This subflow can be called from various flows, allowing for code reuse and isolated testing.

Here is a simple example of a subflow:

yaml
id: critical_service
namespace: company.team

tasks:
  - id: return_data
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT sum(total) as total, avg(quantity) as avg_quantity
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

outputs:
  - id: some_output
    type: STRING
    value: "{{ outputs.return_data.uri }}"

In this example, return_data outputs uri of the query output. That URI is a reference to the Internal Storage location of the stored file. This output can be used in the parent flow to perform further processing.

yaml
id: parent_service
namespace: company.team

tasks:
  - id: subflow_call
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: critical_service
    wait: true
    transmitFailed: true

  - id: log_subflow_output
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - cat "{{ outputs.subflow_call.outputs.some_output }}"

The outputs map task IDs to their outputs. In this case, we are accessing the outputs.some_output output of the subflow_call task.

Subflow properties

Below is a full list of all properties of the io.kestra.plugin.core.flow.Subflow task. Don't worry, you don't need to memorize them all, you can always open the task documentation to see the full list of Subflow task properties:

FieldDescription
flowIdThe subflow's identifier.
namespaceThe namespace where the subflow is located.
inheritLabelsDetermines if the subflow inherits labels from the parent (default: false).
inputsInputs passed to the subflow.
labelsLabels assigned to the subflow.
outputs (deprecated)Allows passing outputs from the subflow execution to the parent flow.
revisionThe subflow revision to execute (defaults to the latest).
waitIf true, parent flow waits for subflow completion (default: false).
transmitFailedIf true, parent flow fails on subflow failure (requires wait to be true).

Passing data between parent and child flows

Flows can emit outputs that can be accessed by the parent flow. Using the io.kestra.plugin.core.flow.Subflow task you can call any flow as a subflow and access its outputs in downstream tasks. For more details and examples, check the Outputs page.

Accessing Outputs from a subflow execution

Outputs include the execution ID, extracted outputs, and the final state (if wait is true).

Here's an example of a subflow with outputs explictly defined.

yaml
id: flow_outputs
namespace: company.team

tasks:
  - id: mytask
    type: io.kestra.plugin.core.debug.Return
    format: this is a task output used as a final flow output

outputs:
  - id: final
    type: STRING
    value: "{{ outputs.mytask.value }}"

We can access these outputs from a parent task as seen in the example below:

yaml
id: parent_flow
namespace: company.team

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    flowId: flow_outputs
    namespace: company.team
    wait: true

  - id: log_subflow_output
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.final }}"

More information available here.

Passing inputs to a subflow

You can pass inputs to a Subflow task. The example below passes 2 inputs to a subflow.

Subflow:

yaml
id: subflow_example
namespace: company.team

inputs:
  - id: http_uri
    type: STRING

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.http_uri }}"

  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.download.body }}"

outputs:
  - id: data
    type: STRING
    value: "{{ outputs.download.body }}"

Parent flow:

yaml
id: inputs_subflow
namespace: company.team

inputs:
  - id: url
    type: STRING

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    flowId: subflow_example
    namespace: company.team
    inputs:
      http_uri: "{{ inputs.url }}"
    wait: true

  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.data }}"

We can see that the parent flow was able to pass an input down to the subflow.

Nested Inputs

In the example below, the flow extracts JSON data from a REST API and passes it to a subflow as a nested input:

yaml
id: extract_json
namespace: company.team

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: https://dummyjson.com/users

  - id: read_json
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.api.body }}"

  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: subflow
    inputs:
      users.firstName: "{{ outputs.api.body | jq('.users') | first | first | jq('.firstName') | first }}"
      users.lastName: "{{ outputs.api.body | jq('.users') | first | first | jq('.lastName') | first }}"
    wait: true
    transmitFailed: true

To provide type validation to extracted JSON fields, you can use nested inputs in the subflow definition:

yaml
id: subflow
namespace: company.team

inputs:
  - id: users.firstName
    type: STRING
    defaults: Rick

  - id: users.lastName
    type: STRING
    defaults: Astley

tasks:
  - id: process_user_data
    type: io.kestra.plugin.core.log.Log
    message: hello {{ inputs.users }}

Note how you can then pass the entire users object to any task in the subflow including all nested fields.

Was this page helpful?