Blueprints
Use Debezium to trigger a flow whenever new entries hit a Postgres database, then send notification to Slack and process data in Python
About this blueprint
Postgres Trigger
This blueprint showcases how you can trigger a Flow based on Debezium data-capture pattern. Every time new entries reach the database, the flow is triggered. It sends a notification through Slack with the number of rows ingested and then execute a Python script that read the corresponding data in json.
This blueprint can be reproduced with the following docker-compose.yml
setup
services:
db:
image: debezium/postgres:latest
restart: always
environment:
POSTGRES_PASSWORD: example
ports:
- 5433:5432
adminer:
image: adminer
restart: always
ports:
- 8082:8080
You can access localhost:8082 to create and edit databases or tables via the adminer interface. The database is accessible on 5433
port.
Note that depending of your database installation, you might need to change the pluginName
property of the debezium plugin. Other options can be seen in corresponding documentation.
yaml
id: listen-debezium
namespace: blueprint
tasks:
- id: slack_notificaiton
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK') }}"
payload: |
{
"channel": "U052JMPLBM3",
"text": "{{ trigger.size }} new rows have been added to the database"
}
- id: json
type: io.kestra.plugin.serdes.json.JsonWriter
from: "{{ trigger.uris['postgres.order'] }}"
- id: python
type: io.kestra.plugin.scripts.python.Script
script: |
import json
with open("{{ outputs.json.uri }}", "r") as fopen:
data = json.load(fopen)
print(data)
triggers:
- id: listen-debezium
type: io.kestra.plugin.debezium.postgres.Trigger
hostname: host.docker.internal
port: "5433"
username: postgres
password: example
database: postgres
pluginName: PGOUTPUT
snapshotMode: INITIAL
format: INLINE
interval: PT3S