Trigger flows from another flow execution.

Flow triggers allow you to trigger a flow after another flow execution, enabling event-driven patterns.

yaml
type: "io.kestra.plugin.core.trigger.Flow"

Kestra can trigger a flow as soon as another flow ends. This lets you add implicit dependencies between multiple flows, which are often managed by different teams.

Check the Flow trigger documentation for the list of all properties.

Preconditions

A flow trigger must have preconditions, which filter other flow executions within a time window.

Filters

  • flows A list of preconditions to meet, in the form of upstream flows.

For example, the following flow will trigger as soon as flow_a finishes in the SUCCESS state.

yaml
id: flow_b
namespace: kestra.sandbox

tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "Hello World!"

triggers:
  - id: upstream_dependency
    type: io.kestra.plugin.core.trigger.Flow
    preconditions:
      id: flow_trigger
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]

  • where A list of execution filters on the fields FLOW_ID, NAMESPACE, STATE, and EXPRESSION.

For example, the following Flow trigger fires for executions of flows in the FAILED or WARNING state whose namespace starts with "company":

yaml
triggers:
  - id: alert_on_failure
    type: io.kestra.plugin.core.trigger.Flow
    states:
      - FAILED
      - WARNING
    preconditions:
      id: company_namespace
      where:
        - id: company
          filters:
            - field: NAMESPACE
              type: STARTS_WITH
              value: company
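
Both filters accept more than one entry. The sketch below is illustrative rather than taken from the Kestra documentation: flow_c, the trigger and precondition ids, and the flow id nightly_export are hypothetical. It also rests on three assumptions that should be checked against the Flow trigger documentation: that every flow listed under flows must match within the time window, that every filter inside a where entry must match, and that EQUAL_TO is a supported filter type.

yaml
id: flow_c
namespace: kestra.sandbox

tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "Upstream conditions met"

triggers:
  # Assumed behavior: fires once both flow_a and flow_b have succeeded.
  - id: after_a_and_b
    type: io.kestra.plugin.core.trigger.Flow
    preconditions:
      id: a_and_b_succeeded
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]
        - namespace: kestra.sandbox
          flowId: flow_b
          states: [SUCCESS]

  # Assumed behavior: fires only when both filters match the same execution.
  - id: on_nightly_export_failure
    type: io.kestra.plugin.core.trigger.Flow
    states:
      - FAILED
    preconditions:
      id: nightly_export_failed
      where:
        - id: company_nightly_export
          filters:
            - field: NAMESPACE
              type: STARTS_WITH
              value: company
            - field: FLOW_ID
              type: EQUAL_TO        # assumed filter type, not shown in this page
              value: nightly_export # hypothetical flow id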

Time Window & SLA

You can set the timeWindow property in different ways:

DURATION_WINDOW: this is the default type. It is defined by a duration (window) and a start offset (windowAdvance); the window moves forward to the next interval whenever the evaluation time reaches the end of the current one.

For example, with a 1-day window (the default: window: PT1D), the SLA conditions are always evaluated over a 24-hour period starting at midnight (00:00:00) each day. If you set windowAdvance: PT6H, the window starts at 6 AM each day. If you set windowAdvance: PT6H and also override window to PT6H, the window starts at 6 AM and lasts 6 hours. As a result, Kestra checks the SLA conditions during the following periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, 00:00 to 06:00, and so on.
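
As a sketch of the last case described above, the trigger below spells out a DURATION_WINDOW with 6-hour windows starting at 6 AM. It reuses flow_a from the earlier examples; the trigger and precondition ids are hypothetical.

yaml
triggers:
  - id: upstream_dep
    type: io.kestra.plugin.core.trigger.Flow
    preconditions:
      id: six_hour_windows
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]
      timeWindow:
        type: DURATION_WINDOW   # the default type, written out for clarity
        window: PT6H            # each window lasts 6 hours
        windowAdvance: PT6H     # windows are offset to start at 06:00

Leaving out both window and windowAdvance should fall back to the 1-day window starting at midnight described above.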

SLIDING_WINDOW: this option also evaluates SLA conditions over a fixed time window, but it always looks backward from the current time. For example, a sliding window of 1 hour (window: PT1H) evaluates executions from the past hour (between one hour ago and now). The default window is 1 day.

For example, the trigger below checks whether flow_a reached the SUCCESS state within the past hour. If so, it triggers flow_b and passes the corresponding input (read from the flow_a outputs).

yaml
id: flow_b
namespace: kestra.sandbox

inputs:
  - id: value_from_a
    type: STRING

tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.value_from_a }}"

triggers:
  - id: upstream_dep
    type: io.kestra.plugin.core.trigger.Flow
    inputs:
      value_from_a: "{{ trigger.outputs.return_value }}"
    preconditions:
      id: test
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]
      timeWindow: 
        type: SLIDING_WINDOW
        window: PT1H

For reference, here is flow_a:

yaml
id: flow_a
namespace: kestra.sandbox

tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: Hello World! 🚀

outputs:
  - id: return_value
    type: STRING
    value: "Flow A ran successfully"

DAILY_TIME_DEADLINE: this option declares that some SLA conditions should be met “before a specific time in a day”. With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00.00Z" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.

For example, this trigger definition only triggers the flow if flow_a reaches the SUCCESS state before 9:00 AM UTC each day.

yaml
triggers:
  - id: upstream_dep
    type: io.kestra.plugin.core.trigger.Flow
    preconditions:
      id: should_be_success_by_nine
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]
      timeWindow: 
        type: DAILY_TIME_DEADLINE
        deadline: "09:00:00.00Z"

DAILY_TIME_WINDOW: this option declares that some SLA conditions should be met “within a given time range in a day”. For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day.

This option is particularly useful for declaring freshness conditions in data pipelines. If you only need one successful execution within a given time range to guarantee that some data has been refreshed before you proceed, this option can be more useful than a strict DAG-based approach. In a strict DAG, each failure would usually block the entire pipeline, whereas with this option you can proceed with the next steps as soon as the data is successfully refreshed at least once within the given time range.

For example, the following trigger uses a window from 06:00 to 12:00:

yaml
triggers:
  - id: upstream_dep
    type: io.kestra.plugin.core.trigger.Flow
    inputs:
      value_from_a: "{{ trigger.outputs.return_value }}"
    preconditions:
      id: test
      flows:
        - namespace: kestra.sandbox
          flowId: flow_a
          states: [SUCCESS]
      timeWindow: 
        type: DAILY_TIME_WINDOW
        startTime: "06:00:00"
        endTime: "12:00:00"

Example

Trigger the silver_layer flow once the bronze_layer flow finishes successfully by 9 AM. The deadline ensures that no new executions are triggered after that time, and the deadline time string must include the timezone offset. Here is the silver_layer flow:

yaml
id: silver_layer
namespace: company.team
tasks:
  - id: transform_data
    type: io.kestra.plugin.core.log.Log
    message: deduplication, cleaning, and minor aggregations
triggers:
  - id: flow_trigger
    type: io.kestra.plugin.core.trigger.Flow
    preconditions:
      id: bronze_layer
      timeWindow:
        type: DAILY_TIME_DEADLINE
        deadline: "09:00:00+01:00"
      flows:
        - namespace: company.team
          flowId: bronze_layer
          states: [SUCCESS]
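
To try this example, an upstream bronze_layer flow must exist in the same namespace. This page does not show it, so the following is a hypothetical placeholder with a single logging task; the task id and message are assumptions, not part of the original example.

yaml
id: bronze_layer
namespace: company.team
tasks:
  # Hypothetical stand-in for the real ingestion logic.
  - id: ingest_raw_data
    type: io.kestra.plugin.core.log.Log
    message: raw data ingestion

If an execution of this flow succeeds before 09:00 (+01:00), the silver_layer trigger above should fire.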

Example: Alerting

Create a System Flow to send a Slack alert on any failure or warning state within the company namespace. This example uses the Slack webhook secret to notify the #general channel about the failed flow.

yaml
id: alert
namespace: system
tasks:
  - id: send_alert
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{secret('SLACK_WEBHOOK')}}" # format: https://hooks.slack.com/services/xzy/xyz/xyz
    channel: "#general"
    executionId: "{{trigger.executionId}}"
triggers:
  - id: alert_on_failure
    type: io.kestra.plugin.core.trigger.Flow
    states:
      - FAILED
      - WARNING
    preconditions:
      id: company_namespace
      where:
        - id: company
          filters:
            - field: NAMESPACE
              type: STARTS_WITH
              value: company
