Outputs & Metrics from a script engine

Kestra can track outputs and metrics from any scripting language. The core Python and Node plugins provide methods to configure outputs and metrics.
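
For example, the following Python sketch uses the Kestra helper (from kestra import Kestra, the same import used in the flow examples further down) to capture one output, one counter, and one timer; all names and values here are illustrative:

python
from kestra import Kestra

# capture outputs: a dictionary of arbitrary key-value pairs
Kestra.outputs({'dataset': 'orders', 'nr_rows': 100})

# capture a counter metric with optional tags
Kestra.counter('nr_rows', 100, {'source': 'orders'})

# capture a timer metric - the value is a duration in seconds
Kestra.timer('processing_time', 2.5, {'source': 'orders'})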

If you use another language, such as Bash, you can emit outputs and metrics by printing the special patterns described below to stdout, for example with the echo command (see the Examples section at the end of this page).

Script command

Kestra inspects everything printed to standard output and standard error and looks for the special patterns ::{}:: or {} that allow you to specify outputs and metrics as JSON objects:

  • {} can be used for single-line JSON objects.
  • ::{}:: can be used for multi-line JSON objects.

Below is an example of an output object. It's a dictionary object that you can use to add any key-value pairs:

json
"outputs": { // map/dictionary of key-value pairs with outputs
    "my-key": "my-value",
    "my-list": [1, 2, 3] // you can use a variety of data types supported in a JSON request payload
}
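
A script in any language can emit this object by printing it to standard output. A minimal Python sketch using the single-line {} pattern described above:

python
import json

# a single-line JSON object printed to stdout is captured as outputs
print(json.dumps({"outputs": {"my-key": "my-value", "my-list": [1, 2, 3]}}))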

Here is the representation of a metrics object. It's a list of dictionaries:

json
"metrics": [ // you can send multiple metrics at once - each is a dictionary within a list
    {
        "name": "my-counter", // mandatory: the name of the metric
        "type": "counter", // mandatory: "counter" or "timer" metric type
        "value": 1.2, // mandatory (double): the number of objects/rows/items processed for a counter, or the duration in seconds for a timer
        "tags": { // optional map of tags exposing additional details
          "type": "read",
          "location": "EU"
        }
    }
]
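
To emit this object directly from a script, wrap the pretty-printed (multi-line) JSON in the ::{}:: pattern. A minimal Python sketch reusing the metric above:

python
import json

metrics = [{
    "name": "my-counter",
    "type": "counter",
    "value": 1.2,
    "tags": {"type": "read", "location": "EU"}
}]

# multi-line JSON must be wrapped in the ::{}:: pattern
print(f'::{json.dumps({"metrics": metrics}, indent=2)}::')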

When to use metrics and when to use outputs?

Use cases for outputs

Outputs are task-run artifacts: they are generated as a result of a given task. If you want to track task-run metadata across multiple executions of a flow, and that metadata can be of an arbitrary data type (a string, an integer, ...), use outputs rather than metrics.

Examples of metadata you may want to track as outputs (see the sketch after this list):

  • the number of rows processed in a given task, so that subsequent tasks can validate that row count,
  • the accuracy score of a trained ML model in order to compare this result (output artifact) across multiple workflow executions,
  • other pieces of metadata you want to track across executions of a flow (e.g. a dataset name used within a Python ETL script).
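
Here is a minimal Python sketch covering these use cases; the keys and values (nr_rows, accuracy, dataset) are hypothetical placeholders:

python
from kestra import Kestra

# illustrative values - in a real script these would come from your ETL/ML logic
Kestra.outputs({
    'nr_rows': 42,           # row count for downstream validation
    'accuracy': 0.93,        # model score to compare across executions
    'dataset': 'orders.csv'  # arbitrary metadata such as a dataset name
})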

Outputs can be used to pass data between tasks. One task can generate outputs, and another task can use that value:

yaml
id: outputsInputs
namespace: dev
tasks:
  - id: passOutput
    type: io.kestra.core.tasks.debugs.Return
    format: "hello world!"
  - id: takeInput
    type: io.kestra.core.tasks.debugs.Return
    format: "data from previous task - {{outputs.passOutput.value}}"

Use cases for metrics

Metrics are intended to track custom numeric (metric type: counter) or duration (metric type: timer) attributes that you may want to visualize across task runs and flow executions. Metric values are expressed as numbers of double data type.

Say you are using the EachParallel task to process data across multiple partitions in parallel. You then want to visualize how many rows were processed in each partition and how long this process took.

Here is a flow demonstrating how you can accomplish that:

yaml
id: partitions
namespace: dev
description: Process partitions in parallel

tasks:
  - id: getPartitions
    type: io.kestra.core.tasks.scripts.Python
    inputFiles:
      main.py: |
        from kestra import Kestra
        partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
        Kestra.outputs({'partitions': partitions})

  - id: processPartitions
    type: io.kestra.core.tasks.flows.EachParallel
    value: '{{outputs.getPartitions.vars.partitions}}'
    tasks:
      - id: partition
        type: io.kestra.core.tasks.scripts.Python
        inputFiles:
          main.py: |
            import random
            import time
            from kestra import Kestra
            
            filename = '{{ taskrun.value }}'
            print(f"Reading and processing partition {filename}")
            nr_rows = random.randint(1, 1000)
            processing_time = random.randint(1, 20)
            time.sleep(processing_time)
            Kestra.counter('nr_rows', nr_rows, {'partition': filename})
            Kestra.timer('processing_time', processing_time, {'partition': filename})

The above flow uses both metric types: counter and timer, and the partition name is used as a tag.

Run the above flow example and inspect the Metrics and Outputs tabs on the Execution page of that flow to see the difference between outputs (in this example, the output named partitions) and metrics (here: nr_rows and processing_time).

Examples

shell
# 1. send some outputs with different types
echo '::{"outputs":{"test":"value","int":2,"bool":true,"float":3.65}}::'

# 2. send a counter with tags
echo '::{"metrics":[{"name":"count","type":"counter","value":1,"tags":{"tag1":"i","tag2":"win"}}]}::'

# 3. send a timer with tags
echo '::{"metrics":[{"name":"time","type":"timer","value":2.12,"tags":{"tag1":"i","tag2":"destroy"}}]}::'