Anomaly detection using a DuckDB SQL query and an S3 file event trigger, sending a CSV file attachment via email if anomalies are detected

Source

yaml
id: s3-trigger-duckdb
namespace: company.team

variables:
  bucket: kestraio
  source_prefix: monthly_orders
  destination_prefix: stage_orders

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Validate new file for anomalies
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';
      SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';
      SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';
      SELECT *
      FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
      WHERE price * quantity != total;
    fetchType: STORE

  - id: csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Create CSV file from query results
    from: "{{ outputs.query.uri }}"

  - id: if_anomalies_detected
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.query.size }}"
    description: Send an email if anomalies are detected
    then:
      - id: send_email_alert
        type: io.kestra.plugin.notifications.mail.MailSend
        subject: Anomalies in data detected
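        from: "sender@example.com" # placeholder, replace with your sender address
        to: "recipient@example.com" # placeholder, replace with your recipients
        username: "sender@example.com" # placeholder, replace with your SMTP username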
        host: mail.privateemail.com
        port: 465
        password: "{{ secret('EMAIL_PASSWORD') }}"
        sessionTimeout: 6000
        attachments:
          - name: anomalies_in_orders.csv
            uri: "{{ outputs.csv.uri }}"
        htmlTextContent: >
          Detected anomalies in sales data in file <b>s3://{{ vars.bucket }}/{{
          vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first
          }}</b>. <br />

          Anomalous rows are attached in a CSV file.<br /><br />

          Best regards,<br />

          Data Team

triggers:
  - id: poll_for_new_s3_files
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "{{ vars.bucket }}"
    prefix: "{{ vars.source_prefix }}"
    maxKeys: 1
    interval: PT1S
    filter: FILES
    action: MOVE
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"

About this blueprint

S3 Trigger Notifications DuckDB

This flow is triggered whenever a new file arrives in the given S3 bucket under the source_prefix folder. The trigger moves the file to the destination_prefix folder in the same bucket, and the flow then checks the data in that file for anomalies using a DuckDB query. If anomalies are detected, the flow sends an email to the recipients specified in the to property, with the anomalous rows attached as a CSV file.

If you use MotherDuck, store the MotherDuck service token as a Kestra secret. Then modify the query task as follows to point it to your MotherDuck database:

yaml
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Validate new file for anomalies
    sql: |
      SELECT *
      FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
      WHERE price * quantity != total;
    fetchType: STORE
    url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
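
To try out the anomaly rule before wiring up the flow, the check in the WHERE clause (price * quantity != total) can be reproduced locally. The following is a minimal Python sketch, not part of the blueprint: the sample rows, the order_id column, and the find_anomalies helper are hypothetical, and only the price, quantity, and total column names come from the query above. It uses Decimal arithmetic, whereas DuckDB's read_csv_auto infers column types on its own, so results on real files may differ if values are read as floating-point numbers.

```python
import csv
import io
from decimal import Decimal

# Hypothetical sample data; only the column names price, quantity and total
# are taken from the WHERE clause of the query task.
SAMPLE_CSV = """order_id,price,quantity,total
1,10.00,2,20.00
2,5.50,3,16.50
3,7.25,4,30.00
"""


def find_anomalies(csv_text):
    """Return the rows where price * quantity differs from total."""
    anomalies = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Decimal avoids binary floating-point rounding in the comparison.
        expected = Decimal(row["price"]) * int(row["quantity"])
        if expected != Decimal(row["total"]):
            anomalies.append(row)
    return anomalies


anomalies = find_anomalies(SAMPLE_CSV)
print([row["order_id"] for row in anomalies])
```

In this sample only the third row is flagged, because 7.25 * 4 is 29.00 while the recorded total is 30.00. In the flow, an empty result corresponds to a query output size of 0, so the If task skips the email.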
