Data storage and processing
Kestra's primary purpose is to orchestrate data processing via tasks, so data is central to each flow's execution.
Depending on the task, data can be stored inside the execution context or inside Kestra's internal storage. You can also manually store data inside Kestra's state store by using dedicated tasks.
Some tasks give you the choice of where you want to store the data, usually via a `fetchType` property or the three `fetch`/`fetchOne`/`store` properties.
For example, using the DynamoDB Query task:
id: query
type: io.kestra.plugin.aws.dynamodb.Query
tableName: persons
keyConditionExpression: id = :id
expressionAttributeValues:
  :id: "1"
fetchType: FETCH
The `fetchType` property can have four values:
- `FETCH_ONE`: will fetch the first row and set it in a task output attribute (the `row` attribute for DynamoDB); the data will be stored inside the execution context.
- `FETCH`: will fetch all rows and set them in a task output attribute (the `rows` attribute for DynamoDB); the data will be stored inside the execution context (see the example right after this list).
- `STORE`: will store all rows inside Kestra's internal storage. The internal storage will return a URI, usually set in the task output attribute `uri`, that can be used to retrieve the file from the internal storage.
- `NONE`: will do nothing.
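For example, with `fetchType: FETCH` as in the query above, a later task can read the fetched rows from the execution context through the query task's output. A minimal sketch, assuming a follow-up Log task (the task id and message are illustrative):

- id: logRows
  type: io.kestra.core.tasks.log.Log
  # `rows` is the output attribute set by the DynamoDB Query task when fetchType is FETCH
  message: "{{ outputs.query.rows }}"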
The three `fetch`/`fetchOne`/`store` properties do the same thing, but through three separate task properties instead of a single one.
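For example, the BigQuery Query task used later in this guide exposes these three properties: `store: true` writes the result set to the internal storage, while `fetch: true` or `fetchOne: true` keeps the rows (or the first row) in the execution context. A minimal sketch (the query and task id are illustrative):

- id: bigQueryExample
  type: io.kestra.plugin.gcp.bigquery.Query
  sql: SELECT 1 AS value
  # store the result set in the internal storage instead of the execution context
  store: true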
Storing data
Storing data inside the flow execution context
Data can be stored as variables inside the flow execution context. This can be convenient for sharing data between tasks.
To do so, tasks store data as output attributes that are then available inside the flow via Pebble expressions like `{{outputs.taskName.attributeName}}`. More information about outputs can be found here.
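As a quick illustration, here is a two-task sketch in which the second task reads the first task's output attribute from the execution context (the Return debug task and its `value` output are used for illustration):

tasks:
  - id: produce
    type: io.kestra.core.tasks.debugs.Return
    format: "Hello"
  - id: consume
    type: io.kestra.core.tasks.log.Log
    # reads the `value` output attribute of the `produce` task
    message: "{{ outputs.produce.value }}"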
Be careful: when the data is large, it will increase the size of the flow execution context, which can slow down execution and increase the size of the execution stored inside Kestra's repository.
Depending on the Kestra internal queue and repository implementation, there can be a hard limit on the size of the flow execution context, as it is stored as a single row/message. Usually, this limit is around 1MB, so it is important to avoid storing large amounts of data inside the flow execution context.
Storing data inside the internal storage
Kestra has an internal storage that can store data of any size. By default, the internal storage uses the host filesystem, but plugins exist for other implementations such as Amazon S3, Google Cloud Storage, or Microsoft Azure Blob Storage. See internal storage configuration.
When using the internal storage, data is, by default, stored using Amazon Ion format.
Tasks that can store data inside the internal storage usually have an output attribute named `uri` that can be used to access the file in subsequent tasks.
The following example uses the DynamoDB Query task to query a table and the FTP Upload task to send the retrieved rows to an external FTP server.
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE

  - id: upload
    type: io.kestra.plugin.fs.ftp.Upload
    host: localhost
    port: 80
    from: "{{ outputs.query.uri }}"
    to: "/upload/file.ion"
If you need to access data from the internal storage, you can use the Pebble read function to read the file's content as a string.
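For example, assuming a previous task named `query` stored a file and exposed its URI, a Log task could inline the file's content like this (the task ids are illustrative):

- id: logContent
  type: io.kestra.core.tasks.log.Log
  # read() loads the file's content from the internal storage as a string
  message: "{{ read(outputs.query.uri) }}"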
Dedicated tasks allow managing the files stored inside the internal storage:
- Concat: concatenate multiple files.
- Delete: delete a file.
- Size: get the size of a file.
- Split: split a file into multiple files depending on the size of the file or the number of rows.
This should be the main method for storing and carrying large data from task to task. For example, if you know that an HTTP request will return a large payload, consider using the HTTP Download task along with a Serdes task instead of carrying the raw data in the flow execution context.
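As a sketch of that pattern (the URL and task ids are illustrative), the payload is downloaded straight to the internal storage and then converted with a Serdes task, so the raw data never transits through the execution context:

tasks:
  - id: download
    type: io.kestra.plugin.fs.http.Download
    # the response body is written to the internal storage, not the execution context
    uri: https://example.com/large-dataset.csv
  - id: toIon
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ outputs.download.uri }}"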
Storing data inside the state store
Dedicated tasks can store data inside Kestra's state store. The state store transparently uses Kestra's internal storage as its backend store.
The state store allows storing data that will be shared by all executions of the same flow. You can think of it as a key/value store dedicated to a flow (or to a namespace when setting the property `namespace: true`).
The following tasks are available:
- Set: set a state key/value pair.
- Get: get a state key/value pair.
- Delete: delete a state key/value pair.
Example:
tasks:
  - id: setState
    type: io.kestra.core.tasks.states.Set
    name: myState
    data:
      name: John Doe

  - id: getState
    type: io.kestra.core.tasks.states.Get
    name: myState
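The value stored by `setState` can then be read back, possibly in a later execution of the same flow. As a sketch, assuming the Get task exposes the stored key/value pairs under a `data` output attribute:

- id: logState
  type: io.kestra.core.tasks.log.Log
  # assumes the stored map is available under the Get task's `data` output attribute
  message: "{{ outputs.getState.data.name }}"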
Processing data
You can do basic data processing thanks to the variable processing offered by the Pebble templating engine; see variables basic usage.
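For example, Pebble filters can already reshape simple values inline; this sketch logs an upper-cased string and a formatted date (the task id is illustrative):

- id: pebble
  type: io.kestra.core.tasks.log.Log
  # the `upper` and `date` filters are applied directly inside the expression
  message: "{{ 'hello' | upper }} - {{ now() | date('yyyy-MM-dd') }}"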
But these are limited, and you may need more powerful data processing tools; for this, Kestra offers various data processing tasks like file transformations or scripts.
Converting files
Files from the internal storage can be converted from/to the Ion format to/from another format using the Serdes plugin.
The following formats are currently available: CSV, Avro, JSON, XML, and Parquet.
Each format offers a reader that reads a file in that format and writes it as an Ion serialized data file, and a writer that reads an Ion serialized data file and writes it in the target format.
For example, to convert an Ion file to CSV, then back to Ion:
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE

  - id: convertToCsv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs.query.uri}}"

  - id: convertBackToIon
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{outputs.convertToCsv.uri}}"
Processing data using scripts
Kestra can launch scripts written in Python, R, Node.js, Shell, and PowerShell. Depending on the `runner`, they can run directly in a local process on the host or inside Docker containers (see the sketch after the list below).
Those script tasks are available in the Scripts Plugin. Here is documentation for each of them:
- The Python task will run a Python script in a Docker container or in a local process.
- The Node task will run a Node.js script in a Docker container or in a local process.
- The R task will run an R script in a Docker container or in a local process.
- The Shell task will execute a single Shell command, or a list of commands that you provide.
- The PowerShell task will execute a single PowerShell command, or a list of commands that you provide.
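For example, a Shell task can be switched between a local process and a Docker container through its `runner` property. A minimal sketch, assuming the Shell Commands task and the `PROCESS` runner value:

- id: hello
  type: io.kestra.plugin.scripts.shell.Commands
  # run in a local process on the host instead of the default Docker container
  runner: PROCESS
  commands:
    - echo "Hello from the host"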
The following example will query the BigQuery public dataset with Wikipedia page views to find the top 10 pages, convert it to CSV, and use the CSV file inside a Python task for further transformations using Pandas.
id: wikipedia-top-ten-python-panda
namespace: dev
description: analyze top 10 Wikipedia pages

tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
    projectId: geller
    serviceAccount: "{{envs.gcp_creds}}"

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs.query.uri}}"

  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: file
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data.csv: "{{outputs['write-csv'].uri}}"
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        docker:
          image: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          from kestra import Kestra

          df = pd.read_csv("data.csv")
          views = df['views'].sum()
          Kestra.outputs({'views': int(views)})
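The value sent back through `Kestra.outputs` can then be used by downstream tasks; script outputs are exposed under the task's `vars` output attribute (a sketch, assuming that attribute name and an extra Log task):

- id: log-views
  type: io.kestra.core.tasks.log.Log
  # `views` was set by Kestra.outputs() in the pandas task above
  message: "Total views: {{ outputs.pandas.vars.views }}"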
Kestra offers several plugins for ingesting and transforming data — check the Plugin list for more details.
Make sure to also check:
- The Script documentation for a detailed overview of how to work with Python, R, Node.js, Shell, and PowerShell scripts and how to integrate them with Git and Docker.
- The Blueprints catalog — simply search for the relevant language (e.g. Python, R, Rust) or use case (ETL, Git, dbt, etc.) to find the relevant examples.
Processing data using file transform
Kestra can process data row by row using file transform tasks. The transformation will be done with a small script written in Python, JavaScript, or Groovy.
- The Jython FileTransform task allows transforming rows with Python.
- The Nashorn FileTransform task allows transforming rows with JavaScript.
- The Groovy FileTransform task allows transforming rows with Groovy.
The following example will query the BigQuery public dataset for Wikipedia pages, convert it row by row with the Nashorn FileTransform, and write it in a CSV file.
id: wikipedia-top-ten-file-transform
namespace: io.kestra.tests
description: A flow that loads wikipedia top 10 EN pages

tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true

  - id: file-transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{outputs['query-top-ten'].uri}}"
    script: |
      logger.info('row: {}', row)
      if (row['title'] === 'Main_Page' || row['title'] === 'Special:Search' || row['title'] === '-') {
        // remove un-needed row
        row = null
      } else {
        // add a 'time' column
        row['time'] = String(row['date']).substring(11)
        // modify the 'date' column to only keep the date part
        row['date'] = String(row['date']).substring(0, 10)
      }

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs['file-transform'].uri}}"
The script can access a logger to log messages. Each row is available in a `row` variable, where each column is accessible using the dictionary notation `row['columnName']`.
Purging data
The PurgeExecution task can purge all the files stored inside the internal storage by a flow execution.
It can be used at the end of a flow to purge all its generated files.
tasks:
  - id: "purge-execution"
    type: "io.kestra.core.tasks.storages.PurgeExecution"
The execution context itself will not be available after the end of the execution and will be automatically deleted from Kestra's repository after a retention period (by default, seven days) that can be changed; see configurations.
Also, the Purge task can be used to purge the internal storage files, logs, and execution data of previous executions. For example, this flow will purge all of them every day:
id: purge
namespace: io.kestra.tests

tasks:
  - id: "purge"
    type: "io.kestra.core.tasks.storages.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 0 * * *"