Anomaly detection using DuckDB SQL query and S3 file event trigger, sending a CSV file attachment via email if anomalies are detected
id: s3-trigger-duckdb

variables:
  bucket: kestraio
  source_prefix: monthly_orders
  destination_prefix: stage_orders

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Validate new file for anomalies
    sql: >
      INSTALL httpfs;
      LOAD httpfs;
      SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';
      SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';
      SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';
      SELECT *
      FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
      WHERE price * quantity != total;
    fetchType: STORE

  - id: csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Create CSV file from query results
    from: "{{ outputs.query.uri }}"

  - id: if_anomalies_detected
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.query.size }}"
    description: Send an email if anomalies are detected
    then:
      - id: send_email_alert
        type: io.kestra.plugin.notifications.mail.MailSend
        subject: Anomalies in data detected
        from: [email protected]
        to: [email protected]
        username: [email protected]
        port: 465
        password: "{{ secret('EMAIL_PASSWORD') }}"
        sessionTimeout: 6000
        attachments:
          - name: anomalies_in_orders.csv
            uri: "{{ outputs.csv.uri }}"
        htmlTextContent: >
          Detected anomalies in sales data in file
          <b>s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}</b>.<br />
          Anomalous rows are attached in a CSV file.<br /><br />
          Best regards,<br />
          Data Team

triggers:
  - id: poll_for_new_s3_files
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "{{ vars.bucket }}"
    prefix: "{{ vars.source_prefix }}"
    maxKeys: 1
    interval: PT1S
    filter: FILES
    action: MOVE
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
About this blueprint
This flow is triggered whenever a new file arrives in the given S3 bucket under source_prefix. The trigger moves that file to destination_prefix in the same bucket, and a DuckDB query then checks the moved file for anomalies, i.e. rows where price * quantity does not equal total.
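The query locates the moved file by combining the bucket, destination_prefix, and the first object key reported by the trigger. The Python sketch below mirrors that template expression, assuming trigger.objects is a list of objects that each carry a full S3 "key" (including source_prefix); the file name 2024_01.csv is hypothetical.

```python
"""Sketch of how the flow's templates build the S3 URI that the query reads.

Assumption: trigger.objects is a list of dicts, each with a full S3 "key".
"""


def first_key(trigger_objects: list[dict]) -> str:
    # Mirrors `trigger.objects | jq('.[].key') | first`:
    # collect every object's key, then keep only the first one.
    # With maxKeys: 1 the trigger reports at most one object anyway.
    keys = [obj["key"] for obj in trigger_objects]
    return keys[0]


def moved_file_uri(bucket: str, destination_prefix: str, trigger_objects: list[dict]) -> str:
    # Mirrors 's3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ ...first key... }}'
    return f"s3://{bucket}/{destination_prefix}/{first_key(trigger_objects)}"


if __name__ == "__main__":
    # Hypothetical trigger payload: one new file under the source prefix.
    objects = [{"key": "monthly_orders/2024_01.csv"}]
    print(moved_file_uri("kestraio", "stage_orders", objects))
    # prints s3://kestraio/stage_orders/monthly_orders/2024_01.csv
```

Because the object key already contains source_prefix, the resulting path lands under destination_prefix/source_prefix, which matches the moveTo key configured on the trigger.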
If any anomalies are detected, the flow sends an email to the recipients listed in the to property and attaches the anomalous rows as a CSV file.
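To test the anomaly rule locally before uploading a file, the following sketch applies the same predicate as the query (price * quantity != total), serializes the matching rows to CSV as a rough analogue of the IonToCsv task, and mimics the If condition, which treats a row count of 0 as false and any positive count as true. It uses only the Python standard library; the order_id column and the sample values are hypothetical. Decimal keeps values such as 19.99 exact, whereas DuckDB's read_csv_auto may infer a floating-point type for these columns, so results on fractional values can differ.

```python
"""Local sketch of the flow's anomaly check (not the flow itself)."""
import csv
import io
from decimal import Decimal


def find_anomalies(csv_text: str) -> list[dict]:
    # Same predicate as the DuckDB query: WHERE price * quantity != total.
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        row
        for row in reader
        if Decimal(row["price"]) * Decimal(row["quantity"]) != Decimal(row["total"])
    ]


def to_csv(rows: list[dict]) -> str:
    # Rough analogue of the IonToCsv task: serialize the anomalous rows.
    if not rows:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def should_send_email(rows: list[dict]) -> bool:
    # Mirrors the If task: a size of 0 is falsy, any positive size is truthy.
    return len(rows) > 0


if __name__ == "__main__":
    # Hypothetical sample: order 2 has 19.99 * 2 = 39.98, not 39.00.
    sample = (
        "order_id,price,quantity,total\n"
        "1,10.00,3,30.00\n"
        "2,19.99,2,39.00\n"
    )
    anomalies = find_anomalies(sample)
    print([row["order_id"] for row in anomalies])  # ['2']
    if should_send_email(anomalies):
        print(to_csv(anomalies))
```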
If you use MotherDuck, store your MotherDuck service token as a Kestra secret (MOTHERDUCK_TOKEN in the snippet that follows). Then modify the query task as follows to point it to your MotherDuck database:
- id: query
  type: io.kestra.plugin.jdbc.duckdb.Query
  description: Validate new file for anomalies
  sql: |
    FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
    WHERE price * quantity != total;
  fetchType: STORE
  url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"