Blueprints

Ingest data to AWS S3 with Python, Apache Iceberg, AWS Glue and Athena

Source

id: ingest-to-datalake-inline-python
namespace: company.team

variables:
  bucket: kestraio
  prefix: inbox
  database: default

tasks:
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{ vars.prefix }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"

  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects }}"
    then:
      - id: ingest_to_datalake
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        script: |
          import awswrangler as wr
          from kestra import Kestra

          # Iceberg table
          BUCKET_NAME = "{{ vars.bucket }}"
          DATABASE = "{{ vars.database }}"
          TABLE = "raw_fruits"

          # Iceberg table's location
          S3_PATH = f"s3://{BUCKET_NAME}/{TABLE}"
          S3_PATH_TMP = f"{S3_PATH}_tmp"

          # Files to ingest
          PREFIX = "{{ vars.prefix }}"
          INGEST_S3_KEY_PATH = f"s3://{BUCKET_NAME}/{PREFIX}/"

          # Read all new CSV files from the inbox prefix
          df = wr.s3.read_csv(INGEST_S3_KEY_PATH)
          nr_rows = df.id.nunique()
          print(f"Ingesting {nr_rows} rows")

          Kestra.counter("nr_rows", nr_rows, {"table": TABLE})

          # Clean the data: drop unwanted fruits and deduplicate
          df = df[~df["fruit"].isin(["Blueberry", "Banana"])]
          df = df.drop_duplicates(subset=["fruit"], ignore_index=True, keep="first")

          # Recreate the raw_fruits Iceberg table with the cleaned data
          wr.catalog.delete_table_if_exists(database=DATABASE, table=TABLE)

          wr.athena.to_iceberg(
              df=df,
              database=DATABASE,
              table=TABLE,
              table_location=S3_PATH,
              temp_path=S3_PATH_TMP,
              partition_cols=["berry"],
              keep_files=False,
          )

          print(f"New data successfully ingested into {S3_PATH}")

      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: >
          MERGE INTO fruits f USING raw_fruits r
              ON f.fruit = r.fruit
              WHEN MATCHED
                  THEN UPDATE
                      SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
              WHEN NOT MATCHED
                  THEN INSERT (id, fruit, berry, update_timestamp)
                        VALUES(r.id, r.fruit, r.berry, current_timestamp);

      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive

triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@hourly"
    disabled: true

About this blueprint

Tags: S3, CLI, AWS, Python, Iceberg, If

This scheduled flow checks for new files in a given S3 bucket every hour. If new files are detected, the flow will:

  • Read the raw CSV files from the inbox prefix of the S3 bucket into a pandas DataFrame
  • Clean and deduplicate the data with pandas, then load it into the raw_fruits Iceberg table in the S3 data lake managed by AWS Glue
  • Merge the new rows into the fruits table and compact its data files with Athena queries
  • Move the processed files to the archive/ folder in the same S3 bucket

The sketches below show how to seed sample input data, bootstrap the target fruits table, and spot-check the result outside of Kestra.
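
To try the flow before enabling the schedule, you can drop a small sample file into the inbox prefix. The Python sketch below is only an illustration: it assumes AWS credentials are configured locally and that the raw files carry the id, fruit and berry columns referenced by the merge query; the file name, the sample values and the boolean type of berry are made-up examples.

import awswrangler as wr
import pandas as pd

# Hypothetical sample data matching the columns used by the flow (id, fruit, berry)
sample = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "fruit": ["Apple", "Strawberry", "Cherry"],
        "berry": [False, True, False],
    }
)

# Upload the file to the bucket and prefix configured in the flow's variables
wr.s3.to_csv(sample, "s3://kestraio/inbox/fruits_sample.csv", index=False)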

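The merge_query task assumes that a target fruits table already exists in the Glue catalog. If it does not, a one-off bootstrap can create it as an Iceberg table. The sketch below is not part of the blueprint: the column names come from the MERGE statement, but the column types and the s3://kestraio/fruits/ location are assumptions to adapt to your data; it runs the DDL through Athena using awswrangler.

import awswrangler as wr

# Column names follow the MERGE statement above; types and location are assumptions
ddl = """
CREATE TABLE fruits (
    id bigint,
    fruit string,
    berry boolean,
    update_timestamp timestamp
)
LOCATION 's3://kestraio/fruits/'
TBLPROPERTIES ('table_type' = 'ICEBERG')
"""

# Execute the DDL through Athena and wait for it to complete
wr.athena.start_query_execution(
    sql=ddl,
    database="default",
    s3_output="s3://kestraio/query_results/",
    wait=True,
)
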
Plugins used in this blueprint: List, If, Script, Docker, Query, AWS CLI, Schedule
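
After a run completes, you can spot-check the merged table straight from Athena. The sketch below assumes the same default database and query_results output location used in the flow.

import awswrangler as wr

# Fetch the most recently updated rows from the merged fruits table
df = wr.athena.read_sql_query(
    "SELECT id, fruit, berry, update_timestamp FROM fruits ORDER BY update_timestamp DESC LIMIT 10",
    database="default",
    s3_output="s3://kestraio/query_results/",
)
print(df)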
