Event-driven data ingestion to an AWS S3 data lake managed with Apache Iceberg, AWS Glue and Amazon Athena
Source
id: ingest-to-datalake-event-driven
namespace: company.team
variables:
  source_prefix: inbox
  destination_prefix: archive
  database: default
  bucket: kestraio
tasks:
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/scripts
      - id: etl
        type: io.kestra.plugin.scripts.python.Commands
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - python etl/aws_iceberg_fruit.py {{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}
  - id: merge_query
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: |
      MERGE INTO fruits f USING raw_fruits r
          ON f.fruit = r.fruit
          WHEN MATCHED
              THEN UPDATE
                  SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
          WHEN NOT MATCHED
              THEN INSERT (id, fruit, berry, update_timestamp)
                    VALUES(r.id, r.fruit, r.berry, current_timestamp);
  - id: optimize
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: |
      OPTIMIZE fruits REWRITE DATA USING BIN_PACK;
triggers:
  - id: wait_for_new_s3_objects
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: kestraio
    interval: PT1S
    maxKeys: 1
    filter: FILES
    action: MOVE
    prefix: "{{ vars.source_prefix }}"
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
      bucket: "{{ vars.bucket }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
About this blueprint
Python AWS
This workflow ingests data into an S3 data lake using a Python script.
The script is stored in a public GitHub repository, so you can use this
workflow directly as long as you adjust your AWS credentials, the S3 bucket
name and the Amazon Athena table name. The script takes the S3 object key
detected by the S3 event trigger as an input argument and ingests the
object into the Iceberg table. The script runs in a Docker container; the
image provided here is public, so feel free to use it. Alternatively, you
can install the required pip dependencies using the beforeCommands property.
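For reference, below is a minimal, hypothetical sketch of what a script like
etl/aws_iceberg_fruit.py could look like. It is not the actual script from the
kestra-io/scripts repository: it assumes the ingested files are CSV and uses
awswrangler (the AWS SDK for pandas) to append rows to a raw_fruits staging
table, which the merge_query task then merges into the fruits table. The
bucket, database and table names below are assumptions mirroring the flow's
variables.

# Hypothetical sketch of an ETL script similar to etl/aws_iceberg_fruit.py.
# Assumptions: files are CSV, awswrangler is available in the container image,
# and AWS credentials are picked up from the environment variables set by the
# flow (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION).
import sys

import awswrangler as wr

BUCKET = "kestraio"        # assumed bucket, matches the flow's variables
DATABASE = "default"       # assumed Glue/Athena database
RAW_TABLE = "raw_fruits"   # assumed staging table referenced by the MERGE query


def ingest(object_key: str) -> None:
    # Read the file that the S3 trigger moved into the archive/ prefix
    df = wr.s3.read_csv(f"s3://{BUCKET}/{object_key}")

    # Append the rows to the Iceberg staging table through Athena
    wr.athena.to_iceberg(
        df=df,
        database=DATABASE,
        table=RAW_TABLE,
        table_location=f"s3://{BUCKET}/{RAW_TABLE}/",
        temp_path=f"s3://{BUCKET}/athena_tmp/",
    )


if __name__ == "__main__":
    # The flow passes the destination key, e.g. archive/inbox/fruits.csv
    ingest(sys.argv[1])

With a script along these lines, the subsequent merge_query task deduplicates
the staged rows into the fruits table, and the optimize task compacts the
Iceberg data files.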