
Process a file from S3 only if it changed since the last execution

About this blueprint


In the Editor, add the following Python script as a Namespace File named s3_modified.py:

python
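import argparse
from datetime import datetime, timezone
import boto3

def parse_date(date_str):
    # fromisoformat() rejects a trailing 'Z' before Python 3.11, so map it to an explicit UTC offset
    if date_str.endswith('Z'):
        date_str = date_str[:-1] + '+00:00'
    parsed = datetime.fromisoformat(date_str)
    # S3 returns a timezone-aware LastModified. Assumption: a date without an offset is UTC,
    # which avoids a TypeError when comparing naive and aware datetimes
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed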

def check_s3_object_modification(bucket, key, trigger_date):
    s3 = boto3.client("s3")
    # head_object fetches metadata only; the object body is not downloaded
    response = s3.head_object(Bucket=bucket, Key=key)
    last_modified = response["LastModified"]
    comparison_datetime = parse_date(trigger_date)

    if last_modified > comparison_datetime:
        print(f"The file '{key}' was modified at {last_modified} and needs to be reprocessed.")
        # TODO: implement your processing here, or download the file and process it with other Kestra tasks
    else:
        print(f"The file '{key}' is unchanged.")

def main():
    parser = argparse.ArgumentParser(description='Check if an S3 object was modified after a given date.')
    parser.add_argument('bucket_name', help='Name of the S3 bucket')
    parser.add_argument('object_key', help='Key of the S3 object')
    parser.add_argument('comparison_date', help='Date to compare against in ISO format')

    args = parser.parse_args()

    check_s3_object_modification(args.bucket_name, args.object_key, args.comparison_date)

if __name__ == "__main__":
    main()
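
The TODO in the script leaves the processing step open. As a minimal sketch, assuming the object is small enough to read into memory, the modification check and the download could be combined in one function that receives the S3 client as a parameter. The names `process_if_changed` and `handler` are hypothetical and not part of the blueprint; `head_object` and `get_object` are the standard boto3 S3 client calls.

```python
from datetime import datetime
from typing import Callable


def process_if_changed(s3, bucket: str, key: str, since: datetime,
                       handler: Callable[[bytes], None]) -> bool:
    """Run handler on the object's bytes if it was modified after `since`.

    `s3` is any object exposing head_object/get_object like a boto3 S3 client.
    `since` must be a timezone-aware datetime. Returns True if handler ran.
    """
    # Metadata-only request: cheap way to read the modification time
    last_modified = s3.head_object(Bucket=bucket, Key=key)["LastModified"]
    if last_modified <= since:
        return False
    # Assumption: the object fits in memory; stream the body for large files
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    handler(body)
    return True
```

With real credentials this might be called as `process_if_changed(boto3.client("s3"), bucket, key, comparison_datetime, my_handler)`. Passing the client in, rather than creating it inside the function, keeps the logic easy to exercise with a stub during local testing.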

Make sure to add Secrets named AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION for your AWS credentials, and adjust the variables in the flow to point to your S3 bucket and object.

yaml
id: process_s3_file_if_changed
namespace: dev

variables:
  bucket: kestraio
  object: hello.txt

tasks:
  - id: process_file_if_changed
    type: io.kestra.plugin.scripts.python.Commands
    commands:
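      # trigger.previous is the previous scheduled date, so changes made since the last scheduled run are detected
      - python s3_modified.py {{ vars.bucket }} {{ vars.object }} {{ trigger.previous ?? execution.startDate }}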
    docker:
      image: ghcr.io/kestra-io/aws:latest
    namespaceFiles:
      enabled: true
    env:
      AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "*/5 * * * *"
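
The flow passes the comparison date to the script as an ISO 8601 string, while boto3 returns `LastModified` as a timezone-aware `datetime`. The sketch below illustrates that comparison without touching S3. The helper name `to_aware_utc` and the sample timestamps are made up for illustration, and treating a date without an offset as UTC is an assumption rather than something the blueprint specifies.

```python
from datetime import datetime, timezone


def to_aware_utc(date_str: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware datetime."""
    # Python versions before 3.11 reject a trailing 'Z' in fromisoformat()
    if date_str.endswith("Z"):
        date_str = date_str[:-1] + "+00:00"
    parsed = datetime.fromisoformat(date_str)
    # Assumption: a string without an offset is interpreted as UTC
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


# Made-up value standing in for response["LastModified"]
last_modified = datetime(2024, 5, 1, 10, 3, tzinfo=timezone.utc)

print(last_modified > to_aware_utc("2024-05-01T10:00:00Z"))       # True
print(last_modified > to_aware_utc("2024-05-01T12:05:00+02:00"))  # False
print(last_modified > to_aware_utc("2024-05-01T10:00:00"))        # True
```

The second comparison is False because 12:05 at +02:00 is 10:05 UTC, two minutes after the sample modification time. Offsets are normalized before comparison, so only the instant matters, not how it is written.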
