
Capture

yaml
type: "io.kestra.plugin.debezium.postgres.Capture"

Wait for change data capture events on a PostgreSQL server.

Examples

yaml
id: "capture"
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
maxRecords: 100
database: my_database
pluginName: PGOUTPUT
snapshotMode: ALWAYS

Properties

database

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The name of the PostgreSQL database from which to stream the changes.

deleted

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: ADD_FIELD
  • Possible Values:
    • ADD_FIELD
    • NULL
    • DROP

How to handle deleted rows

Possible settings are:

  • ADD_FIELD: add a boolean field that flags deleted rows.
  • NULL: send a row with all values set to null.
  • DROP: do not send deleted rows.

deletedFieldName

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: deleted

The name of the deleted field when deleted is set to ADD_FIELD.
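
As a minimal sketch, assuming the connection values from the example above, the following task keeps deleted rows and flags them with a boolean column instead of dropping them. The task id and the field name is_deleted are illustrative choices, not defaults.

```yaml
id: "capture_with_deletes"    # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
deleted: ADD_FIELD            # keep deleted rows and flag them
deletedFieldName: is_deleted  # illustrative name; the default is "deleted"
```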

format

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: INLINE
  • Possible Values:
    • RAW
    • INLINE
    • WRAP

The format of the output

Possible settings are:

  • RAW: send the raw data from Debezium.
  • INLINE: send each row as it appears in the source, with data only (the before and after wrappers are removed); all columns are present in every row.
  • WRAP: send each row as in INLINE, but wrapped in a record field.
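
The following sketch, which reuses the assumed connection values from the example above, selects the WRAP format so that each row's columns are nested under a record field rather than placed at the top level. The task id is hypothetical.

```yaml
id: "capture_wrapped"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
format: WRAP            # the default is INLINE; RAW passes Debezium data through
```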

hostname

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Hostname of the remote server

ignoreDdl

  • Type: boolean
  • Dynamic:
  • Required: ✔️
  • Default: true

Ignore DDL statements

Ignore CREATE TABLE and other administrative operations.

key

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: ADD_FIELD
  • Possible Values:
    • ADD_FIELD
    • DROP

How to handle keys

Possible settings are:

  • ADD_FIELD: add the key(s), merged with the columns.
  • DROP: drop the keys.

metadata

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: ADD_FIELD
  • Possible Values:
    • ADD_FIELD
    • DROP

How to handle metadata

Possible settings are:

  • ADD_FIELD: add the metadata in a column (named metadata by default; see metadataFieldName).
  • DROP: drop the metadata.

metadataFieldName

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: metadata

The name of the metadata field when metadata is set to ADD_FIELD.

pluginName

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: PGOUTPUT
  • Possible Values:
    • DECODERBUFS
    • WAL2JSON
    • WAL2JSON_RDS
    • WAL2JSON_STREAMING
    • WAL2JSON_RDS_STREAMING
    • PGOUTPUT

The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.

If you are using a wal2json plug-in and transactions are very large, the JSON batch event that contains all transaction changes might not fit into the hard-coded memory buffer, which has a size of 1 GB. In such cases, switch to a streaming plug-in by setting the pluginName property to WAL2JSON_STREAMING or WAL2JSON_RDS_STREAMING. With a streaming plug-in, PostgreSQL sends the connector a separate message for each change in a transaction.
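
A sketch of the streaming variant described above, assuming the wal2json plug-in is already installed on the server and reusing the connection values from the earlier example. The task id is hypothetical.

```yaml
id: "capture_large_tx"          # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
pluginName: WAL2JSON_STREAMING  # one message per change instead of one batch per transaction
```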

port

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Port of the remote server

publicationName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: kestra_publication

The name of the PostgreSQL publication created for streaming changes when using pgoutput.

This publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time.

If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.

slotName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: kestra

The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema.

The server uses this slot to stream events to the Debezium connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
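
The sketch below sets both the publication and the slot explicitly. The names orders_publication and orders_slot are illustrative; the slot name uses only lower-case letters and underscores, in line with the rule quoted above. The comment shows one way the publication might be created ahead of time, so that the connector user does not need superuser permissions.

```yaml
id: "capture_named_slot"              # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
pluginName: PGOUTPUT
# Assumed to exist already, e.g. created by an administrator with:
#   CREATE PUBLICATION orders_publication FOR ALL TABLES;
publicationName: orders_publication   # illustrative; the default is kestra_publication
slotName: orders_slot                 # illustrative; the default is kestra
```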

snapshotMode

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: INITIAL
  • Possible Values:
    • INITIAL
    • ALWAYS
    • NEVER
    • INITIAL_ONLY

Specifies the criteria for running a snapshot when the connector starts.

Possible settings are:

  • INITIAL: The connector performs a snapshot only when no offsets have been recorded for the logical server name.
  • ALWAYS: The connector performs a snapshot each time the connector starts.
  • NEVER: The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.
  • INITIAL_ONLY: The connector performs an initial snapshot and then stops, without processing any subsequent changes.
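
As an illustration of the modes above, this sketch takes a one-off snapshot and then stops, without streaming later changes. The connection values are the assumed ones from the earlier example, and the task id is hypothetical.

```yaml
id: "capture_snapshot_only"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
snapshotMode: INITIAL_ONLY    # snapshot once, then stop; the default is INITIAL
```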

splitTable

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: TABLE
  • Possible Values:
    • OFF
    • DATABASE
    • TABLE

Split tables across separate output URIs

Possible settings are:

  • TABLE: split rows by table; each output is named database.table.
  • DATABASE: split rows by database; each output is named database.
  • OFF: do not split rows; a single data output is produced.
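
A short sketch of a task that writes everything to a single output rather than one output per table. It reuses the assumed connection values from the earlier example; the task id is hypothetical.

```yaml
id: "capture_single_output"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
splitTable: "OFF"             # quoted so YAML does not read it as a boolean; the default is TABLE
```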

stateName

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: debezium-state

The name of the Debezium state file

excludedColumns

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values.

Fully-qualified names for columns are of the form databaseName.tableName.columnName.

excludedDatabases

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes.

The connector captures changes in any database whose name is not in excludedDatabases. Do not also set the includedDatabases connector configuration property.

excludedTables

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture.

The connector captures changes in any table not included in excludedTables. Each identifier is of the form databaseName.tableName. Do not also specify the includedTables connector configuration property.

includedColumns

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values.

Fully-qualified names for columns are of the form databaseName.tableName.columnName.

includedDatabases

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes.

The connector does not capture changes in any database whose name is not in includedDatabases. By default, the connector captures changes in all databases. Do not also set the excludedDatabases connector configuration property.

includedTables

  • Type: object
  • Dynamic: ✔️
  • Required:

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers of tables whose changes you want to capture.

The connector does not capture changes in any table not included in includedTables. Each identifier is of the form databaseName.tableName. By default, the connector captures changes in every non-system table in each database whose changes are being captured. Do not also specify the excludedTables connector configuration property.
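
The filter properties can be sketched as follows. The table and column names are hypothetical, and the identifiers follow the databaseName.tableName form given above. The values are written as single comma-separated strings, which is an assumption drawn from the descriptions; the listed type is object, so another shape such as a YAML list may also be accepted.

```yaml
id: "capture_filtered"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
# Capture only these tables (regular expressions, comma-separated).
# Single quotes keep the backslashes literal in YAML.
includedTables: 'my_database\.orders,my_database\.customers_.*'
# Leave a sensitive column out of the change events (hypothetical column name).
excludedColumns: 'my_database\.orders\.card_number'
```

Only one side of each include/exclude pair is set here, because the descriptions above say not to combine includedTables with excludedTables.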

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum total processing duration

This is not a hard limit; it is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of rows to fetch before stopping

This is not a hard limit; it is evaluated every second.

maxWait

  • Type: string
  • Dynamic:
  • Required:
  • Default: 10.000000000
  • Format: duration

The maximum duration to wait for new rows

This is not a hard limit; it is evaluated every second.
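
The three stop conditions can be combined, as in this sketch. The duration values are written as ISO 8601 durations, which is an assumption based on the "duration" format listed above; the task id and the specific limits are illustrative.

```yaml
id: "capture_bounded"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
maxRecords: 1000        # stop after roughly 1000 rows
maxDuration: PT5M       # or after roughly 5 minutes in total (assumed ISO 8601 syntax)
maxWait: PT30S          # or after roughly 30 seconds without new rows
```

Because each limit is only checked once per second, the task may slightly overshoot any of them.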

password

  • Type: string
  • Dynamic: ✔️
  • Required:

Password on the remote server

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Additional configuration properties

Any additional configuration properties that are valid for the current driver
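
A sketch of passing an extra option through properties. decimal.handling.mode is an option of the Debezium PostgreSQL connector; that this task forwards it unchanged is an assumption, and the task id is hypothetical.

```yaml
id: "capture_extra_props"   # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: 127.0.0.1
port: 5432
username: postgres
password: psql_passwd
database: my_database
properties:
  # Keys and values are strings (SubType: string).
  decimal.handling.mode: "string"
```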

sslCert

  • Type: string
  • Dynamic: ✔️
  • Required:

The SSL certificate for the client.

Must be a PEM encoded certificate

sslKey

  • Type: string
  • Dynamic: ✔️
  • Required:

The SSL private key of the client.

Must be a PEM encoded key

sslKeyPassword

  • Type: string
  • Dynamic: ✔️
  • Required:

The password to access the client private key sslKey.

sslMode

  • Type: string
  • Dynamic:
  • Required:
  • Default: DISABLE
  • Possible Values:
    • DISABLE
    • REQUIRE
    • VERIFY_CA
    • VERIFY_FULL

Whether to use an encrypted connection to the PostgreSQL server. Options include:

  • DISABLE uses an unencrypted connection.
  • REQUIRE uses a secure (encrypted) connection, and fails if one cannot be established.
  • VERIFY_CA behaves like REQUIRE but also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.
  • VERIFY_FULL behaves like VERIFY_CA but also verifies that the server certificate matches the host to which the connector is trying to connect.

See the PostgreSQL documentation for more information.

sslRootCert

  • Type: string
  • Dynamic: ✔️
  • Required:

The root certificate(s) against which the server is validated.

Must be a PEM encoded certificate
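
The SSL properties might be combined as in this sketch. The hostname db.example.com is hypothetical, and the certificate body is a placeholder to be replaced with real PEM content.

```yaml
id: "capture_tls"          # hypothetical task id
type: "io.kestra.plugin.debezium.postgres.Capture"
hostname: db.example.com   # hypothetical host; VERIFY_FULL checks it against the certificate
port: 5432
username: postgres
password: psql_passwd
database: my_database
sslMode: VERIFY_FULL
sslRootCert: |
  -----BEGIN CERTIFICATE-----
  <PEM-encoded CA certificate body goes here>
  -----END CERTIFICATE-----
```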

username

  • Type: string
  • Dynamic: ✔️
  • Required:

Username on the remote server

Outputs

size

  • Type: integer

The number of rows fetched

stateHistory

  • Type: string

The state with database history

stateOffset

  • Type: string

The state with offset

uris

  • Type: object
  • SubType: string

URIs of the generated internal storage files
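
As a sketch of how these outputs might be consumed, a later task in the same flow can reference them through the outputs expression of the capture task. The second task type (io.kestra.core.tasks.debugs.Return) and its format property are assumptions about the surrounding Kestra installation, not part of this plugin.

```yaml
tasks:
  - id: "capture"
    type: "io.kestra.plugin.debezium.postgres.Capture"
    hostname: 127.0.0.1
    port: 5432
    username: postgres
    password: psql_passwd
    database: my_database
    maxRecords: 100

  # Assumed follow-up task: reports how many rows the capture fetched.
  - id: "report"
    type: "io.kestra.core.tasks.debugs.Return"
    format: "Captured {{ outputs.capture.size }} rows"
```

The uris output can be referenced in the same way, as outputs.capture.uris.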