Flow Trigger
Trigger flows from another flow execution.
Flow triggers allows you to trigger a flow after another flow execution, enabling event-driven patterns.
type: "io.kestra.plugin.core.trigger.Flow"
Kestra is able to trigger a flow as soon as another flow ends. This allows to add implicit dependencies between multiple flows, which can often be managed by different teams.
Check the Flow trigger documentation for the list of all properties.
Preconditions
A flow trigger must have preconditions which filter on other flow executions on a time window.
Filters
flows
A list of preconditions to met, in the form of upstream flows.
For example, the following flow will trigger as soon as flow_a
finish in SUCCESS state.
id: flow_b
namespace: kestra.sandbox
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "Hello World!"
triggers:
- id: upstream_dependancy
type: io.kestra.plugin.core.trigger.Flow
preconditions:
id: flow_trigger
flows:
- namespace: kestra.sandbox
flowId: flow_a
states: [SUCCESS]
where
You can also add execution filters on fields FLOW_ID
, NAMESPACE
, STATE
and EXPRESSION
.
For example, the following Flow Trigger will trigger on for execution from flows in FAILED or WARNING state in namespaces starting with "company":
triggers:
- id: alert_on_failure
type: io.kestra.plugin.core.trigger.Flow
states:
- FAILED
- WARNING
preconditions:
id: company_namespace
where:
- id: company
filters:
- field: NAMESPACE
type: STARTS_WITH
value: company
Time Window & SLA
You can set the timeWindow
property in different ways:
DURATION_WINDOW
: this is the default type. It uses a start time (windowAdvance) and end time (window) that are moving forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration window.
For example, with a 1-day window (the default option: window: PT1D), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set windowAdvance: PT6H, the window will start at 6 AM each day. If you set windowAdvance: PT6H and you also override the window property to PT6H, the window will start at 6 AM and last for 6 hours — as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
SLIDING_WINDOW
: this option also evaluates SLA conditions over a fixed time window, but it always goes backward from the current time. For example, a sliding window of 1 hour (window: PT1H) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
For example, the flow below will evaluate every hour if the flow flow_a
is in SUCCESS state. If so it will trigger the flow_b
passing corresponding inputs (reading flow_a
outputs).
id: flow_b
namespace: kestra.sandbox
inputs:
- id: value_from_a
type: STRING
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "{{ inputs.value_from_a }}"
triggers:
- id: upstream_dep
type: io.kestra.plugin.core.trigger.Flow
inputs:
value_from_a: "{{ trigger.outputs.return_value }}"
preconditions:
id: test
flows:
- namespace: kestra.sandbox
flowId: flow_a
states: [SUCCESS]
timeWindow:
type: SLIDING_WINDOW
window: PT1H
Here is for reference the flow_a
:
id: flow_a
namespace: kestra.sandbox
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
outputs:
- id: return_value
type: STRING
value: "Flow A run succesfully"
DAILY_TIME_DEADLINE
: this option declares that some SLA conditions should be met “before a specific time in a day”. With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00.00Z" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
For the example, this trigger definition will only trigger the flow if flow_a
is in SUCCESS state before 9:00 AM every day.
triggers:
- id: upstream_dep
type: io.kestra.plugin.core.trigger.Flow
preconditions:
id: should_be_success_by_nine
flows:
- namespace: kestra.sandbox
flowId: flow_a
states: [SUCCESS]
timeWindow:
type: DAILY_TIME_DEADLINE
deadline: "09:00:00.00Z"
DAILY_TIME_WINDOW
: this option declares that some SLA conditions should be met “within a given time range in a day”. For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
triggers:
- id: upstream_dep
type: io.kestra.plugin.core.trigger.Flow
inputs:
value_from_a: "{{ trigger.outputs.return_value }}"
preconditions:
id: test
flows:
- namespace: kestra.sandbox
flowId: flow_a
states: [SUCCESS]
timeWindow:
type: DAILY_TIME_WINDOW
startTime: "06:00:00"
endTime: "12:00:00"
Example
Trigger the silver_layer
flow once the bronze_layer
flow finishes successfully by 9 AM. The deadline time string must include the timezone offset. This ensures that no new executions are triggered past the deadline. Here is the silver_layer
flow:
id: silver_layer
namespace: company.team
tasks:
- id: transform_data
type: io.kestra.plugin.core.log.Log
message: deduplication, cleaning, and minor aggregations
triggers:
- id: flow_trigger
type: io.kestra.plugin.core.trigger.Flow
preconditions:
id: bronze_layer
timeWindow:
type: DAILY_TIME_DEADLINE
deadline: "09:00:00+01:00"
flows:
- namespace: company.team
flowId: bronze_layer
states: [SUCCESS]
Example: Alerting
Create a System Flow
to send a Slack alert on any failure or warning state within the company
namespace. This example uses the Slack webhook secret to notify the #general
channel about the failed flow.
id: alert
namespace: system
tasks:
- id: send_alert
type: io.kestra.plugin.notifications.slack.SlackExecution
url: "{{secret('SLACK_WEBHOOK')}}" # format: https://hooks.slack.com/services/xzy/xyz/xyz
channel: "#general"
executionId: "{{trigger.executionId}}"
triggers:
- id: alert_on_failure
type: io.kestra.plugin.core.trigger.Flow
states:
- FAILED
- WARNING
preconditions:
id: company_namespace
where:
- id: company
filters:
- field: NAMESPACE
type: STARTS_WITH
value: company
Was this page helpful?