Blueprints

Upload data to S3 in Python using boto3, transform it with a SQL query in DuckDB, and send a CSV report via email on the first day of every month
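
Conceptually, the first task is just a boto3 upload of a file fetched over HTTP. The sketch below shows that step on its own, outside Kestra; it assumes AWS credentials are available in the environment and that you replace the bucket name with one you own:

```python
# Minimal standalone sketch of the upload step, outside Kestra.
# Assumes AWS credentials in the environment; the bucket name is a placeholder.
import boto3
import requests

BUCKET = "kestraio"  # replace with your bucket name
file = "monthly_orders/2023_01.csv"

# resolve/ serves the raw file; blob/ would return an HTML page
url = f"https://huggingface.co/datasets/kestra/datasets/resolve/main/{file}"
response = requests.get(url)
response.raise_for_status()

s3 = boto3.resource("s3")
s3.Bucket(BUCKET).put_object(Key=file, Body=response.content.decode("utf-8"))
print(f"{url} saved to s3://{BUCKET}/{file}")
```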

Source

```yaml
id: monthly-sales-report
namespace: company.team

variables:
  bucket: kestraio

tasks:
  - id: raw_data_to_s3
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/aws:latest
    env:
      AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
    script: |
      import requests
      import boto3
      from kestra import Kestra

      BUCKET = "{{ vars.bucket }}"

      def extract_and_upload(file):
          url = f"https://huggingface.co/datasets/kestra/datasets/blob/main/{file}"

          response = requests.get(url)
          data = response.content.decode("utf-8")
          s3 = boto3.resource("s3")
          s3.Bucket(BUCKET).put_object(Key=file, Body=data)
          print(f"{url} downloaded and saved to {BUCKET}/{file}")

      for month in range(1, 13):
          filename = f"monthly_orders/2023_{str(month).zfill(2)}.csv"
          extract_and_upload(filename)
          Kestra.outputs({filename: f"s3://{BUCKET}/{filename}"})

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';
      SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';
      SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';
      SELECT month(order_date) as month, sum(total) as total 
      FROM read_csv_auto('s3://kestraio/monthly_orders/*.csv', FILENAME = 1) 
      GROUP BY 1
      ORDER BY 2 desc;
    fetchType: STORE
    timeout: PT30S

  - id: csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.query.uri }}"

  - id: email
    type: io.kestra.plugin.notifications.mail.MailSend
    subject: The monthly sales report is ready
    from: [email protected]
    to: [email protected];[email protected]
    username: [email protected]
    host: mail.privateemail.com
    port: 465
    password: "{{ secret('EMAIL_PASSWORD') }}"
    sessionTimeout: 6000
    attachments:
      - name: monthly_sales_report.csv
        uri: "{{ outputs.csv.uri }}"
    htmlTextContent: |
      Please find attached the current sales report. <br /><br />
      Best regards,<br />
      Data Team

triggers:
  - id: monthly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
```
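
To sanity-check the aggregation before scheduling the flow, you can run the same SQL against S3 from a local Python shell. A minimal sketch, assuming the duckdb and pandas packages are installed and the same AWS credentials are exported as environment variables:

```python
# Rough local test of the reporting query, outside Kestra.
import os
import duckdb

con = duckdb.connect()  # in-process database, matching the flow's default
con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;")
con.execute(f"SET s3_region='{os.environ['AWS_DEFAULT_REGION']}';")
con.execute(f"SET s3_access_key_id='{os.environ['AWS_ACCESS_KEY_ID']}';")
con.execute(f"SET s3_secret_access_key='{os.environ['AWS_SECRET_ACCESS_KEY']}';")

df = con.execute("""
    SELECT month(order_date) AS month, sum(total) AS total
    FROM read_csv_auto('s3://kestraio/monthly_orders/*.csv', FILENAME = 1)
    GROUP BY 1
    ORDER BY 2 DESC;
""").df()
print(df)
```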

About this blueprint


Replace the S3 bucket kestraio with your bucket name to reproduce the example. This flow assumes:

- an in-process DuckDB database
- AWS credentials with S3 access permissions, stored as Kestra secrets.

If you use MotherDuck and MotherDuck's managed S3 secrets, you can replace the query task with the following simpler configuration:

```yaml
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      SELECT month(order_date) as month, sum(total) as total 
      FROM read_csv_auto('s3://{{vars.bucket}}/monthly_orders/*.csv', FILENAME = 1) 
      GROUP BY 1
      ORDER BY 2 desc;
    fetchType: STORE
    timeout: PT30S
    url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"

