RealtimeTrigger
Trigger a flow on real-time message consumption from a DB2 database via change data capture and create one execution per row.
If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.debezium.db2.Trigger instead.
type: "io.kestra.plugin.debezium.db2.RealtimeTrigger"
Examples
Consume a message from a DB2 database via change data capture in real-time.
id: debezium_db2
namespace: company.team

tasks:
  - id: send_data
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.data }}"

triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
Properties
database *Required string
The name of the DB2 database from which to stream the changes.
deletedFieldName *Required string
Default: deleted
The name of the deleted field if deleted is ADD_FIELD.
hostname *Required string
Hostname of the remote server.
key *Required string
Default: ADD_FIELD
Possible values: ADD_FIELD, DROP
Specify how to handle key.
Possible settings are:
ADD_FIELD: Add key(s) merged with columns.
DROP: Drop keys.
metadata *Required string
Default: ADD_FIELD
Possible values: ADD_FIELD, DROP
Specify how to handle metadata.
Possible settings are:
ADD_FIELD: Add metadata in a column named metadata.
DROP: Drop metadata.
port *Required string
Port of the remote server.
splitTable *Required string
Default: TABLE
Possible values: OFF, DATABASE, TABLE
Split table on separate output URIs.
Possible settings are:
TABLE: This will split all rows by tables on output with name database.table.
DATABASE: This will split all rows by databases on output with name database.
OFF: This will NOT split all rows, resulting in a single data output.
conditions Non-dynamic array
List of conditions in order to limit the flow trigger.
deleted string
Default: ADD_FIELD
Possible values: ADD_FIELD, NULL, DROP
Specify how to handle deleted rows.
Possible settings are:
ADD_FIELD: Add a deleted field as boolean.
NULL: Send a row with all values as null.
DROP: Don't send deleted row.
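As an illustration of the row-handling options above, the following sketch reuses the connection settings from the example flow and changes how keys, metadata, and deleted rows appear in each record. The flow id and the field name is_deleted are hypothetical choices, not recommended values.

```yaml
id: debezium_db2_row_handling
namespace: company.team

tasks:
  - id: log_row
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.data }}"

triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
    key: DROP                      # do not merge key columns into the row
    metadata: DROP                 # omit the metadata column
    deleted: ADD_FIELD             # flag deleted rows with a boolean field
    deletedFieldName: is_deleted   # hypothetical field name
```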
excludedColumns object
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values.
Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the includedColumns connector configuration property.
excludedDatabases object
An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes.
The connector captures changes in any database whose name is not in excludedDatabases. Do not also set the includedDatabases connector configuration property.
excludedTables object
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture.
The connector captures changes in any table not included in excludedTables. Each identifier is of the form databaseName.tableName. Do not also specify the includedTables connector configuration property.
format string
Default: INLINE
Possible values: RAW, INLINE, WRAP
The format of the output.
Possible settings are:
RAW: Send raw data from Debezium.
INLINE: Send a row like in the source with only data (remove after & before); all the columns will be present for each row.
WRAP: Send a row like INLINE but wrapped in a record field.
ignoreDdl boolean | string
Default: true
Ignore DDL statements.
Ignore CREATE, ALTER, DROP and TRUNCATE operations.
includedColumns object
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values.
Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the excludedColumns connector configuration property.
includedDatabases object
An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes.
The connector does not capture changes in any database whose name is not in includedDatabases. By default, the connector captures changes in all databases. Do not also set the excludedDatabases connector configuration property.
includedTables object
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers of tables whose changes you want to capture.
The connector does not capture changes in any table not included in includedTables. Each identifier is of the form databaseName.tableName. By default, the connector captures changes in every non-system table in each database whose changes are being captured. Do not also specify the excludedTables connector configuration property.
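To make the filtering properties more concrete, here is a minimal trigger fragment that limits capture to two groups of tables. The table names are hypothetical, and passing the list as a single comma-separated string is an assumption based on the description above. Only one of includedTables or excludedTables is set, as the descriptions require.

```yaml
triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
    # Hypothetical identifiers in the databaseName.tableName form;
    # each comma-separated entry is a regular expression.
    includedTables: 'my_database\.orders,my_database\.customers_.*'
```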
metadataFieldName string
Default: metadata
The name of the metadata field if metadata is ADD_FIELD.
offsetsCommitMode string
Default: ON_EACH_BATCH
Possible values: ON_EACH_BATCH, ON_STOP
How to commit the offsets to the KV Store.
Possible values are:
- ON_EACH_BATCH: after each batch of records consumed by this trigger, the offsets will be stored in the KV Store. This avoids any duplicated records being consumed but can be costly if many events are produced.
- ON_STOP: when this trigger is stopped or killed, the offsets will be stored in the KV Store. This avoids any unnecessary writes to the KV Store, but if the trigger is not stopped gracefully, the KV Store value may not be updated, leading to duplicated record consumption.
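The trade-off described above can be expressed directly on the trigger. The fragment below is a sketch that favors fewer KV Store writes over protection against duplicates; whether that trade-off is acceptable depends on how downstream tasks handle reprocessed rows.

```yaml
triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
    # Store offsets only when the trigger stops; an ungraceful stop
    # may lead to records being consumed again.
    offsetsCommitMode: ON_STOP
```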
password string
Password on the remote server.
properties object
Additional configuration properties.
Any additional configuration properties that are valid for the current driver.
snapshotMode string
Default: INITIAL
Possible values: ALWAYS, INITIAL, INITIAL_ONLY, WHEN_NEEDED, NO_DATA, RECOVERY
Specifies the criteria for running a snapshot when the connector starts.
Possible settings are:
ALWAYS: The connector performs a snapshot every time that it starts.
INITIAL: The connector runs a snapshot only when no offsets have been recorded for the logical server name.
INITIAL_ONLY: The connector runs a snapshot only when no offsets have been recorded for the logical server name and then stops; i.e. it will not read subsequent change events.
WHEN_NEEDED: After the connector starts, it performs a snapshot only if it detects one of the following circumstances: 1. It cannot detect any topic offsets. 2. A previously recorded offset specifies a log position that is not available on the server.
NO_DATA: The connector captures the structure of all relevant tables, performing all the steps described for INITIAL, except that it does not create READ events to represent the data set at the point of the connector's start-up.
RECOVERY: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables.
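As a sketch of how a snapshot mode is selected, the fragment below captures table structure without emitting READ events for existing rows, so only changes made after start-up create executions. The connection values are the placeholders from the example flow.

```yaml
triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
    # Capture table structure only; existing rows are not replayed.
    snapshotMode: NO_DATA
```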
stateName string
Default: debezium-state
The name of the Debezium state file stored in the KV Store for that namespace.
stopAfter Non-dynamic array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of execution states after which a trigger should be stopped (a.k.a. disabled).
username string
Username on the remote server.
Outputs
data object
Data.
Data extracted.
stream string
Stream.
Stream source
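A short sketch of how both outputs might be read inside a flow follows. The expression trigger.data appears in the example flow; trigger.stream is assumed to be exposed the same way, since it is listed as an output alongside data. The flow id and task id are hypothetical.

```yaml
id: debezium_db2_outputs
namespace: company.team

tasks:
  - id: log_outputs
    type: io.kestra.plugin.core.log.Log
    # trigger.stream is an assumption based on the Outputs list above.
    message: "Row from {{ trigger.stream }}: {{ trigger.data }}"

triggers:
  - id: realtime
    type: io.kestra.plugin.debezium.db2.RealtimeTrigger
    hostname: 127.0.0.1
    port: "50000"
    username: "{{ secret('DB2_USERNAME') }}"
    password: "{{ secret('DB2_PASSWORD') }}"
    database: my_database
```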
Definitions
io.kestra.core.models.triggers.TimeWindow
deadline string
Format: partial-time
SLA daily deadline
Use it only for DAILY_TIME_DEADLINE SLA.
endTime string
Format: partial-time
SLA daily end time
Use it only for DAILY_TIME_WINDOW SLA.
startTime string
Format: partial-time
SLA daily start time
Use it only for DAILY_TIME_WINDOW SLA.
type string
Default: DURATION_WINDOW
Possible values: DAILY_TIME_DEADLINE, DAILY_TIME_WINDOW, DURATION_WINDOW, SLIDING_WINDOW
The type of the SLA
The default SLA is a sliding window (DURATION_WINDOW) with a window of 24 hours.
window string
Format: duration
The duration of the window
Use it only for DURATION_WINDOW or SLIDING_WINDOW SLA.
See ISO_8601 Durations for more information on available duration values.
The start of the window is always based on midnight unless you set the windowAdvance parameter. E.g., if you have a 10-minute (PT10M) window, the first window will be 00:00 to 00:10 and a new window will start every 10 minutes.
windowAdvance string
Format: duration
The window advance duration
Use it only for DURATION_WINDOW SLA.
Allows you to specify the start time of the window.
E.g., if you want a window of 6 hours (window=PT6H), by default the check will be done between 00:00 and 06:00, 06:00 and 12:00, 12:00 and 18:00, and 18:00 and 00:00.
If you want to check the window between 03:00 and 09:00, 09:00 and 15:00, 15:00 and 21:00, and 21:00 and 03:00, you will have to shift the window by 3 hours by setting windowAdvance: PT3H.
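The shifted 6-hour window described above could be written as the following fragment. It shows only the timeWindow block; the enclosing condition is omitted here.

```yaml
timeWindow:
  type: DURATION_WINDOW
  window: PT6H         # each window lasts 6 hours
  windowAdvance: PT3H  # windows start at 03:00, 09:00, 15:00, and 21:00
```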
Condition for a specific flow of an execution.
flowId *Required string
The flow id.
namespace *Required string
The namespace of the flow.
type *Required object
Condition for a flow namespace.
namespace *Required string
The namespace of the flow or the prefix if prefix is true.
type *Required object
prefix boolean
Default: false
If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive.
Condition for a specific flow. Note that this condition is deprecated, use `io.kestra.plugin.core.condition.ExecutionFlow` instead.
flowId *Required string
The flow id.
namespace *Required string
The namespace of the flow.
type *Required object
Condition to allow events between two specific times.
type *Required object
after string
Format: time
The time to test must be after this one.
Must be a valid ISO 8601 time with offset.
before string
Format: time
The time to test must be before this one.
Must be a valid ISO 8601 time with offset.
date string
Default: {{ trigger.date }}
The time to test.
Can be any variable or any valid ISO 8601 time. By default, it will use the trigger date.
Condition that checks labels of an execution.
labels *Required array | object
List of labels to match in the execution.
type *Required object
Condition based on the outputs of an upstream execution.
expression *Required string
type *Required object
Condition to allow events on weekend.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition to have at least one condition validated.
conditions *Required array
Min items: 1
The list of conditions to validate.
If any condition is true, it will allow the event's execution.
type *Required object
Condition for an execution namespace.
namespace *Required string
String against which to match the execution namespace depending on the provided comparison.
type *Required object
comparison string
Possible values: EQUALS, PREFIX, SUFFIX
Comparison to use when checking if namespace matches. If not provided, it will use EQUALS by default.
prefix boolean | string
Default: false
Whether to look at the flow namespace by prefix. Shortcut for comparison: PREFIX.
Only used when comparison is not set.
Run a flow if the list of preconditions are met in a time window.
conditions *Required object
id *Required string
Validation regex: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
A unique id for the condition
type *Required object
resetOnSuccess boolean
Default: true
Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period.
By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution.
This means that to create multiple executions, the same set of conditions needs to be evaluated to true multiple times.
You can disable this by setting this property to false so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it will trigger a new execution.
timeWindow TimeWindow
Default: { "type": "DURATION_WINDOW" }
Define the time period (or window) for evaluating preconditions.
You can set the type of sla to one of the following values:
DURATION_WINDOW: this is the default type. It uses a start time (windowAdvance) and end time (window) that move forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration window. For example, with a 1-day window (the default option: window: PT1D), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set windowAdvance: PT6H, the window will start at 6 AM each day. If you set windowAdvance: PT6H and you also override the window property to PT6H, the window will start at 6 AM and last for 6 hours; as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
SLIDING_WINDOW: this option also evaluates SLA conditions over a fixed time window, but it always goes backward from the current time. For example, a sliding window of 1 hour (window: PT1H) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
DAILY_TIME_DEADLINE: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
DAILY_TIME_WINDOW: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
Condition to exclude other conditions.
conditions *Required array
Min items: 1
The list of conditions to exclude.
If any condition is true, it will prevent the event's execution.
type *Required object
Condition to execute tasks on a specific day of the week relative to the current month (first, last, ...)
dayInMonth *Required string
Possible values: FIRST, LAST, SECOND, THIRD, FOURTH
Are you looking for the first or the last day in the month?
dayOfWeek *Required string
Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
The day of week.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition based on variable expression.
expression *Required string
type *Required object
Condition to allow events on a particular day of the week.
dayOfWeek *Required string
Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
The day of week.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition based on execution status.
type *Required object
in array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that are authorized.
notIn array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that aren't authorized.
Condition to allow events between two specific datetime values.
type *Required object
after string
Format: date-time
The date to test must be after this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
before string
Format: date-time
The date to test must be before this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition that matches if any taskRun has retry attempts.
type *Required object
in array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that are authorized.
notIn array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that aren't authorized.
Condition to allow events on public holidays.
type *Required object
country string
ISO 3166-1 alpha-2 country code. If not set, it uses the country code from the default locale.
It uses the Jollyday library for public holiday calendars, which supports more than 70 countries.
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
subDivision string
ISO 3166-2 country subdivision (e.g., provinces and states) code.
It uses the Jollyday library for public holiday calendars, which supports more than 70 countries.