Source
```yaml
id: ingest-to-datalake-git
namespace: company.team

variables:
  bucket: kestraio
  prefix: inbox
  database: default

tasks:
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{ vars.prefix }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"

  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects }}"
    then:
      - id: process_new_objects
        type: io.kestra.plugin.core.flow.WorkingDirectory
        tasks:
          - id: clone_repository
            type: io.kestra.plugin.git.Clone
            url: https://github.com/kestra-io/scripts
            branch: main

          - id: ingest_to_datalake
            type: io.kestra.plugin.scripts.python.Commands
            warningOnStdErr: false
            env:
              AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
              AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
              AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
            taskRunner:
              type: io.kestra.plugin.scripts.runner.docker.Docker
            containerImage: ghcr.io/kestra-io/aws:latest
            commands:
              - python etl/aws_iceberg_fruit.py

      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: >
          MERGE INTO fruits f USING raw_fruits r
          ON f.fruit = r.fruit
          WHEN MATCHED
            THEN UPDATE
              SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
          WHEN NOT MATCHED
            THEN INSERT (id, fruit, berry, update_timestamp)
              VALUES (r.id, r.fruit, r.berry, current_timestamp);

      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive

triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    disabled: true
    cron: "@hourly"
```
About this blueprint
Tags: S3, CLI, AWS, Python, Git, Iceberg, If
This scheduled flow checks for new files in a given S3 bucket every hour (the Schedule trigger ships with `disabled: true`, so flip it to `false` to activate the hourly run). If new files are detected, the flow will:
- Download the raw CSV files from S3
- Read those CSV files into a Pandas dataframe, clean the data, and ingest it into the S3 data lake managed with Apache Iceberg and AWS Glue (see the sketch after this list)
- Merge the newly ingested rows into the `fruits` Iceberg table with an Athena `MERGE` query, then compact the table with `OPTIMIZE`
- Move already processed files to the `archive/` folder in the same S3 bucket
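
For reference, here is a minimal sketch of what a script like `etl/aws_iceberg_fruit.py` could look like. It assumes the `ghcr.io/kestra-io/aws` image ships with Pandas and the AWS SDK for pandas (awswrangler); the column names, table names, and cleaning step below are illustrative assumptions, not the exact contents of the script in the kestra-io/scripts repository.

```python
# Illustrative sketch only -- not the exact script from kestra-io/scripts.
# Assumes pandas and awswrangler (AWS SDK for pandas) are installed and that
# AWS credentials are provided via environment variables, as in the flow above.
import awswrangler as wr

BUCKET = "kestraio"    # matches the flow's `bucket` variable
PREFIX = "inbox"       # matches the flow's `prefix` variable
DATABASE = "default"   # Glue database used by the Athena tasks

# Read every CSV file currently sitting under the inbox prefix
df = wr.s3.read_csv(path=f"s3://{BUCKET}/{PREFIX}/")

# Example cleaning step (illustrative): keep only the latest row per fruit
df = df.drop_duplicates(subset=["fruit"], keep="last")

# Write the cleaned data to the raw_fruits Iceberg table registered in AWS Glue;
# the flow's MERGE query then upserts these rows into the fruits table.
wr.athena.to_iceberg(
    df=df,
    database=DATABASE,
    table="raw_fruits",
    table_location=f"s3://{BUCKET}/raw_fruits/",
    temp_path=f"s3://{BUCKET}/athena_tmp/",
)
```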