Assets
Available on: >= 1.2.0
Track and manage the resources your workflows create and use.
Overview
Assets keeps a live inventory of the resources your workflows interact with, such as database tables, virtual machines, files, or any other external system.
You create assets in two ways: automatically through task execution or manually from the Assets UI. Once created, you can view asset details, check which workflow runs created or modified them, and see how assets connect to each other across your workflows.
This feature enables:
- Shipping metadata to lineage providers (e.g., OpenLineage).
- Populating dropdowns or Pebble inputs with live assets (e.g., available VMs).
- Monitoring assets and their state.
Asset definition
Define assets directly on any task using the assets property. Each task can declare input assets (resources it reads) and output assets (resources it creates or modifies).
Every asset includes these fields:
| Field | Description |
|---|---|
| id | Unique within a tenant. |
| namespace | Optional namespace associated with the asset, used for filtering and RBAC management. |
| type | A predefined Kestra type such as io.kestra.plugin.ee.assets.Table, or any custom string value. |
| displayName | Optional human-readable name. |
| description | Markdown-supported documentation. |
| metadata | Map of key-value pairs for adding custom metadata to the asset. |
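For reference, here is a minimal sketch that declares a single output asset with every field populated. The flow, task, and asset names (orders, analytics) are illustrative placeholders; only the field names come from the table above.
id: asset_fields_example
namespace: company.team
tasks:
  - id: register_orders_table
    type: io.kestra.plugin.core.log.Log
    message: Registering the orders table as an asset
    assets:
      outputs:
        # All fields shown for illustration; only id is required to be unique within the tenant
        - id: orders
          namespace: "{{ flow.namespace }}"
          type: io.kestra.plugin.ee.assets.Table
          displayName: Orders table
          description: |
            **Orders** table maintained by the daily ingestion flow.
          metadata:
            database: analytics
            table: orders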
Data Pipeline Use Cases
Assets are essential for tracking data lineage in analytics and data engineering workflows. The following examples demonstrate how to use assets for simple table creation and complex multi-layer data pipelines.
Example 1: Simple Table Creation
Scenario: You're creating a new database table from scratch. This is a foundational asset with no upstream dependencies.
id: pipeline_with_assets
namespace: company.team
tasks:
- id: create_trips_table
type: io.kestra.plugin.jdbc.sqlite.Queries
url: jdbc:sqlite:myfile.db
outputDbFile: true
sql: |
CREATE TABLE IF NOT EXISTS trips (
VendorID INTEGER,
passenger_count INTEGER,
trip_distance REAL
);
INSERT INTO trips (VendorID, passenger_count, trip_distance) VALUES
(1, 1, 1.5),
(1, 2, 2.3),
(2, 1, 0.8),
(2, 3, 3.1);
assets:
outputs:
- id: trips
namespace: "{{ flow.namespace }}"
type: io.kestra.plugin.ee.assets.Table
metadata:
database: sqlite
table: trips
Key points:
- There are no inputs assets, as this is a source table with no dependencies.
- The trips table is registered as an output asset that downstream workflows can reference.
- Metadata captures the database type and table name for easier discovery.
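For example, a downstream flow that reads from this table could declare trips as an input asset, linking its executions to the table created above. The sketch below is illustrative (the connection details and query are placeholders), but it uses the same assets syntax.
id: read_trips_table
namespace: company.team
tasks:
  - id: average_trip_distance
    type: io.kestra.plugin.jdbc.sqlite.Query
    url: jdbc:sqlite:myfile.db
    sql: SELECT AVG(trip_distance) AS avg_distance FROM trips;
    fetchType: FETCH_ONE
    assets:
      inputs:
        # Declaring the table as an input asset links this run to the trips asset
        - id: trips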
Example 2: Multi-Layer Data Pipeline
Scenario: You're building a modern data stack with staging and mart layers. The staging layer reads from an external source, and the mart layer creates aggregated analytics tables.
id: data_pipeline_assets
namespace: kestra.company.data
tasks:
- id: create_staging_layer_asset
type: io.kestra.plugin.jdbc.duckdb.Query
sql: |
CREATE TABLE IF NOT EXISTS trips AS
select VendorID, passenger_count, trip_distance from sample_data.nyc.taxi limit 10;
assets:
inputs:
- id: sample_data.nyc.taxi
outputs:
- id: trips
namespace: "{{flow.namespace}}"
type: io.kestra.plugin.ee.assets.Table
metadata:
model_layer: staging
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values:
- passenger_count
- trip_distance
tasks:
- id: create_mart_layer_asset
type: io.kestra.plugin.jdbc.duckdb.Query
sql: SELECT AVG({{taskrun.value}}) AS avg_{{taskrun.value}} FROM trips;
assets:
inputs:
- id: trips
outputs:
- id: avg_{{taskrun.value}}
type: io.kestra.plugin.ee.assets.Table
namespace: "{{flow.namespace}}"
metadata:
model_layer: mart
pluginDefaults:
- type: io.kestra.plugin.jdbc.duckdb
values:
url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
fetchType: STORE
What's happening in this pipeline:
- External Source Tracking: The create_staging_layer_asset task references sample_data.nyc.taxi as an input asset, even though it's managed outside this workflow. This establishes lineage to external data sources.
- Staging Layer: The trips table is created and registered with model_layer: staging metadata. This becomes an intermediate asset that mart layers will consume.
- Dynamic Mart Creation: The ForEach task generates two mart tables, avg_passenger_count and avg_trip_distance. Both declare trips as an input, creating a clear dependency chain.
- Complete Lineage Graph: Kestra automatically builds the dependency graph.
Benefits of this approach:
- Impact Analysis: If sample_data.nyc.taxi changes, you can instantly see that it affects 3 downstream assets.
- Layer Organization: Filter assets by model_layer to view only staging or mart tables.
- Dependency Tracking: Know exactly which tables depend on others before making schema changes.
- Audit Trail: Track which workflows created each table and when.
Infrastructure Use Case: Team Bucket Provisioning
Assets are particularly valuable for infrastructure management scenarios. This example demonstrates how a DevOps team can provision cloud resources and track their usage across different teams.
Scenario: Your DevOps team needs to create dedicated S3 buckets for multiple teams (Business, Data, Finance, Product). By registering these buckets as assets during provisioning, you establish a clear lineage of which workflows and executions interact with each infrastructure component.
The following flow creates S3 buckets for selected teams and registers them as assets:
id: infra_assets
namespace: kestra.company.infra
inputs:
- id: teams
type: MULTISELECT
values:
- Business
- Data
- Finance
- Product
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values: "{{ inputs.teams }}"
tasks:
- id: create_bucket
type: io.kestra.plugin.aws.cli.AwsCLI
commands:
- aws s3 mb s3://kestra-{{ taskrun.value | slugify }}-bucket
assets:
outputs:
- id: kestra-{{ taskrun.value | slugify }}-bucket
type: AWS_BUCKET
metadata:
provider: s3
address: s3://kestra-{{ taskrun.value | slugify }}-bucket
pluginDefaults:
- type: io.kestra.plugin.aws
values:
accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
region: "{{ secret('AWS_REGION') }}"
allowFailure: true
This flow dynamically creates buckets (e.g., kestra-data-bucket, kestra-finance-bucket) and registers each as an AWS_BUCKET asset with relevant metadata.
Once the infrastructure is provisioned, teams can reference these assets in their workflows. Here's how the Data team uses their bucket:
id: upload_file
namespace: kestra.company.data
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://raw.githubusercontent.com/kestra-io/datasets/refs/heads/main/jaffle-data/raw_customers.csv
- id: aws_upload
type: io.kestra.plugin.aws.s3.Upload
bucket: kestra-data-bucket
from: '{{ outputs.download.uri }}'
key: raw_customer.csv
assets:
inputs:
- id: kestra-data-bucket
outputs:
- id: raw_customer
type: io.kestra.plugin.ee.assets.File
metadata:
owner: data
pluginDefaults:
- type: io.kestra.plugin.aws
values:
accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
region: "{{ secret('AWS_REGION') }}"
In this workflow:
- The aws_upload task declares kestra-data-bucket as an input asset, linking it to the infrastructure provisioned earlier.
- It also creates an output asset (raw_customer) representing the uploaded file.
- This establishes a complete lineage chain: infrastructure creation → data upload → file asset.
Benefits: With this approach, you can easily answer questions like:
- Which teams are using which buckets?
- What files have been uploaded to each bucket?
- Which workflows and executions have interacted with a specific infrastructure component?
- When was this infrastructure resource created and by which flow?
Asset Identifier
An asset is uniquely identified by its id and the tenant (tenantId) where you create it. You can attach a namespace to an asset to improve filtering, and to restrict visibility so only users or groups with the appropriate RBAC can access the asset.
Asset Type
Asset types fall into two categories:
- Kestra-defined asset types: These predefined types use the io.kestra.core.models.assets model and provide structured metadata fields specific to each asset type. In future iterations of the Assets feature, Kestra plugins will be able to automatically generate assets with these types and populate their metadata fields during task execution. For example, a database plugin could automatically create a Table asset with the system, database, and schema fields filled in based on the connection details.
The current Kestra-defined asset types are the following:
- io.kestra.plugin.ee.assets.Dataset: Represents a dataset asset managed by Kestra. Metadata: system, location, format.
- io.kestra.plugin.ee.assets.File: Represents a file asset, such as documents, logs, or other file-based outputs. Metadata: system, path.
- io.kestra.plugin.ee.assets.Table: Represents a database table asset with schema and data location metadata. Metadata: system, database, schema.
- io.kestra.plugin.ee.assets.VM: Represents a virtual machine asset, including attributes like IP address and provider. Metadata: provider, region, state.
- Free-form asset types: You can define asset types using any custom string value to represent asset categories that fit your organization's needs. This lets you create and manage your own asset taxonomies, giving you flexibility to describe resources that are not covered by Kestra's standard models. These assets require manual definition and will not be auto-generated by plugins.
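For example, the sketch below registers a message broker topic under a custom KAFKA_TOPIC type. The type string, metadata keys, and names are arbitrary examples rather than predefined Kestra values.
id: custom_asset_type
namespace: company.team
tasks:
  - id: register_topic
    type: io.kestra.plugin.core.log.Log
    message: Registering the orders-events topic as a custom asset
    assets:
      outputs:
        # Any string works as the type of a free-form asset
        - id: orders-events
          type: KAFKA_TOPIC
          metadata:
            broker: kafka.internal:9092
            partitions: "6"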
Populate dropdowns and app inputs
The assets() Pebble function allows you to query and retrieve assets dynamically in your workflows. This is particularly useful for populating dropdown inputs or dynamically selecting resources based on filters.
Function signature
assets(type: string, namespace: string, metadata: map)
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| type | string | No | Filter assets by type (e.g., "io.kestra.core.models.assets.Table"). If omitted, returns all assets. |
| namespace | string | No | Filter assets by namespace. |
| metadata | map | No | Filter assets by metadata key-value pairs (e.g., {"key": "value"}). |
Return value
Returns an array of asset objects. Each asset object contains the following properties:
- tenantId: The tenant ID where the asset is created.
- namespace: The namespace the asset belongs to.
- id: The asset identifier.
- type: The asset type.
- metadata: Map of custom metadata key-value pairs.
- created: ISO 8601 timestamp when the asset was created.
- updated: ISO 8601 timestamp when the asset was last updated.
- deleted: Boolean indicating whether the asset has been deleted.
Examples
Populate a multiselect dropdown with table assets:
id: select_assets
namespace: company.team
inputs:
- id: assets
type: MULTISELECT
expression: '{{ assets(type="io.kestra.core.models.assets.Table") | jq(".[].id") }}'
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values: "{{inputs.assets}}"
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{taskrun.value}}"
Filter assets by namespace:
inputs:
- id: staging_tables
type: MULTISELECT
expression: '{{ assets(type="io.kestra.core.models.assets.Table", namespace="company.team") | jq(".[].id") }}'
Filter assets by metadata:
inputs:
- id: mart_tables
type: MULTISELECT
expression: '{{ assets(metadata={"model_layer": "mart"}) | jq(".[].id") }}'
Get all assets and extract metadata:
id: list_assets_metadata
namespace: company.team
tasks:
- id: list_all_assets
type: io.kestra.plugin.core.log.Log
message: "{{ assets() | jq('.[] | {id: .id, type: .type, metadata: .metadata}') }}"
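Filters can also be combined. Assuming tables were registered with the model_layer metadata from the earlier pipeline example, the following input narrows the selection by type, namespace, and metadata at once (adjust the namespace to your own):
inputs:
  - id: mart_tables_in_namespace
    type: MULTISELECT
    expression: '{{ assets(type="io.kestra.core.models.assets.Table", namespace="kestra.company.data", metadata={"model_layer": "mart"}) | jq(".[].id") }}'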
Export assets with AssetShipper
The AssetShipper task allows you to export asset metadata to external systems for lineage tracking, monitoring, or integration with data catalogs. You can ship assets to files or to lineage providers like OpenLineage.
Export assets to file
Export asset metadata to a file in either ION or JSON format. This is useful for archiving, auditing, or importing into other systems.
id: ship_asset_to_file
namespace: kestra.company.data
tasks:
- id: export_assets
type: io.kestra.plugin.ee.assets.AssetShipper
assetExporters:
- id: file_exporter
type: io.kestra.plugin.ee.assets.FileAssetExporter
format: ION
You can change the format property to JSON if you prefer a more widely compatible format.
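For reference, the same flow exporting JSON differs only in the format value:
id: ship_asset_to_json_file
namespace: kestra.company.data
tasks:
  - id: export_assets
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: file_exporter
        type: io.kestra.plugin.ee.assets.FileAssetExporter
        # Same exporter as above, writing JSON instead of ION
        format: JSON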
Export assets to OpenLineage
Ship asset metadata to an OpenLineage-compatible lineage provider. This requires mapping Kestra asset fields to OpenLineage conventions.
id: ship_asset_to_openlineage
namespace: kestra.company.data
tasks:
- id: export_to_lineage
type: io.kestra.plugin.ee.assets.AssetShipper
assetExporters:
- id: openlineage_exporter
type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
uri: http://host.docker.internal:5000
mappings:
io.kestra.plugin.ee.assets.Table:
namespace: namespace
The mappings property defines how Kestra asset metadata fields map to OpenLineage dataset facets. Each asset type can have its own mapping configuration. For more information about OpenLineage dataset facets and available fields, see the OpenLineage Dataset Facets documentation.