Process a file from S3 only if it changed since the last execution

Source

yaml
id: process-s3-file-if-changed
namespace: company.team

variables:
  bucket: kestraio
  object: hello.txt

tasks:
  - id: process_file_if_changed
    type: io.kestra.plugin.scripts.python.Commands
    commands:
      - python s3_modified.py {{ vars.bucket }} {{ vars.object }} {{
        trigger.date ?? execution.startDate }}
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/aws:latest
    namespaceFiles:
      enabled: true
    env:
      AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "*/5 * * * *"

About this blueprint

S3 Trigger Python

Add the following Python script named s3_modified.py in the Editor:

python
import argparse
from datetime import datetime, timezone

import boto3


def parse_date(date_str):
    """Parse an ISO 8601 date string into a timezone-aware datetime."""
    # datetime.fromisoformat() rejects a trailing 'Z' before Python 3.11
    if date_str.endswith('Z'):
        date_str = date_str[:-1] + '+00:00'
    parsed = datetime.fromisoformat(date_str)
    # Assumption: a date without an offset is UTC. S3's LastModified is
    # timezone-aware, and comparing it with a naive datetime raises TypeError.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def check_s3_object_modification(bucket, object_key, trigger_date):
    s3 = boto3.client("s3")
    # head_object fetches the object's metadata without downloading its content
    response = s3.head_object(Bucket=bucket, Key=object_key)
    last_modified = response["LastModified"]
    comparison_datetime = parse_date(trigger_date)

    if last_modified > comparison_datetime:
        print(f"The file '{object_key}' was modified at {last_modified} and needs to be reprocessed.")
        # TODO implement your processing here, alternatively download the file and process it using other kestra tasks
    else:
        print(f"The file '{object_key}' is unchanged.")


def main():
    parser = argparse.ArgumentParser(description='Check if an S3 object was modified after a given date.')
    parser.add_argument('bucket_name', help='Name of the S3 bucket')
    parser.add_argument('object_key', help='Key of the S3 object')
    parser.add_argument('comparison_date', help='Date to compare against in ISO format')

    args = parser.parse_args()

    check_s3_object_modification(args.bucket_name, args.object_key, args.comparison_date)


if __name__ == "__main__":
    main()

Make sure to add Secrets for your AWS credentials and adjust the variables to point to your S3 bucket and object.
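
The date comparison in the script can be tried locally without AWS access. The following is a minimal sketch, not part of the blueprint: the helper names to_aware_utc and is_modified_since are hypothetical, the last_modified value stands in for the timezone-aware datetime that boto3 returns in response["LastModified"], and treating a date without an offset as UTC is an assumption.

```python
from datetime import datetime, timezone


def to_aware_utc(date_str):
    """Parse an ISO 8601 string; values without an offset are assumed to be UTC."""
    if date_str.endswith("Z"):
        date_str = date_str[:-1] + "+00:00"
    parsed = datetime.fromisoformat(date_str)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def is_modified_since(last_modified, comparison_date):
    """Return True if last_modified is strictly later than comparison_date."""
    return last_modified > to_aware_utc(comparison_date)


# Stand-in for response["LastModified"] (hypothetical value)
last_modified = datetime(2024, 5, 1, 10, 3, tzinfo=timezone.utc)

print(is_modified_since(last_modified, "2024-05-01T10:00:00Z"))       # True
print(is_modified_since(last_modified, "2024-05-01T12:05:00+02:00"))  # False
print(is_modified_since(last_modified, "2024-05-01T10:03:00"))        # False
```

The comparison is strict, so an object whose modification time equals the comparison date is reported as unchanged.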

