Blueprints

Use Debezium to trigger a flow whenever new entries hit a Postgres database, then send a notification to Slack and process the data in Python

About this blueprint

Postgres Trigger

This blueprint shows how to trigger a flow using the Debezium change data capture pattern. Every time new entries reach the database, the flow is triggered. It sends a Slack notification with the number of rows ingested and then executes a Python script that reads the corresponding data as JSON.

This blueprint can be reproduced with the following docker-compose.yml setup:

services:

  db:
    image: debezium/postgres:latest
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - 5433:5432

  adminer:
    image: adminer
    restart: always
    ports:
      - 8082:8080

You can open localhost:8082 to create and edit databases or tables through the Adminer interface. The database is accessible on port 5433.
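Before enabling the trigger, it can help to confirm that the published ports are reachable from the host. The following is a minimal sketch using only the Python standard library; the helper name `is_port_open` is hypothetical and not part of Kestra or Debezium, and it only checks that a TCP connection can be opened, not that Postgres accepts the credentials.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the compose file above running, `is_port_open("localhost", 5433)` should return True for the database and `is_port_open("localhost", 8082)` for Adminer.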

Note that, depending on your database installation, you might need to change the pluginName property of the Debezium plugin. Other options are listed in the corresponding documentation.

id: listen-debezium
namespace: blueprint
tasks:

  - id: slack_notification
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK') }}"
    payload: |
      {
        "channel": "U052JMPLBM3",
        "text": "{{ trigger.size }} new rows have been added to the database"
      }

  - id: json
    type: io.kestra.plugin.serdes.json.JsonWriter
    from: "{{ trigger.uris['postgres.order'] }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json

      with open("{{ outputs.json.uri }}", "r") as fopen:
        data = json.load(fopen)

      print(data)

triggers:
  - id: listen-debezium
    type: io.kestra.plugin.debezium.postgres.Trigger
    hostname: host.docker.internal
    port: "5433"
    username: postgres
    password: example
    database: postgres
    pluginName: PGOUTPUT
    snapshotMode: INITIAL
    format: INLINE
    interval: PT3S
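The Python step in the flow above calls `json.load`, which expects the file to hold a single JSON document. As a hedged sketch, assuming the writer may instead emit one JSON object per line depending on how it is configured, the reader below accepts both layouts. The function name `load_rows` is hypothetical and is not part of any Kestra plugin.

```python
import json
from pathlib import Path


def load_rows(path):
    """Return a list of records from a JSON or newline-delimited JSON file."""
    text = Path(path).read_text(encoding="utf-8").strip()
    if not text:
        return []
    try:
        # First try to parse the whole file as one JSON document.
        data = json.loads(text)
    except json.JSONDecodeError:
        # Assumption: otherwise the file holds one JSON document per line.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    # Wrap a lone object so callers always receive a list.
    return data if isinstance(data, list) else [data]
```

Inside the Script task, the path argument would be the rendered `{{ outputs.json.uri }}` value, and `len(load_rows(...))` gives the number of records read from the file.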
