Source
```yaml
id: ingest-to-datalake-inline-python
namespace: company.team

variables:
  bucket: kestraio
  prefix: inbox
  database: default

tasks:
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{ vars.prefix }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"

  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects }}"
    then:
      - id: ingest_to_datalake
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        script: |
          import awswrangler as wr
          from kestra import Kestra

          # Iceberg table
          BUCKET_NAME = "{{ vars.bucket }}"
          DATABASE = "{{ vars.database }}"
          TABLE = "raw_fruits"

          # Iceberg table's location
          S3_PATH = f"s3://{BUCKET_NAME}/{TABLE}"
          S3_PATH_TMP = f"{S3_PATH}_tmp"

          # File to ingest
          PREFIX = "{{ vars.prefix }}"
          INGEST_S3_KEY_PATH = f"s3://{BUCKET_NAME}/{PREFIX}/"

          df = wr.s3.read_csv(INGEST_S3_KEY_PATH)
          nr_rows = df.id.nunique()
          print(f"Ingesting {nr_rows} rows")
          Kestra.counter("nr_rows", nr_rows, {"table": TABLE})

          df = df[~df["fruit"].isin(["Blueberry", "Banana"])]
          df = df.drop_duplicates(subset=["fruit"], ignore_index=True, keep="first")

          wr.catalog.delete_table_if_exists(database=DATABASE, table=TABLE)

          wr.athena.to_iceberg(
              df=df,
              database=DATABASE,
              table=TABLE,
              table_location=S3_PATH,
              temp_path=S3_PATH_TMP,
              partition_cols=["berry"],
              keep_files=False,
          )
          print(f"New data successfully ingested into {S3_PATH}")

      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          MERGE INTO fruits f USING raw_fruits r
              ON f.fruit = r.fruit
          WHEN MATCHED
              THEN UPDATE
                  SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
          WHEN NOT MATCHED
              THEN INSERT (id, fruit, berry, update_timestamp)
                  VALUES (r.id, r.fruit, r.berry, current_timestamp);

      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive

triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@hourly"
    disabled: true
```
About this blueprint
Tags: S3, CLI, AWS, Python, Iceberg, If
This scheduled flow checks for new files in a given S3 bucket every hour. If new files are detected, the flow will:
- Read the raw CSV files from S3
- Load those CSV files into a pandas dataframe, clean the data, and ingest it into the `raw_fruits` Iceberg table in the S3 data lake managed by AWS Glue (a quick way to verify the result is sketched after this list)
- Merge the staged rows into the `fruits` table with an Athena `MERGE` statement and compact it with `OPTIMIZE`
- Move the already processed files to the `archive/` folder in the same S3 bucket
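
To sanity-check the result outside of Kestra, you can query the two Iceberg tables through Athena with the same awswrangler library the flow uses. The snippet below is a minimal sketch, not part of the blueprint: it assumes awswrangler is installed locally, AWS credentials with Athena and Glue access are available in your environment, and the flow's default variables (database `default`, bucket `kestraio`) are unchanged.

```python
# Verification sketch (not part of the flow): query the Iceberg tables through
# Athena to confirm the hourly ingestion and merge produced the expected data.
# Assumes local AWS credentials and the flow's default database/bucket names.
import awswrangler as wr

DATABASE = "default"                              # matches vars.database
OUTPUT_LOCATION = "s3://kestraio/query_results/"  # matches the flow's outputLocation

# Row counts per table after the flow has run
for table in ("raw_fruits", "fruits"):
    df = wr.athena.read_sql_query(
        f"SELECT COUNT(*) AS nr_rows FROM {table}",
        database=DATABASE,
        s3_output=OUTPUT_LOCATION,
    )
    print(table, int(df["nr_rows"][0]))

# Spot-check the merged table, newest updates first
latest = wr.athena.read_sql_query(
    "SELECT id, fruit, berry, update_timestamp FROM fruits "
    "ORDER BY update_timestamp DESC LIMIT 10",
    database=DATABASE,
    s3_output=OUTPUT_LOCATION,
)
print(latest)
```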