Blueprints

Run multiple Python scripts in parallel on AWS ECS Fargate with AWS Batch

About this blueprint

AWS CLI Docker Git Outputs Python

This flow will clone a Git repository that defines Terraform resources to run script tasks on AWS ECS Fargate including the AWS Batch compute environment, job queue, and ECS task roles.

The only prerequisites are AWS credentials and an S3 Bucket in the same region in which you want to run AWS Batch jobs.

We assume that you have a default VPC in the region you are deploying to. If you do not have a default VPC, you can create it using the following command:

aws ec2 create-default-vpc --region us-east-1 # replace with your chosen AWS region

Once the flow completes, you can download the Terraform output to see the ARNs of the AWS Batch compute environment, job queue, and ECS task roles. You can store these as pluginDefaults in your namespace to run all Python scripts on AWS ECS Fargate by default.

yaml
id: aws_batch_terraform_git
namespace: company.team

inputs:
  - id: bucket
    type: STRING
    defaults: kestra-us

  - id: region
    type: STRING
    defaults: us-east-1

tasks:
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: git
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/terraform-deployments
        branch: main

      - id: tf
        type: io.kestra.plugin.terraform.cli.TerraformCLI
        inputFiles:
          backend.tf: |
            terraform {
              backend "s3" {
                region = "{{ inputs.region }}"
                bucket = "{{ inputs.bucket }}"
                key    = "terraform.tfstate"
              }
            }
        commands:
          - mv aws-batch/* .
          - terraform init
          - terraform apply -auto-approve
          - terraform output > output.txt
        env:
          TF_VAR_region: "{{ inputs.region }}"
          TF_VAR_bucket: "{{ inputs.bucket }}"
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ inputs.region }}"
        outputFiles:
          - "*.txt"

      - id: parse_tf_output
        type: io.kestra.plugin.scripts.python.Script
        containerImage: ghcr.io/kestra-io/kestrapy:latest
        inputFiles:
          terraform.txt: "{{ outputs.tf.outputFiles['output.txt'] }}"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        script: |
          from kestra import Kestra

          outputs = {}
          with open("terraform.txt", "r") as file:
              for line in file:
                  key, value = line.strip().split(" = ")
                  outputs[key] = value.strip('"')

          Kestra.outputs(outputs)

  - id: parallel_ecs_fargate_tasks
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: run_command
        type: io.kestra.plugin.scripts.python.Commands
        containerImage: ghcr.io/kestra-io/kestrapy:latest
        taskRunner:
          type: io.kestra.plugin.ee.aws.runner.Batch
          computeEnvironmentArn: "{{ outputs.parse_tf_output.vars.batch_compute_environment_arn }}"
          jobQueueArn: "{{ outputs.parse_tf_output.vars.batch_job_queue_arn }}"
          executionRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_execution_role_arn }}"
          taskRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_role_arn }}"
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "{{ inputs.region }}"
          bucket: "{{ inputs.bucket }}"
        commands:
          - pip show kestra

      - id: run_python_script
        type: io.kestra.plugin.scripts.python.Script
        containerImage: ghcr.io/kestra-io/pydata:latest
        taskRunner:
          type: io.kestra.plugin.ee.aws.runner.Batch
          computeEnvironmentArn: "{{ outputs.parse_tf_output.vars.batch_compute_environment_arn }}"
          jobQueueArn: "{{ outputs.parse_tf_output.vars.batch_job_queue_arn }}"
          executionRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_execution_role_arn }}"
          taskRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_role_arn }}"
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "{{ inputs.region }}"
          bucket: "{{ inputs.bucket }}"
        script: |
          import platform
          import socket
          import sys

          print("Hello from AWS Batch and kestra!")

          def print_environment_info():
              print(f"Host's network name: {platform.node()}")
              print(f"Python version: {platform.python_version()}")
              print(f"Platform information (instance type): {platform.platform()}")
              print(f"OS/Arch: {sys.platform}/{platform.machine()}")

              try:
                  hostname = socket.gethostname()
                  ip_address = socket.gethostbyname(hostname)
                  print(f"Host IP Address: {ip_address}")
              except socket.error as e:
                  print("Unable to obtain IP address.")


          print_environment_info()

Working Directory

Clone

Terraform CLI

Script

Docker

Parallel

Commands

Batch

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra