Ingest data to AWS S3 with Git, Python, Apache Iceberg, AWS Glue and Amazon Athena

Source

id: ingest-to-datalake-git
namespace: company.team

variables:
  bucket: kestraio
  prefix: inbox
  database: default

tasks:
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{ vars.prefix }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"

  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects | length > 0 }}"
    then:
      - id: process_new_objects
        type: io.kestra.plugin.core.flow.WorkingDirectory
        tasks:
          - id: clone_repository
            type: io.kestra.plugin.git.Clone
            url: https://github.com/kestra-io/scripts
            branch: main

          - id: ingest_to_datalake
            type: io.kestra.plugin.scripts.python.Commands
            warningOnStdErr: false
            env:
              AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
              AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
              AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
            taskRunner:
              type: io.kestra.plugin.scripts.runner.docker.Docker
            containerImage: ghcr.io/kestra-io/aws:latest
            commands:
              - python etl/aws_iceberg_fruit.py

      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: >
          MERGE INTO fruits f USING raw_fruits r
              ON f.fruit = r.fruit
              WHEN MATCHED
                  THEN UPDATE
                      SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
              WHEN NOT MATCHED
                  THEN INSERT (id, fruit, berry, update_timestamp)
                        VALUES(r.id, r.fruit, r.berry, current_timestamp);

      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;

      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive

triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    disabled: true
    cron: "@hourly"
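
The flow above repeats the same three credential properties on each of its four AWS tasks (list_objects, merge_query, optimize and move_to_archive). As a rough sketch, and assuming a Kestra release that supports the top-level pluginDefaults key (older releases call it taskDefaults), those values could be declared once for every plugin under the io.kestra.plugin.aws package:

```yaml
# Sketch only: a top-level block placed alongside `tasks` and `triggers`.
# Assumes a Kestra version that supports `pluginDefaults`.
pluginDefaults:
  - type: io.kestra.plugin.aws   # package prefix, so it covers the S3, Athena and CLI tasks
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ secret('AWS_DEFAULT_REGION') }}"
```

With such a block in place, the accessKeyId, secretKeyId and region lines could be dropped from the individual AWS tasks. The Python task is not affected, since it receives its credentials through env.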

About this blueprint

Tags: S3, CLI, AWS, Python, Git, Iceberg, If

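This flow is designed to check a given S3 bucket for new files every hour. The hourly_schedule trigger ships with disabled: true, so it must be enabled before the flow starts polling. When new files are found under the configured prefix, the flow will:

  • Download the raw CSV files from S3
  • Read those CSV files into a dataframe, clean the data with Pandas and ingest it into the S3 data lake managed by Iceberg and AWS Glue
  • Merge the rows from raw_fruits into the fruits table with an Athena MERGE query, then compact the table's data files with OPTIMIZE
  • Move the processed files to the archive/ folder in the same S3 bucket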
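
To run these steps against your own bucket on the hourly schedule, a minimal adaptation would touch only the variables and triggers blocks. The bucket and database names below are hypothetical placeholders, and the sketch assumes the secrets referenced by the flow already exist in your Kestra instance:

```yaml
# Sketch only: replaces the `variables` and `triggers` blocks of the flow.
variables:
  bucket: my-datalake-bucket   # hypothetical bucket name
  prefix: inbox                # new files are expected under s3://<bucket>/inbox/
  database: analytics          # hypothetical Glue/Athena database

triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 * * * *"          # equivalent to "@hourly"; `disabled: true` is removed so the schedule fires
```

Note that the table names fruits and raw_fruits are hard-coded in the Athena queries, and the cloned Python script may carry its own bucket settings, so changing the variables alone may not be enough.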
