Event-driven data ingestion to an AWS S3 data lake managed by Apache Iceberg, AWS Glue, and Amazon Athena
Source
```yaml
id: ingest-to-datalake-event-driven
namespace: company.team

variables:
  source_prefix: inbox
  destination_prefix: archive
  database: default
  bucket: kestraio

tasks:
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/scripts

      - id: etl
        type: io.kestra.plugin.scripts.python.Commands
        warningOnStdErr: false
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - python etl/aws_iceberg_fruit.py {{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}

  - id: merge_query
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: >
      MERGE INTO fruits f USING raw_fruits r
      ON f.fruit = r.fruit
      WHEN MATCHED
      THEN UPDATE
      SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
      WHEN NOT MATCHED
      THEN INSERT (id, fruit, berry, update_timestamp)
      VALUES(r.id, r.fruit, r.berry, current_timestamp);

  - id: optimize
    type: io.kestra.plugin.aws.athena.Query
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    database: "{{ vars.database }}"
    outputLocation: s3://{{ vars.bucket }}/query_results/
    query: |
      OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

triggers:
  - id: wait_for_new_s3_objects
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: kestraio
    interval: PT1S
    maxKeys: 1
    filter: FILES
    action: MOVE
    prefix: "{{ vars.source_prefix }}"
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
      bucket: "{{ vars.bucket }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
```
About this blueprint
This workflow ingests data into an S3 data lake using a Python script. The script is stored in a public GitHub repository, so you can use this workflow directly as long as you adjust your AWS credentials, S3 bucket name, and Amazon Athena table name. The script takes the S3 object key detected by the S3 event trigger as an input argument and ingests that object into the Iceberg table. The script runs in a Docker container; the image provided here is public, so feel free to use it. Alternatively, you can install the required pip dependencies using the beforeCommands property, as shown in the sketch below.
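
If you prefer not to rely on the prebuilt image, here is a minimal sketch of the same etl task using beforeCommands on top of a plain Python base image. The base image tag and the package list are assumptions for illustration; adjust them to match whatever aws_iceberg_fruit.py actually imports.

```yaml
      # Hypothetical variant of the etl task: install pip dependencies at
      # runtime instead of using the prebuilt ghcr.io/kestra-io/aws image.
      - id: etl
        type: io.kestra.plugin.scripts.python.Commands
        warningOnStdErr: false
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: python:3.11-slim  # assumed base image
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        beforeCommands:
          # assumed dependency list -- match it to the script's actual imports
          - pip install awswrangler
        commands:
          - python etl/aws_iceberg_fruit.py {{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}
```

This keeps the rest of the flow unchanged: only the etl task swaps the prebuilt image for a base image plus an explicit dependency install step.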