About this blueprint
AWS Python Task Runner
This flow will execute a Python script on AWS ECS Fargate. This requires you to setup AWS Batch, S3 Bucket in the same region that you want to run AWS Batch Jobs and AWS credentials.
In order to support inputFiles
, namespaceFiles
, and outputFiles
, the AWS Batch task runner currently relies on multi-containers ECS jobs and creates three containers for each job:
- A before-container that uploads input files to S3.
- The main container that fetches input files into the
{{ workingDir }}
directory and runs the task. - An after-container that fetches output files using outputFiles to make them available from the Kestra UI for download and preview.
Since we don't know the working directory of the container in advance, we always need to explicitly define the working directory and output directory when using the AWS Batch runner, e.g. use cat {{ workingDir }}/myFile.txt
rather than cat myFile.txt
.
yaml
id: aws_batch_runner
namespace: company.team
variables:
compute_environment_arn: arn:aws:batch:us-east-1:123456789:compute-environment/kestra
job_queue_arn: arn:aws:batch:us-east-1:123456789:job-queue/kestra
execution_role_arn: arn:aws:iam::123456789:role/ecsTaskExecutionRole
task_role_arn: arn:aws:iam::123456789:role/ecsTaskRole
tasks:
- id: send_data
type: io.kestra.plugin.scripts.python.Script
containerImage: ghcr.io/kestra-io/pydata:latest
taskRunner:
type: io.kestra.plugin.ee.aws.runner.Batch
region: us-east-1
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
computeEnvironmentArn: "{{ vars.compute_environment_arn }}"
jobQueueArn: "{{ vars.job_queue_arn}}"
executionRoleArn: "{{ vars.execution_role_arn }}"
taskRoleArn: "{{ vars.task_role_arn }}"
bucket: kestra-us
script: |
import platform
import socket
import sys
print("Hello from AWS Batch and kestra!")
def print_environment_info():
print(f"Host's network name: {platform.node()}")
print(f"Python version: {platform.python_version()}")
print(f"Platform information (instance type): {platform.platform()}")
print(f"OS/Arch: {sys.platform}/{platform.machine()}")
try:
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"Host IP Address: {ip_address}")
except socket.error as e:
print("Unable to obtain IP address.")
if __name__ == '__main__':
print_environment_info()