🚀 New! Kestra raises $3 million to grow Learn more

You can use triggers to automatically start flows based on events. A trigger can be a scheduled date, a new file arrival, a new message in a queue, or the end of another flow's execution.

Defining triggers

Use the triggers keyword in the flow and deifne a list of triggers — you can have several triggers attached to a flow.

The trigger definition looks similar to the task definition — it contains an id, a type, and additional properties related to the specific trigger type.

The workflow below will be automatically triggered every day at 10 AM, as well as anytime when the first_flow finishes its execution. Both triggers are independent of each other.

yaml
id: getting_started
namespace: dev
tasks:
  - id: hello_world
    type: io.kestra.core.tasks.log.Log
    message: Hello World!

triggers:
  - id: schedule_trigger
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 10 * * *

  - id: flow_trigger
    type: io.kestra.core.models.triggers.types.Flow
    conditions:
      - type: io.kestra.core.models.conditions.types.ExecutionFlowCondition
        namespace: dev
        flowId: first_flow

Add a trigger to your flow

Let's look at another trigger example. This trigger will start our flow every Monday at 10 AM.

yaml
triggers:
  - id: every_monday_at_10_am
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 10 * * 1
Click here to see the full workflow example with this Schedule trigger
yaml
id: getting_started
namespace: dev

labels:
  owner: engineering

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/products

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

triggers:
  - id: every_monday_at_10_am
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 10 * * 1