Python, R, Node.js and Shell Scripts
The previous sections in the developer guide covered Flows and Tasks. This section covers script tasks.
Script tasks allow you to orchestrate custom business logic written in Python, R, Node.js, Powershell and Shell scripts. By default, all these tasks run in individual Docker containers. You can optionally run those scripts in a local process or run them on a remote worker group.
The Commands and Script tasks
For each of these languages, Kestra provides two types of tasks: inline Script tasks, and Commands tasks that allow executing a list of commands with a script interpreter.
- The Script task is useful for scripts that can be written inline in a YAML flow configuration. It is best suited for simple automation tasks, e.g. running a single Python function that fetches data from an API. It can also be used to run a single script based on the output of a previous task, using data retrieved from the internal storage.
- The Commands tasks can be combined with the io.kestra.core.tasks.flows.WorkingDirectory and io.kestra.plugin.git.Clone tasks to run custom scripts from Git in Docker containers without the overhead of packaging every script into a separate container image. This is possible thanks to the combination of a base image and custom beforeCommands. A minimal sketch of both task types follows below.
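For illustration, here is a minimal, hypothetical sketch of both task types side by side (the flow id, task ids, and inline commands are arbitrary): the first task runs an inline Python script, while the second runs a one-liner through the Python interpreter via a Commands task:
id: scriptVsCommands
namespace: dev
tasks:
  - id: inlineScript
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Hello from a Script task!")
  - id: interpreterCommand
    type: io.kestra.plugin.scripts.python.Commands
    commands:
      - python -c "print('Hello from a Commands task!')"
Both tasks use the default Docker runner and the default python image listed in the table below.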
The following table gives an overview of all script tasks and their configuration.
| Language | Default image | beforeCommands example | Script example | Commands example |
|---|---|---|---|---|
| Python | python | pip install requests kestra | print("Hello World!") | python hello.py |
| R | r-base | Rscript -e "install.packages('dplyr')" | print("Hello World!") | Rscript hello.R |
| Node.js | node | npm install json2csv | console.log('Hello World!'); | node hello.js |
| Shell | ubuntu | apt-get install curl | echo "Hello World!" | ./hello.bash |
| Powershell | powershell | Install-Module -Name ImportExcel | Write-Output "Hello World!" | .\hello.ps1 |
Full class names:
- io.kestra.plugin.scripts.python.Commands
- io.kestra.plugin.scripts.python.Script
- io.kestra.plugin.scripts.r.Commands
- io.kestra.plugin.scripts.r.Script
- io.kestra.plugin.scripts.node.Commands
- io.kestra.plugin.scripts.node.Script
- io.kestra.plugin.scripts.shell.Commands
- io.kestra.plugin.scripts.shell.Script
- io.kestra.plugin.scripts.powershell.Commands
- io.kestra.plugin.scripts.powershell.Script
Each of those languages has a dedicated tag in the Blueprints catalog. Check the available blueprints to get started with those tasks. The sections below explain the recommended usage patterns and common examples.
Bind-mount a local script: the simplest pattern for local development with Docker
The easiest way to run a script stored locally is to use a Docker volume.
First, make sure that your Kestra configuration in the Docker Compose file allows volume mounting. Here is how you can configure it:
kestra:
image: kestra/kestra:latest-full
pull_policy: always
entrypoint: /bin/bash
user: "root"
env_file:
- .env
command:
- -c
- /app/kestra server standalone --worker-thread=128
volumes:
- kestra-data:/app/storage
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/kestra-wd:/tmp/kestra-wd:rw
environment:
KESTRA_CONFIGURATION: |
datasources:
postgres:
url: jdbc:postgresql://postgres:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
kestra:
server:
basic-auth:
enabled: false
username: admin
password: kestra
repository:
type: postgres
storage:
type: local
local:
base-path: "/app/storage"
queue:
type: postgres
tasks:
tmp-dir:
path: /tmp/kestra-wd/tmp
scripts:
docker:
volume-enabled: true # 👈 this is the relevant setting
With that setting, you can point the script task to any script on your local file system:
id: pythonVolume
namespace: dev
tasks:
- id: anyPythonScript
type: io.kestra.plugin.scripts.python.Commands
runner: DOCKER
docker:
image: ghcr.io/kestra-io/pydata:latest
volumes:
- /Users/anna/gh/KESTRA_REPOS/scripts:/app
commands:
- python /app/etl/parametrized.py
This flow points the Python task, running in a Docker container, at the parametrized.py script from the mounted volume.
Recommended usage pattern: Scripts from Git in Docker
Below is a simple example of the recommended usage pattern:
- Start with a io.kestra.core.tasks.flows.WorkingDirectory task. It allows running multiple scripts in the same working directory, which helps to output results and pass data between tasks when needed.
- Use io.kestra.plugin.git.Clone as the first child task of the io.kestra.core.tasks.flows.WorkingDirectory task to ensure that your repository is cloned into an empty directory. You can provide a URL and authentication credentials for any Git-based system, including GitHub, GitLab, BitBucket, CodeCommit, and more.
- Use one of the Commands tasks (e.g. python.Commands) to specify which scripts or modules to run, while explicitly specifying the path within the Git repository. For instance, the command python scripts/etl_script.py runs the Python script etl_script.py located in the scripts directory of the examples repository on the main branch.
  - This repository is public, so the repository's url and a branch name are all that's required.
  - If this repository were private, we would also have to specify kestra-io as the username and add a Personal Access Token as the password property.
- Optionally, specify your base Docker image using the image argument of the docker property. If you don't specify the image property, the default image is used, as shown in the table above. The referenced container image should include commonly used dependencies, for example requests, pandas, sqlalchemy, scikit-learn, etc. While you can have a dedicated image for each script, Kestra supports a much simpler Python dependency management pattern: a base image with shared dependencies, plus any custom dependencies needed for a particular script installed at runtime using the beforeCommands property.
- The beforeCommands property is a list of arbitrary commands that can optionally be executed within the container before starting the script commands. If your cloned Git repository includes a requirements.txt file, you can use pip install -r requirements.txt to install the required pip packages.
  - The additional warningOnStdErr boolean flag can be set to false to avoid setting the task to a WARNING state when the installation process emits warnings to standard error. By default, warnings emitted during e.g. pip package installation will set the task to a WARNING state to give you more transparency about the entire process.
  - Redirecting the installation output to /dev/null, as in the command pip install faker > /dev/null, ensures that the (usually verbose) output emitted during installation is not captured within your task logs.
- The commands property is a list of commands that will be executed sequentially. This setup provides a high degree of flexibility, as it allows you to execute parametrized scripts, custom CLI applications, etc.
- The LocalFiles task can be used to output a file generated by a script. Here, the generate_orders.py script creates a file orders.csv. When you reference this file in the outputs property, it becomes available for download from the Outputs tab on the Executions page. You'll be able to retrieve that file in downstream tasks using the syntax {{outputs.taskId.uris['outputFileName.fileExtension']}}, which in this example is {{outputs.outputFile.uris['orders.csv']}}. In this specific flow, we use that file in a downstream task that loads it to an S3 bucket.
id: pythonCommandsExample
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: gitPythonScripts
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
docker:
image: ghcr.io/kestra-io/pydata:latest
beforeCommands:
- pip install faker > /dev/null
commands:
- python scripts/etl_script.py
- python scripts/generate_orders.py
- id: outputFile
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- orders.csv
- id: loadCsvToS3
type: io.kestra.plugin.aws.s3.Upload
accessKeyId: "{{secret('AWS_ACCESS_KEY_ID')}}"
secretKeyId: "{{secret('AWS_SECRET_ACCESS_KEY')}}"
region: eu-central-1
bucket: kestraio
key: stage/orders.csv
from: "{{outputs.outputFile.uris['orders.csv']}}"
Why Git? Why not just use a locally stored file?
We believe that using Git already during local development of your data pipelines enables a more robust engineering workflow and makes it easier to move from local development to production. Before using any script in Kestra, you should first thoroughly test that script in your favorite IDE and only use Kestra to orchestrate it. For local development, we recommend using a feature branch or a dev/develop branch rather than mounting files into Docker containers. However, if you don't want to use Git, the section below demonstrates the usage of inline Script tasks.
Simple usage pattern: Inline Scripts in Docker
In contrast to the Commands tasks, the Script task doesn't require using Git to store your script code. Instead, you define the script inline in your YAML workflow definition along with any other configuration.
Here is a simple example illustrating how to use that pattern:
- Even though it's not required, it's best to start with a io.kestra.core.tasks.flows.WorkingDirectory task. This task allows you to configure additional input or output files using the LocalFiles task. The example below uses a SQL file as input.
- Use one of the Script tasks (e.g. python.Script) and paste your Python/R/Node.js/Shell script as a multiline string into the script property. Here, the script reads the SQL file, makes an API call and stores the outputs.
- You can optionally overwrite the base Docker image using the image argument of the docker property. This works exactly the same way as in the Commands tasks.
- You can also optionally use the beforeCommands property to install libraries used in your inline script. Here, we use the command pip install requests kestra to install two pip packages that are not available in the base image python:3.11-slim.
- The LocalFiles task can be used to output a file generated as a result of a script. This script outputs a JSON file, and a downstream task loads that file as a document into a MongoDB collection.
id: apiJSONtoMongoDB
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: demoSQL
type: io.kestra.core.tasks.storages.LocalFiles
inputs:
query.sql: |
SELECT sum(total) as total, avg(quantity) as avg_quantity
FROM sales;
- id: inlineScript
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:3.11-slim
beforeCommands:
- pip install requests kestra > /dev/null
warningOnStdErr: false
script: |
import requests
import json
from kestra import Kestra
with open('query.sql', 'r') as input_file:
sql = input_file.read()
response = requests.get("https://api.github.com")
data = response.json()
with open("output.json", "w") as output_file:
json.dump(data, output_file)
Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})
- id: jsonFiles
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- output.json
- id: loadToMongoDB
type: io.kestra.plugin.mongodb.Load
connection:
uri: mongodb://host.docker.internal:27017/
database: local
collection: github
from: "{{outputs.jsonFiles.uris['output.json']}}"
When to use Script over Commands?
The Script pattern has several advantages:
- Simplicity: you don't need to commit files to a Git repository. The script code is stored and versioned using Kestra's revision history along with your orchestration code.
- Easier integration with the templating engine: when the entire workflow is defined in one configuration file, it can be easier to see dependencies and use templating to share variable definitions, pass outputs to downstream tasks, etc.
However, the Script tasks also have some disadvantages compared to the Commands tasks:
- Readability: adding programming-language code to a configuration file such as YAML makes that code more difficult to read, test and review. It lacks the syntax highlighting and testability you would otherwise get from your IDE when writing, for example, a Python script in a .py file rather than pasting that code inline into a .yml config.
- Support for complex use cases: the Script task is not meant for complex use cases where your business logic consists of multiple files importing classes and functions from other modules, etc.
Overall, we recommend using io.kestra.plugin.git.Clone and Commands tasks for advanced production workloads. However, the Script tasks offer a great alternative for simple use cases.
Runners
You can configure your scripts to run either in local processes or in Docker containers by using the runner property:
- By default, all scripting tasks run in isolated containers using the DOCKER runner.
- Setting the runner property to PROCESS will execute your task in a local process on the Worker, without relying on Docker for container isolation.
runner: DOCKER
This is the default option for all script tasks. There are many arguments that can be provided here, including credentials for private Docker registries:
id: pythonInContainer
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: gitPythonScripts
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
commands:
- python scripts/etl_script.py
runner: DOCKER
docker:
image: annageller/kestra:latest
config: |
{
"auths": {
"https://index.docker.io/v1/": {
"username": "annageller",
"password": "{{ secret('DOCKER_PAT') }}"
}
}
}
- id: output
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- "*.csv"
- "*.parquet"
Head over to the Secrets section to learn more about secrets management in Kestra.
runner: PROCESS
The PROCESS runner is useful if your Kestra instance is running locally without Docker and you want to access your local files and environments, for example, to take advantage of locally configured Conda virtual environments.
id: localPythonScript
namespace: dev
tasks:
- id: hello
type: io.kestra.plugin.scripts.python.Commands
runner: PROCESS
beforeCommands:
- conda activate myCondaEnv
commands:
- python /Users/you/scripts/etl_script.py
Running scripts in a local process is particularly beneficial when using remote Worker Groups. The example below ensures that a script will be picked up only by Kestra workers that have been started with the key gpu, effectively delegating computationally demanding scripts to the right server rather than running them directly in a local container:
id: gpuTask
namespace: dev
tasks:
- id: hello
type: io.kestra.plugin.scripts.python.Commands
runner: PROCESS
commands:
- python ml_on_gpu.py
workerGroup:
key: gpu
Building a custom Docker image
You can bake all dependencies needed for your script tasks directly into Kestra's base image. Here is an example for Python tasks:
FROM kestra/kestra:latest-full
USER root
RUN apt-get update -y && apt-get install pip -y
RUN pip install --no-cache-dir pandas requests kestra boto3
Build the image:
docker build -t kestra-python:latest .
Then, use that image in your docker-compose.yml file, as shown in the Getting started guide.
kestra:
image: kestra-python:latest
pull_policy: never
Once you start Kestra containers using docker compose up -d, you can create a flow that directly runs Python tasks with your custom dependencies using the PROCESS runner:
id: python-process
namespace: dev
tasks:
- id: custom-dependencies
type: io.kestra.plugin.scripts.python.Script
runner: PROCESS
script: |
import pandas as pd
import requests
See the Docker Deployments guide for more details on managing custom Kestra container images.
Installing dependencies using beforeCommands
There are several ways of installing custom packages for your workflows. While you could bake all your package dependencies into a custom container image, it's often convenient to install a couple of additional packages at runtime without having to build separate images. The beforeCommands property can be used for that purpose.
pip install package
Here is a simple example installing the pip packages requests and kestra before starting the script:
id: pip
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: inlineScript
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:3.11-slim
beforeCommands:
- pip install requests kestra > /dev/null
warningOnStdErr: false
script: |
import requests
import kestra
print(requests.__version__)
print([i for i in dir(kestra.Kestra) if not i.startswith("_")])
pip install -r requirements.txt
This example clones a Git repository that contains a requirements.txt file. The script task uses beforeCommands to install those packages. We then list recently installed packages to validate that this process works as expected:
id: pythonRequirementsFile
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: printRequirements
type: io.kestra.plugin.scripts.shell.Commands
runner: PROCESS
commands:
- cat requirements.txt
- id: listRecentlyInstalledPackages
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
docker:
image: python:3.11-slim
beforeCommands:
- pip install -r requirements.txt > /dev/null
commands:
- ls -lt $(python -c "import site; print(site.getsitepackages()[0])") | head -n 20
Using Kestra's prebuilt images
Many data engineering use cases require performing fairly standardized tasks such as:
- processing data with pandas
- transforming data with dbt-core (using a dbt adapter for your data warehouse)
- making API calls with the requests library, etc.
To solve those common challenges, the kestra-io/examples repository provides several public Docker images with the latest versions of those common packages. Many Blueprints use those public images by default. The images are hosted in the GitHub Container Registry and follow the naming ghcr.io/kestra-io/packageName:latest.
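For example, you can point the image property of the docker configuration at one of those prebuilt images and use its preinstalled packages right away. The following is a minimal, illustrative sketch (the flow and task ids are arbitrary) that assumes, as in the other examples on this page, that ghcr.io/kestra-io/pydata:latest ships with pandas:
id: prebuiltImageExample
namespace: dev
tasks:
  - id: pandasFromPrebuiltImage
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    docker:
      image: ghcr.io/kestra-io/pydata:latest
    script: |
      import pandas as pd

      # build a tiny DataFrame and print summary statistics
      df = pd.DataFrame({"value": [1, 2, 3]})
      print(df.describe())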
Example: running an R script in Docker
Here is a simple example using the ghcr.io/kestra-io/rdata:latest Docker image provided by Kestra to analyze the built-in mtcars dataset using the dplyr and arrow R libraries:
id: rCars
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: r
type: io.kestra.plugin.scripts.r.Script
warningOnStdErr: false
docker:
image: ghcr.io/kestra-io/rdata:latest
script: |
library(dplyr)
library(arrow)
data(mtcars) # Load mtcars data
print(head(mtcars))
final <- mtcars %>%
summarise(
avg_mpg = mean(mpg),
avg_disp = mean(disp),
avg_hp = mean(hp),
avg_drat = mean(drat),
avg_wt = mean(wt),
avg_qsec = mean(qsec),
avg_vs = mean(vs),
avg_am = mean(am),
avg_gear = mean(gear),
avg_carb = mean(carb)
)
final %>% print()
write.csv(final, "final.csv")
mtcars_clean <- na.omit(mtcars) # remove rows with NA values
write_parquet(mtcars_clean, "mtcars_clean.parquet")
- id: outputs
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- "*.csv"
- "*.parquet"
R libraries are time-consuming to install. From a technical standpoint, you could install custom R packages at runtime as follows:
id: rCars
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: r
type: io.kestra.plugin.scripts.r.Script
warningOnStdErr: false
docker:
image: ghcr.io/kestra-io/rdata:latest
beforeCommands:
- Rscript -e "install.packages(c('dplyr', 'arrow'))" > /dev/null 2>&1
However, such installation may take up to 30 minutes, depending on the packages you install.
Prebuilt images such as ghcr.io/kestra-io/rdata:latest can help you iterate much faster. Before moving to production, you can build your custom images with the exact package versions that you need.
Support for additional languages such as Rust via shell.Commands and the Docker runner
The Commands tasks allow you to run arbitrary commands based on a Docker image. This means that you can use other languages (such as Rust) as long as:
- their dependencies can be packaged into a Docker image, and
- their execution can be triggered from a Shell command line.
Here is an example flow using a Rust image based on a sample ETL project:
id: rustFlow
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: rust
type: io.kestra.plugin.scripts.shell.Commands
commands:
- etl
docker:
image: ghcr.io/kestra-io/rust:latest
- id: downloadFiles
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- "*.csv"
Once the container finishes execution, you'll be able to download all CSV files generated by the Rust container from the Outputs tab.
The ghcr.io/kestra-io/rust:latest image is public, so you can directly use the example shown above and give it a try.
The Git plugin
To use the io.kestra.plugin.git.Clone task in your flow, make sure to add it as the first child task of the WorkingDirectory task. Otherwise, you'll get an error: Destination path "xyz" already exists and is not an empty directory. This happens because you can only clone a Git repository into an empty working directory.
Add io.kestra.plugin.git.Clone as the first task in a WorkingDirectory
Adding the io.kestra.plugin.git.Clone task directly as the first child task of the WorkingDirectory task ensures that you clone your repository into an empty directory before any other task generates output artifacts.
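In short, the ordering looks like the following minimal sketch (the flow id and the listing task are illustrative; the repository is the same public examples repository used elsewhere on this page):
id: cloneFirst
namespace: dev
tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      # the Clone task must come first, while the working directory is still empty
      - id: cloneRepository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/examples
        branch: main
      # any subsequent task can then work with the cloned files
      - id: listFiles
        type: io.kestra.plugin.scripts.shell.Commands
        runner: PROCESS
        commands:
          - ls -la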
Private Git repositories
Typically, you would want to use io.kestra.plugin.git.Clone with a private Git repository. Make sure to:
- add your organization or user name as the username
- generate an access token and provide it as the password property, as shown in the sketch below.
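Here is a hedged sketch of what the Clone task could look like for a private repository (the URL, username, and secret name below are placeholders):
- id: cloneRepository
  type: io.kestra.plugin.git.Clone
  url: https://github.com/your-org/your-private-repo  # placeholder: your private repository
  branch: main
  username: your-org                                  # placeholder: your organization or user name
  password: "{{ secret('GIT_ACCESS_TOKEN') }}"        # placeholder: a Personal Access Token stored as a secret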
Refer to the documentation of the relevant Git platform for instructions on how to generate an access token.
When to use the WorkingDirectory task
By default, all Kestra tasks are stateless. If one task generates files, those files won't be available in downstream tasks unless they are persisted in internal storage. Upon each task's completion, its temporary directory is purged. This behavior is generally useful: it keeps your environment clean and dependency-free, and it avoids potential privacy or security issues that could arise from exposing data generated by one task to other processes.
Despite the benefits of stateless execution, statefulness is desirable in certain scenarios. Imagine that you want to execute several Python scripts, each of which generates some output data, and another script that combines that data as part of an ETL/ML process. Executing those related tasks in the same working directory and sharing state between them is helpful for the following reasons:
- You can clone your entire GitHub branch, with multiple modules and configuration files needed to run several scripts.
- You can execute multiple scripts sequentially on the same worker or in the same container, minimizing latency.
- Output artifacts of each task are directly available to other tasks without having to persist them within the internal storage.
The WorkingDirectory task addresses all of these challenges. It allows you to:
- clone a Git repository (or retrieve needed input files in other ways),
- run multiple tasks sequentially in the same working directory,
- reuse the same working directory on the file system across multiple tasks.
All tasks within the WorkingDirectory task can use output artifacts from previous tasks without having to use the {{outputs.taskId.outputAttribute}} syntax.
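As a minimal illustration (the flow id and file name are arbitrary), the first task below writes a file into the shared working directory and the second task reads it back simply by referring to its name:
id: sharedWorkingDirectory
namespace: dev
tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: writeFile
        type: io.kestra.plugin.scripts.shell.Commands
        runner: PROCESS
        commands:
          - echo "41,42,43" > data.csv
      - id: readFile
        type: io.kestra.plugin.scripts.shell.Commands
        runner: PROCESS
        commands:
          - cat data.csv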
Generating outputs from a script task using {{outputDir}}
If you want to generate files in your script to make them available for download and use in downstream tasks, you can leverage the outputDir variable. Files stored in that directory will be persisted in Kestra's internal storage. Here is an example:
id: outputTest
namespace: dev
tasks:
- id: myPython
type: io.kestra.plugin.scripts.python.Script
runner: PROCESS
script: |
f = open("{{outputDir}}/myfile.txt", "a")
f.write("Hi, this is output from a script 👋")
f.close()
- id: myShell
type: io.kestra.plugin.scripts.shell.Commands
runner: PROCESS
commands:
- cat {{outputs.myPython.outputFiles['myfile.txt']}}
The first task creates a file myfile.txt, and the next task can access it by leveraging the syntax {{outputs.yourTaskId.outputFiles['yourFileName.fileExtension']}}.
Check the Outputs page for more details about managing outputs.
The LocalFiles task to manage inputs and outputs for your script code
The LocalFiles task is meant to be used inside of the WorkingDirectory task.
It allows you to output files generated in a script and add new files inline within the YAML workflow definition. The examples from the previous sections demonstrate how you can use inputs and outputs. The subsections below provide additional explanation and further examples.
Managing input files: LocalFiles.inputs to configure requirements.txt
The inputs property can be used to add input files that might be needed in your script. Imagine that you want to add a custom requirements.txt file that contains exact pip package versions, as in this example:
id: pip
namespace: dev
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: pip
type: io.kestra.core.tasks.storages.LocalFiles
inputs:
requirements.txt: |
kestra>=0.6.0
pandas>=1.3.5
requests>=2.31.0
- id: pythonScript
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:3.11-slim
beforeCommands:
- pip install -r requirements.txt > /dev/null
warningOnStdErr: false
script: |
import requests
import kestra
import pandas as pd
print(f"requests version: {requests.__version__}")
print(f"pandas version: {pd.__version__}")
methods = [i for i in dir(kestra.Kestra) if not i.startswith("_")]
print(f"Kestra methods: {methods}")
Adding such additional configuration files is possible by using the inputs property of the LocalFiles task. You provide:
- the file name as the key,
- the file's content as the value.
In the example shown above, the key is requirements.txt and the value is the typical content of a requirements file listing pip packages and their version constraints.
Using input files to pass data from a trigger to a script task
Another use case for input files is when your custom scripts need input coming from other tasks or triggers.
Consider the following example flow that runs whenever a new object with the prefix "raw/" arrives in the S3 bucket "declarative-orchestration":
id: s3TriggerCommands
namespace: blueprint
description: process CSV file from S3 trigger
tasks:
- id: wdir
type: io.kestra.core.tasks.flows.WorkingDirectory
tasks:
- id: cloneRepo
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: local
type: io.kestra.core.tasks.storages.LocalFiles
inputs:
data.csv: "{{ trigger.objects | jq('.[].uri') | first }}"
- id: python
type: io.kestra.plugin.scripts.python.Commands
description: this script reads a file `data.csv` from S3 trigger
docker:
image: ghcr.io/kestra-io/pydata:latest
warningOnStdErr: false
commands:
- python scripts/clean_messy_dataset.py
- id: output
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- "*.csv"
- "*.parquet"
triggers:
- id: waitForS3object
type: io.kestra.plugin.aws.s3.Trigger
bucket: declarative-orchestration
maxKeys: 1
interval: PT1S
filter: FILES
action: MOVE
prefix: raw/
moveTo:
key: archive/raw/
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
region: "{{ secret('AWS_DEFAULT_REGION') }}"
Because LocalFiles is a dedicated task, independent of the Python task, it can be used to pass the S3 object key (downloaded to Kestra's internal storage by the S3 trigger) as a local file to any downstream task. Note that we didn't have to hardcode anything specific to Kestra in the Python script from GitHub. That script remains pure Python that you can run anywhere. Kestra's trigger logic is stored along with orchestration and infrastructure configuration in the YAML flow definition.
This separation of concerns (i.e. not mixing orchestration and business logic) makes your code easier to test and keeps your business logic vendor-agnostic.
Managing output files: LocalFiles.outputs
Continuing with the previous example, note how LocalFiles can be used to output any file as a downloadable artifact. This script from GitHub outputs a file named clean_dataset.csv. However, you don't have to hardcode that specific file name. Instead, a simple glob expression can be used to declare that you want to output any CSV or Parquet file generated by that script:
- id: output
type: io.kestra.core.tasks.storages.LocalFiles
outputs:
- "*.csv"
- "*.parquet"
If any files matching those glob patterns are generated during the flow's Execution, they will be available for download from the Outputs tab.
How to emit outputs and metrics from script tasks
The LocalFiles.outputs property is useful for sending files generated in a script to Kestra's internal storage so that these files can be used in downstream tasks or exposed as downloadable artifacts. However, outputs can also be simple key-value pairs that contain metadata generated in your scripts.
Many tasks from Kestra plugins emit certain outputs by default. You can inspect which outputs are generated by each task or trigger from the respective plugin documentation. For instance, follow this plugin documentation link to see the outputs generated by the HTTP Download task. Once the flow is executed, the Outputs tab will list the output metadata as key-value pairs. Run the example below to see it in action:
id: download
namespace: dev
tasks:
- id: http
type: io.kestra.plugin.fs.http.Download
uri: https://raw.githubusercontent.com/kestra-io/examples/main/datasets/orders.csv
This example automatically emits output metadata, such as the status code, file uri and request headers, because those properties have been preconfigured on that plugin's task. However, in your custom script, you can decide what metadata you want to send to Kestra to make that metadata visible in the UI.
Outputs and metrics in Script and Commands tasks
The Scripts Plugin provides convenient methods to send outputs and metrics to the Kestra backend during flow Execution. Under the hood, Kestra tracks outputs and metrics from script tasks by searching standard output and standard error for ::{}:: or {} patterns that allow you to specify outputs and metrics using a JSON request payload:
- {} for single-line JSON objects
- ::{}:: for multi-line JSON objects.
Note that outputs require a dictionary, while metrics expect a list of dictionaries.
Below is an example showing outputs with key-value pairs:
"outputs": {
"key": "value",
"exampleList": [1, 2, 3],
"tags": {
"s3Bucket": "declarative-orchestration",
"region": "us-east-1"
}
}
Here is the representation of a metrics object. It's a list of dictionaries:
"metrics": [
{
"name": "myMetric", // mandatory, the name of the metrics
"type": "counter", // mandatory, "counter" or "timer" metric type
"value": 42, // mandatory, Double or Integer value
"tags": { // optional list of tags
"readOnly": true,
"location": "US"
}
}
]
Both outputs and metrics can optionally include a list of tags that expose internal details.
Metric types: counter and timer
There are two metric types:
- counter, expressed as an Integer or Double data type, measures a countable number of rows/bytes/objects processed in a given task
- timer, expressed as a Double data type, measures the number of seconds needed to process a specific computation in your flow.
Below you can find examples of outputs and metrics definitions for each language.
Python
The example below shows how you can add simple key-value pairs in your Python script to send custom metrics and outputs to Kestra's backend at runtime:
from kestra import Kestra
Kestra.outputs({'data': data, 'nr': 42})
Kestra.counter('nr_rows', len(df), tags={'file': filename})
Kestra.timer('ingestion_duration', duration, tags={'file': filename})
The Kestra.outputs({"key": "value"}) call takes a dictionary of key-value pairs, while metrics such as Counter and Timer take the metric name and metric value as positional arguments, along with an optional dictionary of tags, for example:
Kestra.counter("countable_int_metric_name", 42, tags={"key": "value"})
Kestra.timer("countable_double_metric_name", 42.42, tags={"key": "value"})
Here is a more comprehensive example in a flow:
id: outputsMetricsPython
namespace: dev
inputs:
- name: attempts
type: INT
defaults: 10
tasks:
- id: py
type: io.kestra.plugin.scripts.python.Script
warningOnStdErr: false
docker:
image: ghcr.io/kestra-io/pydata:latest
script: |
import timeit
from kestra import Kestra
attempts = {{inputs.attempts}}
modules = ['pandas', 'requests', 'kestra', 'faker', 'csv', 'random']
results = {}
for module in modules:
time_taken = timeit.timeit(f'import {module}', number=attempts)
results[module] = time_taken
Kestra.timer(module, time_taken, tags=dict(nr_attempts=attempts))
Kestra.outputs(results)
Node.js
Node.js follows the same syntax for sending outputs and metrics as in Python. Here is an example:
const Kestra = require("./kestra");
Kestra.outputs({data: 'data', nr: 42, mybool: true, myfloat: 3.65});
Kestra.counter('metric_name', 100, {partition: 'file1'});
Kestra.timer('timer1', (callback) => {setTimeout(callback, 1000)}, {tag1: 'hi'});
Kestra.timer('timer2', 2.12, {tag1: 'from', tag2: 'kestra'});
Shell
To send outputs and metrics from a Shell task, wrap a JSON payload (i.e. a map/dictionary) in double colons, such as '::{"outputs":{"key":"value"}}::' or '::{"metrics":[{"name":"count","type":"counter","value":1,"tags":{"key":"value"}}]}::', as shown in the following examples:
# 1. send outputs with different data types
echo '::{"outputs":{"test":"value","int":2,"bool":true,"float":3.65}}::'
# 2. send a counter with tags
echo '::{"metrics":[{"name":"count","type":"counter","value":1,"tags":{"tag1":"i","tag2":"win"}}]}::'
# 3. send a timer with tags
echo '::{"metrics":[{"name":"time","type":"timer","value":2.12,"tags":{"tag1":"i","tag2":"destroy"}}]}::'
The JSON payload should be provided without any spaces.
When to use metrics and when to use outputs?
If you want to track task-run metadata across multiple executions of a flow, and this metadata is of an arbitrary data type (it might be a string, a list of dictionaries, or even a file), use outputs rather than metrics. Metrics can only be used with numerical values.
Use cases for outputs: results of a task of any data type
Outputs are task-run artifacts. They are generated as a result of a given task. Outputs can be used for two reasons:
- To pass data between tasks
- To generate result artifacts for observability and auditability e.g. to track specific metadata or to share downloadable file artifacts with business stakeholders.
Using outputs to pass data between tasks
Outputs can be used to pass data between tasks. One task can generate some outputs, and another task can use them:
id: outputsInputs
namespace: dev
tasks:
- id: passOutput
type: io.kestra.core.tasks.debugs.Return
format: "hello world!"
- id: takeInput
type: io.kestra.core.tasks.debugs.Return
format: "data from previous task - {{outputs.passOutput.value}}"
Use cases for metrics: numerical values that can be aggregated and visualized across Executions
Metrics are intended to track custom numeric (metric type: counter) or duration (metric type: timer) attributes that you can visualize across flow executions, such as the number of rows or bytes processed in a task. Metrics are expressed as numerical values of integer or double data type.
Examples of metadata you may want to track as metrics:
- the number of rows processed in a given task (e.g. during data ingestion or transformation),
- the accuracy score of a trained ML model, in order to compare this result across multiple workflow runs (e.g. you can see the average or max value across multiple executions),
- other pieces of metadata that you can track across executions of a flow (e.g. the duration of a certain function execution within a Python ETL script).
The sketch below shows one way to emit such a metric from a Python Script task.
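For instance, here is a minimal, illustrative sketch (the flow id and data are made up) of a Python Script task that tracks the number of processed rows as a counter metric:
id: rowCountMetric
namespace: dev
tasks:
  - id: processData
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    docker:
      image: ghcr.io/kestra-io/pydata:latest
    script: |
      import pandas as pd
      from kestra import Kestra

      # made-up data standing in for a real ingestion step
      df = pd.DataFrame({"order_id": [1, 2, 3], "total": [10.5, 20.0, 7.25]})

      # emit a counter metric with the number of processed rows
      Kestra.counter("nr_rows", len(df), tags={"source": "example"})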