Trigger
Certified
Trigger a flow on a change data capture event in MongoDB and create new execution per batch.
Trigger
Certified
Trigger a flow on a change data capture event in MongoDB and create new execution per batch.
yaml
type: io.kestra.plugin.debezium.mongodb.TriggerExamples
yaml
id: debezium_mongodb
namespace: company.team
tasks:
- id: send_data
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.uris }}"
triggers:
- id: trigger
type: io.kestra.plugin.debezium.mongodb.Trigger
snapshotMode: INITIAL
connectionString: "mongodb://mongo_user:{{secret('MONGO_PASSWORD')}}@mongos0.example.com:27017,mongos1.example.com:27017/"
yaml
id: debezium_mongodb
namespace: company.team
tasks:
- id: send_data
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.uris }}"
triggers:
- id: trigger
type: io.kestra.plugin.debezium.mongodb.Trigger
snapshotMode: INITIAL
connectionString: "mongodb://mongo_user:{{secret('MONGO_PASSWORD')}}@mongodb0.example.com:27017/?replicaSet=rs0"
Properties
connectionString *Requiredstring
allowConcurrent Non-dynamicboolean
Default
falseconditions Non-dynamic
Definitions
Allow events only between two datetimes.
Example
yaml
id: schedule_condition_datetimebetween
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute once every 5 minutes after the date 2025-12-31T23:59:59Z"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/5 * * * *"
conditions:
- type: io.kestra.plugin.core.condition.DateTimeBetween
date: "{{ trigger.date }}"
after: "2025-12-31T23:59:59Z"
yaml
id: schedule_condition_datetimebetween
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will be executed once every 5 minutes between the before and after dates"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/5 * * * *"
conditions:
- type: io.kestra.plugin.core.condition.DateTimeBetween
date: "{{ trigger.date }}"
after: "2025-01-01T00:00:00Z"
before: "2025-12-31T23:59:59Z"
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.DateTimeBetweenio.kestra.core.models.conditions.types.DateTimeBetweenConditionio.kestra.plugin.core.condition.DateTimeBetweenConditionafterstring
beforestring
datestring
Default
{{ trigger.date }}Allow events on a specific weekday.
Example
yaml
id: schedule_condition_dayweek
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute only on Mondays at 11:00 am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.DayWeek
dayOfWeek: "MONDAY"
dayOfWeek*Requiredstring
Possible Values
MONDAYTUESDAYWEDNESDAYTHURSDAYFRIDAYSATURDAYSUNDAYtype*Requiredobject
Possible Values
io.kestra.plugin.core.condition.DayWeekio.kestra.core.models.conditions.types.DayWeekConditionio.kestra.plugin.core.condition.DayWeekConditiondatestring
Default
{{ trigger.date }}Allow events on an nth weekday within the month.
Example
yaml
id: schedule_condition_dayweekinmonth
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute only on the first Monday of the month at 11:00 am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.DayWeekInMonth
dayOfWeek: "MONDAY"
dayInMonth: FIRST
dayInMonth*Requiredstring
Possible Values
FIRSTLASTSECONDTHIRDFOURTHdayOfWeek*Requiredstring
Possible Values
MONDAYTUESDAYWEDNESDAYTHURSDAYFRIDAYSATURDAYSUNDAYtype*Requiredobject
Possible Values
io.kestra.plugin.core.condition.DayWeekInMonthio.kestra.plugin.core.condition.DayWeekInMonthConditionio.kestra.core.models.conditions.types.DayWeekInMonthConditiondatestring
Default
{{ trigger.date }}Match events from a specific flow.
Example
yaml
id: flow_condition_executionflow
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when flow `flow_a` of namespace `company.team` enters RUNNING state."
triggers:
- id: flow_trigger
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlow
flowId: flow_a
namespace: company.team
states:
- RUNNING
flowId*Requiredstring
namespace*Requiredstring
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.ExecutionFlowio.kestra.plugin.core.condition.ExecutionFlowConditionio.kestra.core.models.conditions.types.ExecutionFlowConditionMatch executions by label set.
Example
yaml
id: flow_condition_executionlabels
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when flow with specified labels enters FAILED state."
triggers:
- id: flow_trigger
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionLabels
labels:
owner: john.doe
env: prod
states:
- FAILED
labels*Requiredarrayobject
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.ExecutionLabelsio.kestra.core.models.conditions.types.ExecutionLabelsConditionio.kestra.plugin.core.condition.ExecutionLabelsConditionMatch executions by namespace.
Example
yaml
id: flow_condition_executionnamespace
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when any flow within `company.engineering` namespace enters RUNNING state."
triggers:
- id: flow_trigger
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionNamespace
namespace: company.engineering
comparison: PREFIX
states:
- RUNNING
namespace*Requiredstring
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.ExecutionNamespaceio.kestra.core.models.conditions.types.ExecutionNamespaceConditionio.kestra.plugin.core.condition.ExecutionNamespaceConditioncomparisonstring
Possible Values
EQUALSPREFIXSUFFIXprefixbooleanstring
Default
falseCondition based on the outputs of an upstream execution.
Example
yaml
id: flow_condition_executionoutputs
namespace: company.team
tasks:
- id: upstream_outputs
type: io.kestra.plugin.core.log.Log
message: hello from a downstream flow
triggers:
- id: condition_on_flow_execution_outputs
type: io.kestra.plugin.core.trigger.Flow
states:
- SUCCESS
conditions:
- type: io.kestra.plugin.core.condition.ExecutionOutputs
expression: "{{ trigger.outputs.flow_a_output == 'hello' }}"
expression*Requiredbooleanstring
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.ExecutionOutputsio.kestra.core.models.conditions.types.ExecutionOutputsConditionio.kestra.plugin.core.condition.ExecutionOutputsConditionMatch executions by status.
Example
yaml
id: flow_condition_executionstatus
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when any flow enters FAILED or KILLED state."
triggers:
- id: flow_trigger
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionStatus
in:
- FAILED
- KILLED
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.ExecutionStatusio.kestra.core.models.conditions.types.ExecutionStatusConditionio.kestra.plugin.core.condition.ExecutionStatusConditioninarray
SubTypestring
Possible Values
CREATEDSUBMITTEDRUNNINGPAUSEDRESTARTEDKILLINGSUCCESSWARNINGFAILEDKILLEDCANCELLEDQUEUEDRETRYINGRETRIEDSKIPPEDBREAKPOINTRESUBMITTEDnotInarray
SubTypestring
Possible Values
CREATEDSUBMITTEDRUNNINGPAUSEDRESTARTEDKILLINGSUCCESSWARNINGFAILEDKILLEDCANCELLEDQUEUEDRETRYINGRETRIEDSKIPPEDBREAKPOINTRESUBMITTEDCondition based on variable expression.
Example
yaml
id: myflow
namespace: company.team
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Average value has gone below 10
triggers:
- id: expression_trigger
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/1 * * * *"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ kv('average_value') < 10 }}"
expression*Requiredstring
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.Expressionio.kestra.plugin.core.condition.ExpressionConditionio.kestra.core.models.conditions.types.VariableConditionMatch executions where a task was retried.
Example
yaml
id: flow_condition_hasretryattempt
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when any flow task on retry enters a specific state(s)."
triggers:
- id: flow_condition
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.HasRetryAttempt
in:
- FAILED
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.HasRetryAttemptio.kestra.plugin.core.condition.HasRetryAttemptConditionio.kestra.core.models.conditions.types.HasRetryAttemptConditioninarray
SubTypestring
Possible Values
CREATEDSUBMITTEDRUNNINGPAUSEDRESTARTEDKILLINGSUCCESSWARNINGFAILEDKILLEDCANCELLEDQUEUEDRETRYINGRETRIEDSKIPPEDBREAKPOINTRESUBMITTEDnotInarray
SubTypestring
Possible Values
CREATEDSUBMITTEDRUNNINGPAUSEDRESTARTEDKILLINGSUCCESSWARNINGFAILEDKILLEDCANCELLEDQUEUEDRETRYINGRETRIEDSKIPPEDBREAKPOINTRESUBMITTEDInvert one or more conditions.
Example
yaml
id: schedule_condition_not
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute on all days except Sunday at 11am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.Not
conditions:
- type: io.kestra.plugin.core.condition.DayWeek
dayOfWeek: "SUNDAY"
conditions*Required
Min items
1type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.Notio.kestra.core.models.conditions.types.NotConditionio.kestra.plugin.core.condition.NotConditionPass when any condition is true.
Example
yaml
id: schedule_condition_or
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute on Sundays and Mondays at 11am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.Or
conditions:
- type: io.kestra.plugin.core.condition.DayWeek
dayOfWeek: "MONDAY"
- type: io.kestra.plugin.core.condition.DayWeek
dayOfWeek: "SUNDAY"
conditions*Required
Min items
1type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.Orio.kestra.core.models.conditions.types.OrConditionio.kestra.plugin.core.condition.OrConditionAllow events on public holidays.
Example
yaml
id: schedule_condition_public-holiday
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute only on the public holidays of France at 11:00 am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.PublicHoliday
country: FR
yaml
id: schedule-condition-work-days
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute only on the work days of France at 11:00 am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.Not
conditions:
- type: io.kestra.plugin.core.condition.PublicHoliday
country: FR
- type: io.kestra.plugin.core.condition.Weekend
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.PublicHolidayio.kestra.plugin.core.condition.PublicHolidayConditionio.kestra.core.models.conditions.types.PublicHolidayConditioncountrystring
datestring
Default
{{ trigger.date}}subDivisionstring
Allow events between two times of day.
Example
yaml
id: schedule_condition_timebetween
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute every 5 minutes between 4pm and 8pm."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/5 * * * *"
conditions:
- type: io.kestra.plugin.core.condition.TimeBetween
after: "16:00:00+02:00"
before: "20:00:00+02:00"
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.TimeBetweenio.kestra.plugin.core.condition.TimeBetweenConditionio.kestra.core.models.conditions.types.TimeBetweenConditionafterstring
beforestring
datestring
Default
{{ trigger.date }}Allow events on weekends.
Example
yaml
id: schedule_condition_weekend
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute only on weekends at 11:00 am."
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 11 * * *"
conditions:
- type: io.kestra.plugin.core.condition.Weekend
type*Requiredobject
Possible Values
io.kestra.plugin.core.condition.Weekendio.kestra.plugin.core.condition.WeekendConditionio.kestra.core.models.conditions.types.WeekendConditiondatestring
Default
{{ trigger.date }}deleted string
Default
ADD_FIELDPossible Values
ADD_FIELDNULLDROPdeletedFieldName string
Default
deletedexcludedCollections object
excludedColumns object
excludedDatabases object
excludedTables object
format string
Default
INLINEPossible Values
RAWINLINEWRAPhostname string
Default
ignoreDdl booleanstring
Default
trueincludedCollections object
includedColumns object
includedDatabases object
includedTables object
interval Non-dynamicstring
Default
PT1MFormat
durationkey string
Default
ADD_FIELDPossible Values
ADD_FIELDDROPmaxDuration string
maxRecords integerstring
maxWait string
Default
PT10Smetadata string
Default
ADD_FIELDPossible Values
ADD_FIELDDROPmetadataFieldName string
Default
metadataoffsetsCommitMode string
Default
ON_STOPPossible Values
ON_EACH_BATCHON_STOPpassword string
port string
Default
properties object
snapshotMode string
Default
INITIALPossible Values
INITIALINITIAL_ONLYNO_DATAWHEN_NEEDEDsplitTable string
Default
TABLEPossible Values
OFFDATABASETABLEstateName string
Default
debezium-statestopAfter Non-dynamicarray
SubTypestring
Possible Values
CREATEDSUBMITTEDRUNNINGPAUSEDRESTARTEDKILLINGSUCCESSWARNINGFAILEDKILLEDCANCELLEDQUEUEDRETRYINGRETRIEDSKIPPEDBREAKPOINTRESUBMITTEDusername string
Outputs
size integer
stateHistoryKey string
stateOffsetKey string
uris object
SubTypestring