Managing and purging flow outputs
Best practices for handling flow outputs incl. purging large outputs and conditionally returning outputs.
Conditionally returning outputs
When you have a flow that can return different outputs based on some condition, you can use a Pebble expression in the outputs
section to conditionally return the output of task A
if it wasn't skipped, and the output of task B
otherwise.
id: conditionallyReturnOutputs
namespace: company.team
inputs:
- id: runTask
type: BOOLEAN
defaults: true
tasks:
- id: taskA
runIf: "{{ inputs.runTask }}"
type: io.kestra.plugin.core.debug.Return
format: Hello World!
- id: taskB
type: io.kestra.plugin.core.debug.Return
format: Fallback output
outputs:
- id: flowOutput
type: STRING
value: "{{ tasks.taskA.state != 'SKIPPED' ? outputs.taskA.value : outputs.taskB.value }}"
Purging large outputs
When you have a flow that generates large outputs that don't need to be kept after the execution is completed, you can use the io.kestra.plugin.core.storage.PurgeExecutionFiles
task to purge the outputs of the current execution.
In the example below, the flow downloads a large file from an HTTP API and uploads it to an S3 bucket. After the file is uploaded to S3, it's no longer needed, so the PurgeExecutionFiles
task is used to delete the file from the internal storage.
id: extractLoadPurge
namespace: company.team
tasks:
- id: extract
type: io.kestra.plugin.core.http.Download
uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
- id: load
type: io.kestra.plugin.aws.s3.Upload
from: "{{ outputs.extractLargeFile.uri }}"
bucket: myBucket
key: largeFiles/orders.csv
- id: purge
type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
Was this page helpful?