Upload data to S3 in Python using boto3, transform it in a SQL query with DuckDB, and send a CSV report via email on the first day of every month
Source
```yaml
id: monthly-sales-report
namespace: company.team

variables:
  bucket: kestraio

tasks:
  - id: raw_data_to_s3
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/aws:latest
    env:
      AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
    script: |
      import requests
      import boto3
      from kestra import Kestra

      BUCKET = "{{ vars.bucket }}"

      def extract_and_upload(file):
          url = f"https://huggingface.co/datasets/kestra/datasets/resolve/main/{file}"
          response = requests.get(url)
          data = response.content.decode("utf-8")
          s3 = boto3.resource("s3")
          s3.Bucket(BUCKET).put_object(Key=file, Body=data)
          print(f"{url} downloaded and saved to {BUCKET}/{file}")

      for month in range(1, 13):
          filename = f"monthly_orders/2023_{str(month).zfill(2)}.csv"
          extract_and_upload(filename)
          Kestra.outputs({filename: f"s3://{BUCKET}/{filename}"})

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';
      SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';
      SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';
      SELECT month(order_date) AS month, sum(total) AS total
      FROM read_csv_auto('s3://{{ vars.bucket }}/monthly_orders/*.csv', FILENAME = 1)
      GROUP BY 1
      ORDER BY 2 DESC;
    fetchType: STORE
    timeout: PT30S

  - id: csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.query.uri }}"

  - id: email
    type: io.kestra.plugin.notifications.mail.MailSend
    subject: The monthly sales report is ready
    from: [email protected]
    to: [email protected];[email protected]
    username: [email protected]
    host: mail.privateemail.com
    port: 465
    password: "{{ secret('EMAIL_PASSWORD') }}"
    sessionTimeout: 6000
    attachments:
      - name: monthly_sales_report.csv
        uri: "{{ outputs.csv.uri }}"
    htmlTextContent: |
      Please find attached the current sales report. <br /><br />
      Best regards,<br />
      Data Team

triggers:
  - id: monthly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
```
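To sanity-check the upload step outside of Kestra, a minimal boto3 sketch like the one below (assuming the same AWS credentials are exported in your local environment) lists the objects the `raw_data_to_s3` task creates:

```python
import boto3

BUCKET = "kestraio"  # same value as the flow's `bucket` variable

# List the objects written by the raw_data_to_s3 task
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=BUCKET, Prefix="monthly_orders/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```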
About this blueprint
Replace the `kestraio` value of the `bucket` variable with your own bucket name to reproduce the example.

This flow assumes:
- an in-process DuckDB database
- AWS credentials with S3 access permissions, stored using Kestra Secret (see the sketch after this list for one way to configure them).
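In the open-source version of Kestra, secrets are exposed as `SECRET_`-prefixed environment variables with base64-encoded values on the Kestra server. A small helper like this (a sketch; the values below are placeholders) can generate the entries for your `.env` file:

```python
import base64

# Placeholder values -- substitute your real credentials
secrets = {
    "AWS_ACCESS_KEY_ID": "your-access-key-id",
    "AWS_SECRET_ACCESS_KEY": "your-secret-access-key",
    "AWS_DEFAULT_REGION": "us-east-1",
    "EMAIL_PASSWORD": "your-email-password",
}

# Kestra (open source) reads secrets from SECRET_-prefixed,
# base64-encoded environment variables
for name, value in secrets.items():
    encoded = base64.b64encode(value.encode()).decode()
    print(f"SECRET_{name}={encoded}")
```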
If you use MotherDuck and MotherDuck's managed S3 secrets, you can replace the query
task with the following simpler configuration:
```yaml
- id: query
  type: io.kestra.plugin.jdbc.duckdb.Query
  sql: |
    SELECT month(order_date) AS month, sum(total) AS total
    FROM read_csv_auto('s3://{{ vars.bucket }}/monthly_orders/*.csv', FILENAME = 1)
    GROUP BY 1
    ORDER BY 2 DESC;
  fetchType: STORE
  timeout: PT30S
  url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
```