Subflows
Subflows allow you to build modular and reusable workflow components.
They work similarly to calling functions. A subflow execution is created when you call a flow from another flow.
How to declare a subflow
To call a flow from another flow, use the io.kestra.core.tasks.flows.Subflow task and, in that task, specify the flowId and namespace of the subflow that you want to execute. Optionally, you can also specify custom input values, in the same way as you would pass arguments in a function call.
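As a minimal sketch of such a call, the parent flow below passes one input to a subflow. The flow IDs parent_with_inputs and child_flow, the task ID, and the input name greeting are hypothetical; the sketch assumes the subflow declares a STRING input with that name.

```yaml
id: parent_with_inputs       # hypothetical flow ID
namespace: example

tasks:
  - id: call_child           # hypothetical task ID
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example       # namespace where the subflow lives
    flowId: child_flow       # hypothetical subflow ID
    inputs:
      greeting: "hello"      # assumed input declared by child_flow
```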
The optional properties wait and transmitFailed control the execution behavior. If wait is not set or is set to false, the parent flow continues execution without waiting for the subflow's completion. The transmitFailed property determines whether a failure in the subflow execution should cause the parent flow to fail; it only takes effect when wait is set to true.
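The sketch below contrasts a blocking call with a fire-and-forget call of the same subflow. The flow and task IDs are hypothetical; only the property names come from the description above.

```yaml
id: parent_wait_modes          # hypothetical flow ID
namespace: example

tasks:
  # Blocking call: the parent waits for the subflow and fails if it fails
  - id: blocking_call
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: child_flow         # hypothetical subflow ID
    wait: true
    transmitFailed: true

  # Fire-and-forget call: the parent continues without waiting
  - id: fire_and_forget
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: child_flow
    wait: false
```

The blocking form suits cases where downstream tasks depend on the subflow's result; the fire-and-forget form suits side effects that the parent does not need to track.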
Practical Example
Consider a subflow that encapsulates critical business logic. This subflow can be called from various flows, allowing for code reuse and isolated testing.
Here is a simple example of a subflow:
id: critical_service
namespace: example
tasks:
  - id: return_data
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT sum(total) as total, avg(quantity) as avg_quantity
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true
In this example, the return_data task outputs the uri of the stored query result. That URI is a reference to the file's location in Internal Storage. The parent flow can use this output for further processing:
id: parent_service
namespace: example
tasks:
  - id: subflow_call
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: critical_service
    wait: true
    transmitFailed: true
    # output data from a specific subflow task called return_data
    outputs:
      some_output: "{{ outputs.return_data.uri }}"
  - id: log_subflow_output
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS
    commands:
      - cat "{{ outputs.subflow_call.outputs.some_output }}"
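In expressions, the outputs variable maps task IDs to their outputs. In this case, the log_subflow_output task reads outputs.subflow_call.outputs.some_output, that is, the some_output value that the subflow_call task extracted from the subflow.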
Subflow properties
Below is a list of the properties of the io.kestra.core.tasks.flows.Subflow task. You don't need to memorize them; you can always open the task documentation to see the full list of Subflow task properties:
| Field | Description |
|---|---|
| flowId | The subflow's identifier. |
| namespace | The namespace where the subflow is located. |
| inheritLabels | Determines if the subflow inherits labels from the parent (default: false). |
| inputs | Inputs passed to the subflow. |
| labels | Labels assigned to the subflow. |
| outputs (deprecated) | Allows passing outputs from the subflow execution to the parent flow. |
| revision | The subflow revision to execute (defaults to the latest). |
| wait | If true, parent flow waits for subflow completion (default: false). |
| transmitFailed | If true, parent flow fails on subflow failure (requires wait to be true). |
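A sketch that combines some of the less common properties from the table. The flow ID, task ID, revision number, and label are hypothetical, and the key: value form of labels is an assumption; check the task documentation for the exact format.

```yaml
id: parent_labeled             # hypothetical flow ID
namespace: example

tasks:
  - id: call_pinned_revision   # hypothetical task ID
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: critical_service
    revision: 2                # hypothetical revision; omit to run the latest
    inheritLabels: true        # pass the parent's labels on to the subflow
    labels:
      team: data               # hypothetical label, assumed key: value form
    wait: true
    transmitFailed: true
```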
Passing data between parent and child flows
Flows can emit outputs that can be accessed by the parent flow. Using the io.kestra.core.tasks.flows.Subflow task, you can call any flow as a subflow and access its outputs in downstream tasks. For more details and examples, check the Outputs page.
Outputs from a subflow execution
Outputs include the execution ID, extracted outputs, and the final state (if wait is true).
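For instance, a downstream task could log these values as sketched below. The attribute names executionId and state are assumptions about how the Subflow task exposes the execution ID and the final state; confirm them in the task documentation. The flow ID and the logging task ID are hypothetical.

```yaml
id: parent_reads_outputs       # hypothetical flow ID
namespace: example

tasks:
  - id: subflow_call
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: critical_service
    wait: true                 # the final state is only reported when wait is true

  - id: log_execution_details  # hypothetical task ID
    type: io.kestra.core.tasks.log.Log
    # executionId and state are assumed attribute names
    message: "Subflow {{ outputs.subflow_call.executionId }} ended in state {{ outputs.subflow_call.state }}"
```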
To sum up, subflows improve maintainability of complex workflows. They allow you to build modular and reusable workflow components and share them across multiple namespaces, projects, and teams.
Passing nested inputs to a subflow
Consider the following flow that extracts JSON data from a REST API and passes it to a subflow:
id: extract_json
namespace: example
tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/users
  - id: read_json
    type: io.kestra.core.tasks.log.Log
    message: "{{ outputs.api.body }}"
  - id: subflow
    type: io.kestra.core.tasks.flows.Subflow
    namespace: example
    flowId: subflow
    inputs:
      users.firstName: "{{ outputs.api.body | jq('.users') | first | first | jq('.firstName') | first }}"
      users.lastName: "{{ outputs.api.body | jq('.users') | first | first | jq('.lastName') | first }}"
    wait: true
    transmitFailed: true
To add type validation for the extracted JSON fields, you can use nested inputs in the subflow definition:
id: subflow
namespace: example
inputs:
  - id: users.firstName
    type: STRING
    defaults: Rick
  - id: users.lastName
    type: STRING
    defaults: Astley
tasks:
  - id: process_user_data
    type: io.kestra.core.tasks.log.Log
    message: hello {{ inputs.users }}
Note how you can then pass the entire users object, including all nested fields, to any task in the subflow.
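Individual nested fields should be addressable in the same way. The fragment below sketches an extra entry for the subflow's tasks list, assuming that nested inputs resolve with dot notation as inputs.users.firstName and inputs.users.lastName; the task ID is hypothetical.

```yaml
# Additional entry under the subflow's tasks list (a fragment, not a full flow)
- id: greet_user             # hypothetical task ID
  type: io.kestra.core.tasks.log.Log
  # assumes nested inputs are addressable with dot notation
  message: "hello {{ inputs.users.firstName }} {{ inputs.users.lastName }}"
```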