
Event-driven data ingestion to an AWS S3 data lake managed by Apache Iceberg, AWS Glue, and Amazon Athena

Source

```yaml
id: ingest-to-datalake-event-driven
namespace: company.team

variables:
  source_prefix: inbox
  destination_prefix: archive
  database: default
  bucket: kestraio

tasks:
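  # Clone the scripts repository and run the ETL script in one shared working directory
  # so the Python task can access the cloned files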
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/scripts

      - id: etl
        type: io.kestra.plugin.scripts.python.Commands
        warningOnStdErr: false
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - python etl/aws_iceberg_fruit.py {{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}

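  # Upsert the newly ingested rows from raw_fruits into the fruits Iceberg table:
  # update matching fruits, insert new ones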
  - id: merge_query
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: >
      MERGE INTO fruits f USING raw_fruits r
          ON f.fruit = r.fruit
          WHEN MATCHED
              THEN UPDATE
                  SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
          WHEN NOT MATCHED
              THEN INSERT (id, fruit, berry, update_timestamp)
                    VALUES(r.id, r.fruit, r.berry, current_timestamp);

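  # Compact the table's data files after the merge using Athena's OPTIMIZE statement
  # (BIN_PACK rewrites small files into larger ones)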
  - id: optimize
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: |
      OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

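# Poll the inbox/ prefix every second; each detected object (at most one per poll, maxKeys: 1)
# is moved under the archive/ prefix and exposed to the flow as trigger.objects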
triggers:
  - id: wait_for_new_s3_objects
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: kestraio
    interval: PT1S
    maxKeys: 1
    filter: FILES
    action: MOVE
    prefix: "{{ vars.source_prefix }}"
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
      bucket: "{{ vars.bucket }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
```

About this blueprint

Tags: S3, Trigger, AWS, Iceberg

This workflow ingests data into an S3 data lake using a Python script. The script is stored in a public GitHub repository, so you can use this workflow directly as long as you adjust your AWS credentials, S3 bucket name, and Amazon Athena table name. The script takes the S3 object key detected by the S3 event trigger as an input argument and ingests that object into the Iceberg table. The script runs in a Docker container; the image referenced here is public, so feel free to use it. Alternatively, you can install the required pip dependencies using the beforeCommands property, as sketched below.
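Here is a minimal sketch of that alternative: the nested etl task starts from a plain Python image and installs its dependencies at runtime with beforeCommands. The package shown (awswrangler) is an assumption about what etl/aws_iceberg_fruit.py requires; check the script in the kestra-io/scripts repository before relying on it. Indentation matches the task's position inside the WorkingDirectory block above.

```yaml
      - id: etl
        type: io.kestra.plugin.scripts.python.Commands
        warningOnStdErr: false
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        # Generic Python image instead of the prebuilt ghcr.io/kestra-io/aws image
        containerImage: python:3.11-slim
        beforeCommands:
          # Assumed dependency of etl/aws_iceberg_fruit.py; verify against the script itself
          - pip install awswrangler
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - python etl/aws_iceberg_fruit.py {{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}
```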

Plugins used in this blueprint: Working Directory, Clone, Commands, Docker, Query, Trigger
