Task Runners
Available on: >= 0.16.0
What are script tasks?
Script tasks are built on top of the Script plugin. They are used to run commands or execute a script. Several subplugins, such as the Shell and Python tasks, depend on the Script plugin.
Anytime you see a task that can execute a script or a series of commands, it's a script task.
Task runners before Kestra 0.16.0
Until Kestra 0.15.x, you could configure script tasks to run in local processes or in Docker containers by using the `runner` property.
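For reference, here is a minimal sketch of that legacy syntax, assuming a Shell task (the flow and task ids are illustrative):

```yaml
id: legacy_runner
namespace: myteam

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS # ENUM-type property; the other supported value is DOCKER
    commands:
      - echo "Hello World!"
```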
Task runners in Kestra 0.16.0 and beyond
Kestra 0.16.0 introduces a new `taskRunner` property that provides more flexibility than the existing `runner` property and allows you to run your code in different remote environments, including Kubernetes, AWS Batch, Azure Batch, Google Batch, and more. Since each `taskRunner` type is a plugin, you can create your own runner plugins (e.g. for Google Cloud Run) and, if you'd like, contribute them to Kestra.
We envision task runners as a pluggable system allowing you to run any code anywhere without having to worry about the underlying infrastructure.
The motivation behind the `taskRunner` property
The main limitation of the `runner` property is that it's a single ENUM-type property. This means that all runner-specific properties (such as the amount of CPU and memory a container can use) would need to be included directly within the `plugin-script` library. As you can imagine, this makes it difficult to manage and contribute new runners in the future:
- Which properties are generic script-task properties and which are runner-specific?
- Which properties belong to which runners?
- Why do I need to have Azure and Google SDK dependencies baked into the script plugin when I only use AWS?
Including everything in the `plugin-script` library would create a strong coupling between the script tasks and the runners, and we want to avoid that. Therefore, we've introduced the `taskRunner` property, which allows you to specify the runner type and its properties in a flexible way.
Keep in mind that task runners are currently in Beta.
The benefits of the `taskRunner` property
Centralized configuration management
Task runners make it easy to centrally govern your configuration. For example, since the logic for deploying a container to AWS Batch is included in the AWS plugin rather than in the `plugin-script` library, you can leverage `taskDefaults` on a namespace level to centrally manage your AWS credentials:
taskDefaults:
  - type: io.kestra.plugin.aws
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "us-east-1"
This configuration will apply to all tasks, triggers, and runners from the `io.kestra.plugin.aws` plugin, including the `io.kestra.plugin.aws.runner.AwsBatchTaskRunner`.
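As a minimal sketch of what this enables (the task below is hypothetical, and the ARN and bucket names are placeholders), a flow in that namespace can then use the AWS Batch runner without repeating any credentials or region:

```yaml
tasks:
  - id: hello
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: ubuntu
    taskRunner:
      type: io.kestra.plugin.aws.runner.AwsBatchTaskRunner
      # accessKeyId, secretKeyId, and region are inherited from the taskDefaults above
      computeEnvironmentArn: "arn:aws:batch:us-east-1:123456789012:compute-environment/myComputeEnvironment" # placeholder
      s3Bucket: my-bucket # placeholder
    commands:
      - echo "Hello from AWS Batch!"
```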
Documentation and autocompletion
Each task runner is a plugin with its own icon, documentation, and schema to validate its properties. The built-in code editor provides autocompletion and syntax validation for all runner properties, and when you click on the runner's name in the editor, you can see its documentation on the right side of the screen.
Easier to build new custom runners
You can create a custom runner plugin for your specific environment, build it as a JAR file, and add that file to the `plugins` directory. Once you restart Kestra, your custom runner plugin will be available for use (without the need to rebuild the Script plugin).
If you'd like, you can contribute your custom runner plugin to the Kestra community, and we'll be happy to include it on the plugins page.
Moving from development to production is a breeze
Many Kestra users develop their scripts locally in Docker containers and then run the same code in a production environment as Kubernetes pods. Thanks to the `taskRunner` property, setting this up is a breeze. Below is an example showing how you can combine `taskDefaults` with the `taskRunner` property to use Docker in the development environment (a dedicated instance/tenant/namespace) and Kubernetes pods in production, all without changing anything in the flow or script code.
Development environment:
taskDefaults:
  - type: io.kestra.plugin.scripts
    values:
      taskRunner:
        type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
        pullPolicy: IF_NOT_PRESENT # in dev, only pull the image when needed
        cpu:
          cpus: 1
        memory:
          memory: 512Mi
Production environment:
taskDefaults:
  - type: io.kestra.plugin.scripts
    values:
      taskRunner:
        type: io.kestra.plugin.kubernetes.runner.KubernetesTaskRunner
        namespace: default
        pullPolicy: ALWAYS # Always pull the latest image in production
        config:
          username: docker-desktop
          masterUrl: https://docker-for-desktop:6443
          caCert: xxx
          clientCert: xxx
          clientKey: xxx
        resources:
          request: # The resources the container is guaranteed to get
            cpu: "500m" # Request 1/2 of a CPU (500 milliCPU)
            memory: "256Mi" # Request 256 MB of memory
Note how the `containerImage` property is not included in the `taskRunner` configuration but is a generic property available to any script task. This makes the configuration more flexible, as the image usually changes more often than the standard runner configuration. For instance, the dbt plugin may need a different image than the generic Python plugin, but the runner configuration can stay the same.
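To illustrate, here is a minimal sketch of a flow (the flow id, task id, and image are illustrative) that only sets the `containerImage` property, while the `taskRunner` itself comes from the environment-specific `taskDefaults` shown above:

```yaml
id: dev_to_prod
namespace: myteam

tasks:
  - id: python
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/pydata:latest # only the image is defined per task
    commands:
      - python --version
```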
The task runner types
Kestra Core includes the `ProcessTaskRunner` by default. Additionally, the `DockerTaskRunner` is included in the `plugin-script` library, as Kestra runs all script tasks in Docker containers by default.
Other runners are provided by other plugins, such as the AWS, Azure, GCP, and Kubernetes plugins. Each `taskRunner` is identified by its `type`.
Let's dive into each runner type and see how they work.
ProcessTaskRunner
Here is an example of a Shell script configured with the `ProcessTaskRunner`, which runs a Shell command as a child process on the Kestra host:
id: process_script_runner
namespace: myteam

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.core.models.script.types.ProcessTaskRunner
    commands:
      - echo "Hello World!"
The `ProcessTaskRunner` doesn't have any additional configuration beyond the `type` property.
DockerTaskRunner
Here is the same example using the `DockerTaskRunner`, executing the commands in a Docker container:
id: docker_script_runner
namespace: myteam

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: centos
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
      cpu:
        cpus: 1
    commands:
      - echo "Hello World!"
Once you specify the `taskRunner` type, you get autocompletion and validation for the runner-specific properties. In the example above, the task allocates 1 CPU to the container.
The only property required by the runner is the `containerImage` property, which needs to be set on the script task. The image can be from a public or private registry. Additionally, using the `DockerTaskRunner`, you can configure memory allocation, volumes, environment variables, and more. Check the runner's documentation in the built-in Editor for more details.
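For example, here is a sketch extending the task above with a memory limit and a pull policy; the values are illustrative, and the runner's documentation lists all available properties:

```yaml
id: docker_script_runner
namespace: myteam

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: centos
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
      pullPolicy: IF_NOT_PRESENT # only pull the image when it's not already present
      cpu:
        cpus: 1
      memory:
        memory: 512Mi # limit the container to 512 MB of memory
    commands:
      - echo "Hello World!"
```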
The `DockerTaskRunner` executes the script task as a container in a Docker-compatible engine. This means that you can also use it to run scripts within a Kubernetes cluster with Docker-in-Docker (dind) or in a local Docker engine.
Also, keep in mind that if a Kestra Worker executing the script is terminated (e.g. due to a crash), the container will still run until completion. When running Kestra itself in a container with Docker-in-Docker (dind), both the Worker and the container will be restarted.
KubernetesTaskRunner
This runner executes the tasks as Kubernetes pods.
- If your script task has `inputFiles` or `namespaceFiles` configured, an init container will be added to upload files into the main container.
- If your script task has `outputFiles` configured, a sidecar container will be added to download files from the main container.
All containers will use an in-memory `emptyDir` volume for file exchange.
If a task is resubmitted (e.g. due to a retry or a Worker crash), the new Worker will reattach to the already running (or an already finished) pod instead of starting a new one.
Here is a simple example of Shell commands executed in a Kubernetes pod:
id: kubernetes_script_runner
namespace: myteam

description: |
  To get the kubeconfig file, run: `kubectl config view --minify --flatten`.
  Then, copy the `server`, `caCert`, `clientKey`, `clientCert`, `user`, and `namespace` values to the configuration below.

  Here is a mapping of the kubeconfig file to the KubernetesTaskRunner configuration:
  - client-key-data: clientKey
  - client-certificate-data: clientCert
  - certificate-authority-data: caCert
  - masterUrl: server
  - username: user

inputs:
  - id: file
    type: FILE

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    inputFiles:
      data.txt: "{{ inputs.file }}"
    outputFiles:
      - out.txt
    containerImage: centos
    taskRunner:
      type: io.kestra.plugin.kubernetes.runner.KubernetesTaskRunner
    commands:
      - cp data.txt out.txt
And here is an example of a Python script executed in a Kubernetes pod:
id: kubernetes_script_runner
namespace: dev

tasks:
  - id: send_data
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.kubernetes.runner.KubernetesTaskRunner
      namespace: default
      pullPolicy: Always
      config:
        username: docker-desktop
        masterUrl: https://docker-for-desktop:6443
        caCert: xxx
        clientCert: xxx
        clientKey: xxx
      resources:
        request: # The resources the container is guaranteed to get
          cpu: "500m" # Request 1/2 of a CPU (500 milliCPU)
          memory: "128Mi" # Request 128 MB of memory
    outputFiles:
      - "*.json"
    script: |
      import platform
      import socket
      import sys
      import json
      from kestra import Kestra

      print("Hello from a Kubernetes runner!")

      host = platform.node()
      py_version = platform.python_version()
      platform_info = platform.platform()  # stored under a new name to avoid shadowing the platform module
      os_arch = f"{sys.platform}/{platform.machine()}"


      def print_environment_info():
          print(f"Host's network name: {host}")
          print(f"Python version: {py_version}")
          print(f"Platform info: {platform_info}")
          print(f"OS/Arch: {os_arch}")

          env_info = {
              "host": host,
              "platform": platform_info,
              "os_arch": os_arch,
              "python_version": py_version,
          }
          Kestra.outputs(env_info)

          filename = "environment_info.json"
          with open(filename, "w") as json_file:
              json.dump(env_info, json_file, indent=4)


      print_environment_info()
For a full list of properties available in the `KubernetesTaskRunner`, check the Kubernetes plugin documentation or explore the same in the built-in Code Editor in the Kestra UI.
AwsBatchTaskRunner (ECS Fargate & EC2)
This runner will deploy the container for the task to a specified AWS Batch compute environment.
To launch the task on AWS Batch, there are three main concepts you need to be aware of:
- Compute environment — mandatory, not created by the task. There are two types of compute environments you can use:
  - ECS Fargate or ECS EC2 (already supported)
  - EKS (not supported yet; for now, use the `KubernetesTaskRunner` if you want to run your tasks on AWS EKS)
- Job Queue — will be created by the task if not specified. Creating a queue takes some time, so be aware that this adds some latency to the script's runtime.
- Job — created by the task runner; holds information about which image, commands, and resources to run on. If you are familiar with AWS ECS, it's the task definition for the containerized application.
To get started quickly, follow this Terraform setup that will provision all resources you need to run containers on ECS Fargate using AWS Batch.
How does it work
In order to support `inputFiles`, `namespaceFiles`, and `outputFiles`, the `AwsBatchTaskRunner` currently relies on multi-container ECS jobs and creates three containers for each job:
- A before-container that uploads input files to S3.
- The main container that fetches input files into the `{{ workingDir }}` directory and runs the task.
- An after-container that fetches output files redirected to the `{{ outputDir }}` to make them available from the Kestra UI for download and preview.
Since we don't know the working directory of the container in advance, we always need to explicitly define the working directory and output directory when using the AWS Batch runner, e.g. use `cat {{workingDir}}/myFile.txt` rather than `cat myFile.txt`.
A full flow example
id: aws_batch_runner
namespace: dev

tasks:
  - id: scrape_environment_info
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.aws.runner.AwsBatchTaskRunner
      region: eu-central-1
      accessKeyId: "{{ secrets.awsAccessKey }}"
      secretKeyId: "{{ secrets.awsSecretKey }}"
      computeEnvironmentArn: "arn:aws:batch:eu-central-1:707969873520:compute-environment/kestraFargateEnvironment"
      jobQueueArn: "arn:aws:batch:eu-central-1:707969873520:job-queue/kestraJobQueue"
      executionRoleArn: "arn:aws:iam::707969873520:role/kestraEcsTaskExecutionRole"
      taskRoleArn: "arn:aws:iam::707969873520:role/ecsTaskRole"
      s3Bucket: kestra-product-de
    commands:
      - python {{workingDir}}/main.py
    namespaceFiles:
      enabled: true
    outputFiles:
      - "environment_info.json"
    inputFiles:
      main.py: |
        import platform
        import socket
        import sys
        import json
        from kestra import Kestra

        print("Hello from AWS Batch and kestra!")


        def print_environment_info():
            print(f"Host's network name: {platform.node()}")
            print(f"Python version: {platform.python_version()}")
            print(f"Platform information (instance type): {platform.platform()}")
            print(f"OS/Arch: {sys.platform}/{platform.machine()}")

            env_info = {
                "host": platform.node(),
                "platform": platform.platform(),
                "OS": sys.platform,
                "python_version": platform.python_version(),
            }
            Kestra.outputs(env_info)

            filename = '{{workingDir}}/environment_info.json'
            with open(filename, 'w') as json_file:
                json.dump(env_info, json_file, indent=4)


        print_environment_info()
GcpBatchTaskRunner
This runner will deploy the container for the task to a specified Google Cloud Batch compute environment.
To launch the task on Google Cloud Batch, there are three main concepts you need to be aware of:
- Machine Type — mandatory, the compute machine type on which the batch job will be deployed. If no `reservation` is specified, a new compute instance will be created for each batch, which can take some time.
- Reservation — can be used to set up a reservation upfront and avoid the time needed to create a new compute instance for each batch.
- Network Interfaces — a batch job needs a network interface; it will use the `default` interface if not specified otherwise.
How does it work
In order to support `inputFiles`, `namespaceFiles`, and `outputFiles`, the `GcpBatchTaskRunner` mounts a volume from a Cloud Storage bucket, uploads input files to the bucket before launching the container, and downloads output files from the bucket after the container finishes.
Since we don't know the working directory of the container in advance, we always need to explicitly define the working directory and output directory when using the GCP Batch runner, e.g. use `cat {{workingDir}}/myFile.txt` rather than `cat myFile.txt`.
A full flow example
id: gcp_batch_runner
namespace: dev

tasks:
  - id: scrape_environment_info
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.gcp.runner.GcpBatchTaskRunner
      projectId: "{{ secrets.projectId }}"
      region: europe-west9
    commands:
      - python {{workingDir}}/main.py
    namespaceFiles:
      enabled: true
    outputFiles:
      - "environment_info.json"
    inputFiles:
      main.py: |
        import platform
        import socket
        import sys
        import json
        from kestra import Kestra

        print("Hello from GCP Batch and kestra!")


        def print_environment_info():
            print(f"Host's network name: {platform.node()}")
            print(f"Python version: {platform.python_version()}")
            print(f"Platform information (instance type): {platform.platform()}")
            print(f"OS/Arch: {sys.platform}/{platform.machine()}")

            env_info = {
                "host": platform.node(),
                "platform": platform.platform(),
                "OS": sys.platform,
                "python_version": platform.python_version(),
            }
            Kestra.outputs(env_info)

            filename = '{{workingDir}}/environment_info.json'
            with open(filename, 'w') as json_file:
                json.dump(env_info, json_file, indent=4)


        print_environment_info()
AzureBatchTaskRunner
This runner will deploy the container for the task to a specified Azure Batch pool.
To launch the task on Azure Batch, there are only two main concepts you need to be aware of:
- Pool — mandatory, not created by the task. This is a pool composed of nodes where your task can run.
- Job — created by the task runner; holds information about which image, commands, and resources to run on.
How does it work
In order to support `inputFiles`, `namespaceFiles`, and `outputFiles`, the `AzureBatchTaskRunner` currently relies on resource files and output files, which transit through Azure Blob Storage.
As with the `ProcessTaskRunner` and `DockerTaskRunner`, you can directly reference input, namespace, and output files since you're in the same working directory, e.g. `cat myFile.txt`.
A full flow example
id: azure_batch_runner
namespace: dev

tasks:
  - id: scrape_environment_info
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.azure.runner.AzureBatchTaskRunner
      account: "{{secrets.account}}"
      accessKey: "{{secrets.accessKey}}"
      endpoint: "{{secrets.endpoint}}"
      poolId: "{{vars.poolId}}"
      blobStorage:
        containerName: "{{vars.containerName}}"
        connectionString: "{{secrets.connectionString}}"
    commands:
      - python main.py
    namespaceFiles:
      enabled: true
    outputFiles:
      - "environment_info.json"
    inputFiles:
      main.py: |
        import platform
        import socket
        import sys
        import json
        from kestra import Kestra

        print("Hello from Azure Batch and kestra!")


        def print_environment_info():
            print(f"Host's network name: {platform.node()}")
            print(f"Python version: {platform.python_version()}")
            print(f"Platform information (instance type): {platform.platform()}")
            print(f"OS/Arch: {sys.platform}/{platform.machine()}")

            env_info = {
                "host": platform.node(),
                "platform": platform.platform(),
                "OS": sys.platform,
                "python_version": platform.python_version(),
            }
            Kestra.outputs(env_info)

            filename = 'environment_info.json'
            with open(filename, 'w') as json_file:
                json.dump(env_info, json_file, indent=4)


        print_environment_info()
Next steps
The `taskRunner` property is backward compatible with the PROCESS and DOCKER `runner` property (except for the `containerImage` property, which replaces the nested `docker.image` property). Flows using the `runner` property will continue working as expected. For better flexibility and future-proofing your flows, consider giving the `taskRunner` property a try.
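For instance, here is a sketch of what that migration could look like for a simple Shell task (the task below is illustrative, showing the legacy syntax followed by its `taskRunner` equivalent):

```yaml
# Before (Kestra <= 0.15.x): the legacy runner property with a nested docker.image
- id: shell
  type: io.kestra.plugin.scripts.shell.Commands
  runner: DOCKER
  docker:
    image: centos
  commands:
    - echo "Hello World!"

# After (Kestra >= 0.16.0): the taskRunner property plus the generic containerImage property
- id: shell
  type: io.kestra.plugin.scripts.shell.Commands
  containerImage: centos
  taskRunner:
    type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
  commands:
    - echo "Hello World!"
```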