Use Debezium to trigger a flow whenever new entries hit a Postgres database, then send a notification to Slack and process the data in Python
Source
id: listen-debezium
namespace: company.team

tasks:
  - id: slack_notification
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK') }}"
    payload: |
      {
        "channel": "U052JMPLBM3",
        "text": "{{ trigger.size }} new rows have been added to the database"
      }

  - id: json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ trigger.uris['postgres.order'] }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json

      with open("{{ outputs.json.uri }}", "r") as fopen:
          data = json.load(fopen)

      print(data)

triggers:
  - id: listen_debezium
    type: io.kestra.plugin.debezium.postgres.Trigger
    hostname: host.docker.internal
    port: "5433"
    username: postgres
    password: example
    database: postgres
    pluginName: PGOUTPUT
    snapshotMode: INITIAL
    format: INLINE
    interval: PT30S
About this blueprint
Trigger Postgres
This blueprint shows how to trigger a flow using the Debezium change data capture pattern.
Every time new entries reach the database, the flow is triggered. It sends a Slack notification with the number of rows ingested, then executes a Python script that reads the corresponding data as JSON.
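As a rough sketch of what the Python step could do beyond printing the data, the snippet below loads the converted file and counts the records. It assumes the file holds either a single JSON array or one JSON object per line (the actual layout depends on how the IonToJson task is configured). The `load_records` helper and the `orders.json` file name are hypothetical and not part of the blueprint.

```python
import json
import tempfile
from pathlib import Path


def load_records(path):
    """Return a list of records from a JSON array file or a JSON Lines file."""
    text = Path(path).read_text(encoding="utf-8").strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not a single JSON document: treat each non-empty line as one record.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    # A single document is either an array of records or one lone record.
    return parsed if isinstance(parsed, list) else [parsed]


if __name__ == "__main__":
    # Hypothetical local file standing in for "{{ outputs.json.uri }}".
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "orders.json"
        sample.write_text('{"id": 1}\n{"id": 2}\n', encoding="utf-8")
        records = load_records(sample)
        print(f"{len(records)} records loaded")
```

Inside the flow, the path passed to `load_records` would be the same `{{ outputs.json.uri }}` expression the script already uses.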
You can reproduce this blueprint with the following docker-compose.yml setup:
services:
  db:
    image: debezium/postgres:latest
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - "5433:5432"

  adminer:
    image: adminer
    restart: always
    ports:
      - "8082:8080"
You can open localhost:8082 to create and edit databases or tables through the Adminer interface. The database is accessible on port 5433.
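Before enabling the trigger, it can help to confirm that the database port is reachable. The following is a minimal sketch using only the Python standard library. The `port_is_open` helper is hypothetical, and the host and port simply mirror the docker-compose.yml port mapping, so adjust them if your setup differs. It only checks that something accepts TCP connections on that port, not that Postgres itself is ready to serve queries.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolved hosts.
        return False


if __name__ == "__main__":
    # 5433 is the host port mapped to the container's 5432 (assumed setup).
    print("Postgres port reachable:", port_is_open("localhost", 5433))
```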
Note that, depending on your database installation, you might need to change the pluginName property of the Debezium plugin. The other options are listed in the corresponding documentation.