Assets in Kestra – Track Lineage and Metadata

Track and manage the resources your workflows create and use.

Track workflow assets and lineage

Assets keep a live inventory of the resources your workflows interact with. These resources can be database tables, virtual machines, files, or any other external system you work with.

Assets are captured automatically when tasks declare assets.inputs or assets.outputs; you can also add them manually from the Assets tab. Once an asset exists, you can view its details, check which workflow runs created or modified it, and see how assets connect to each other across your workflows.

This feature enables:

  • Shipping metadata to lineage providers (e.g., OpenLineage).
  • Populating dropdowns or Pebble inputs with live assets (e.g., available VMs).
  • Monitoring assets and their state.

Asset definition

Define assets directly on any task using the assets property. Each task can declare input assets under inputs (resources it reads) and output assets under outputs (resources it creates or modifies).

Every asset includes these fields:

| Field | Description |
| --- | --- |
| id | Unique within a tenant. |
| namespace | Each asset can be associated with a namespace for filtering and RBAC management. |
| type | Use predefined Kestra types like io.kestra.plugin.ee.assets.Table or any custom string value. |
| displayName | Optional human-readable name. |
| description | Markdown-supported documentation. |
| metadata | Map of key-value pairs for adding custom metadata to the asset. |
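
As an illustration of these fields, the sketch below declares one input and one fully described output on a single task. The flow ID, task ID, asset IDs, and metadata values are hypothetical, and the Log task only stands in for whatever task actually reads and writes the tables.

```yaml
id: asset_fields_example          # hypothetical flow ID
namespace: company.team
tasks:
  - id: refresh_orders            # hypothetical task; replace with the real workload
    type: io.kestra.plugin.core.log.Log
    message: "Refreshing orders"
    assets:
      inputs:
        - id: raw_orders          # resource the task reads
      outputs:
        - id: orders_clean        # resource the task creates or modifies
          namespace: "{{ flow.namespace }}"
          type: io.kestra.plugin.ee.assets.Table
          displayName: Cleaned orders
          description: "Orders after **deduplication**."
          metadata:
            system: postgres
            database: analytics
            schema: public
```

Only id is needed to reference an asset; the other fields make it easier to filter and document.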

Asset Identifier

An asset is uniquely identified by its id and the tenant (tenantId) where you create it. You can attach a namespace to an asset to improve filtering and to restrict visibility so only users or groups with the appropriate RBAC can access the asset.

Asset Type

Asset types fall into two categories:

  • Kestra-defined asset types: These predefined types use the io.kestra.core.models.assets model and provide structured metadata fields specific to each asset type. In future iterations of the Assets feature, more Kestra plugins will be able to generate assets with these types automatically and populate their metadata fields during task execution. For example, a database plugin could automatically create a Table asset with the system, database, and schema fields filled in from the connection details.

The current Kestra-defined asset types are the following:

  • io.kestra.plugin.ee.assets.Dataset

    • Represents a dataset asset managed by Kestra.
    • Metadata: system, location, format
  • io.kestra.plugin.ee.assets.File

    • Represents a file asset, such as documents, logs, or other file-based outputs.
    • Metadata: system, path
  • io.kestra.plugin.ee.assets.Table

    • Represents a database table asset with schema and data location metadata.
    • Metadata: system, database, schema
  • io.kestra.plugin.ee.assets.VM

    • Represents a virtual machine asset, including attributes like IP address and provider.
    • Metadata: provider, region, state
  • io.kestra.core.models.assets.External

    • Represents an external asset that exists outside of Kestra’s managed resources.
    • This type is automatically assigned when you reference an asset in assets.inputs that doesn’t already exist in Kestra. You don’t need to explicitly set the type — Kestra will create the asset with the External type automatically.
    • This is useful for tracking dependencies on resources managed outside your workflows, such as external database tables, third-party APIs, or manually provisioned infrastructure.
  • Free-form asset types: You can define asset types using any custom string value to represent asset categories that fit your organization’s needs. This lets you create and manage your own asset taxonomies, giving you flexibility to describe resources that are not covered by Kestra’s standard models. These assets require manual definition and will not be auto-generated by plugins.

Quick start: minimal asset flow

A minimal example that registers one output asset, then logs the ID, type, and metadata of the recorded assets:

id: hello_assets
namespace: company.team
tasks:
  - id: write_file
    type: io.kestra.plugin.core.log.Log
    message: "Created report.csv"
    assets:
      outputs:
        - id: report.csv
          type: io.kestra.plugin.ee.assets.File
          metadata:
            path: s3://company/reports/report.csv
  - id: confirm
    type: io.kestra.plugin.core.log.Log
    message: "Asset recorded: {{ assets() | jq('.[] | {id: .id, type: .type, metadata: .metadata}') }}"

Auto-generated assets

Some plugins support automatic asset generation when assets.enableAuto: true is set on a task. This removes the need to manually declare assets.inputs and assets.outputs — the plugin inspects its execution context and emits assets automatically:

  • JDBC Query: detects CREATE TABLE statements and emits a single io.kestra.plugin.ee.assets.Table output; the JDBC URL populates the system and database fields.
  • Ansible CLI: parses inventory hosts as inputs of type io.kestra.core.models.assets.External, marking the infrastructure targets the playbook runs against.
  • dbt CLI: parses manifest.json to emit each model as an io.kestra.plugin.ee.assets.Table output with database, schema, name, and lineage edges based on depends_on.

JDBC Query auto-generated assets

id: jdbc_create_trips
namespace: company.team
tasks:
  - id: create_trips_table
    type: io.kestra.plugin.jdbc.sqlite.Query
    url: jdbc:sqlite:myfile.db
    outputDbFile: true
    sql: |
      CREATE TABLE IF NOT EXISTS trips (
        VendorID INTEGER,
        passenger_count INTEGER,
        trip_distance REAL
      );
    assets:
      enableAuto: true

Ansible CLI auto-generated assets

id: ansible_playbook
namespace: company.team
tasks:
  - id: ansible_task
    type: io.kestra.plugin.ansible.cli.AnsibleCLI
    inputFiles:
      inventory.ini: |
        localhost ansible_connection=local
      myplaybook.yml: |
        ---
        - hosts: localhost
          tasks:
            - name: Print Hello World
              debug:
                msg: "Hello, World!"
    assets:
      enableAuto: true
    commands:
      - ansible-playbook -i inventory.ini myplaybook.yml

dbt CLI auto-generated assets

id: dbt_build_duckdb
namespace: company.team
tasks:
  - id: dbt
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/dbt-example
        branch: main
      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        commands:
          - dbt deps
          - dbt build
          - dbt run
        profiles: |
          my_dbt_project:
            outputs:
              dev:
                type: duckdb
                path: ":memory:"
                fixed_retries: 1
                threads: 16
                timeout_seconds: 300
            target: dev
        assets:
          enableAuto: true

Operational automation

Assets go beyond lineage: you can manage lifecycle, react to events, and automate remediation directly from flows:

  • Imperative lifecycle tasks to create/update, list, and delete assets (Set, List, Delete).
  • Event-based triggers with EventTrigger that react to asset lifecycle events (CREATED, UPDATED, DELETED, USED).
  • Freshness monitoring with FreshnessTrigger to detect stale assets and launch workflows automatically.
  • Flexible scoping by asset ID, namespace, type, and metadata filters.
  • Actionable trigger context (event, eventTime, lastUpdated, staleDuration, checkTime) to drive alerts, routing, and recovery.

Trigger use mapping

| Trigger | Primary use |
| --- | --- |
| EventTrigger | React instantly to asset lifecycle events (CREATED, UPDATED, DELETED, USED). |
| FreshnessTrigger | Poll assets on an interval to detect staleness and launch remediation. |

Operational controls and triggers

Use asset tasks and triggers to automate lifecycle, governance, and freshness checks directly from flows.

Advanced: event-driven automation

id: asset_event_driven_pipeline
namespace: company.data
tasks:
  - id: transform_to_mart
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.data
    flowId: create_mart_tables
    inputs:
      source_asset_id: "{{ trigger.asset.id }}"
      source_event: "{{ trigger.asset.event }}"
      event_time: "{{ trigger.asset.eventTime }}"
triggers:
  - id: staging_table_event
    type: io.kestra.plugin.ee.assets.EventTrigger
    namespace: company.data
    assetType: io.kestra.plugin.ee.assets.Table
    events:
      - CREATED
      - UPDATED
    metadataQuery:
      - field: model_layer
        type: EQUAL_TO
        value: staging

Advanced: audit deletions

id: audit_asset_deletions
namespace: company.security
tasks:
  - id: log_deletion
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO audit_log (asset_id, asset_type, namespace, event, event_time)
      VALUES (
        '{{ trigger.asset.id }}',
        '{{ trigger.asset.type }}',
        '{{ trigger.asset.namespace }}',
        '{{ trigger.asset.event }}',
        '{{ trigger.asset.eventTime }}'
      )
triggers:
  - id: asset_deletion_event
    type: io.kestra.plugin.ee.assets.EventTrigger
    events:
      - DELETED

Advanced: freshness monitoring

id: stale_assets_monitor
namespace: company.monitoring
tasks:
  - id: log_stale
    type: io.kestra.plugin.core.log.Log
    message: >
      Found {{ trigger.assets | length }} stale assets.
      First asset: {{ trigger.assets[0].id ?? 'n/a' }}.
      Stale for: {{ trigger.assets[0].staleDuration ?? 'n/a' }}.
triggers:
  - id: stale_assets
    type: io.kestra.plugin.ee.assets.FreshnessTrigger
    maxStaleness: PT24H
    interval: PT1H

Advanced: scoped freshness checks

id: prod_assets_freshness
namespace: company.monitoring
tasks:
  - id: trigger_remediation
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.data
    flowId: refresh_marts
    inputs:
      asset_id: "{{ trigger.assets[0].id }}"
      last_updated: "{{ trigger.assets[0].lastUpdated }}"
      stale_duration: "{{ trigger.assets[0].staleDuration }}"
triggers:
  - id: stale_prod_marts
    type: io.kestra.plugin.ee.assets.FreshnessTrigger
    namespace: company.data
    assetType: TABLE
    maxStaleness: PT6H
    interval: PT30M
    metadataQuery:
      - field: environment
        type: EQUAL_TO
        value: prod
      - field: model_layer
        type: EQUAL_TO
        value: mart

Advanced: lifecycle tasks

id: asset_lifecycle_ops
namespace: company.data
tasks:
  - id: upsert_asset
    type: io.kestra.plugin.ee.assets.Set
    namespace: assets.data
    assetId: customers_by_country
    assetType: TABLE
    displayName: Customers by Country
    assetDescription: Customer distribution by country
    metadata:
      owner: data-team
      environment: prod
  - id: list_assets
    type: io.kestra.plugin.ee.assets.List
    namespace: assets.data
    types:
      - TABLE
    metadataQuery:
      - field: owner
        type: EQUAL_TO
        value: data-team
    fetchType: FETCH
  - id: delete_asset
    type: io.kestra.plugin.ee.assets.Delete
    assetId: customers_by_country

Data Pipeline Use Cases

Advanced: data pipeline examples

Assets are essential for tracking data lineage in analytics and data engineering workflows. The following examples demonstrate how to use assets for simple table creation and complex multi-layer data pipelines.

Example 1: Simple Table Creation

Scenario: You’re creating a new database table from scratch. This is a foundational asset with no upstream dependencies.

id: pipeline_with_assets
namespace: company.team
tasks:
  - id: create_trips_table
    type: io.kestra.plugin.jdbc.sqlite.Queries
    url: jdbc:sqlite:myfile.db
    outputDbFile: true
    sql: |
      CREATE TABLE IF NOT EXISTS trips (
        VendorID INTEGER,
        passenger_count INTEGER,
        trip_distance REAL
      );
      INSERT INTO trips (VendorID, passenger_count, trip_distance) VALUES
        (1, 1, 1.5),
        (1, 2, 2.3),
        (2, 1, 0.8),
        (2, 3, 3.1);
    assets:
      outputs:
        - id: trips
          namespace: "{{ flow.namespace }}"
          type: io.kestra.plugin.ee.assets.Table
          metadata:
            database: sqlite
            table: trips

Key points:

  • There are no input assets because this is a source table with no dependencies
  • The trips table is registered as an output asset that downstream workflows can reference
  • Metadata captures the database type and table name for easier discovery

Example 2: Multi-Layer Data Pipeline

Scenario: You’re building a modern data stack with staging and mart layers. The staging layer reads from an external source, and the mart layer creates aggregated analytics tables.

id: data_pipeline_assets
namespace: kestra.company.data
tasks:
  - id: create_staging_layer_asset
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      CREATE TABLE IF NOT EXISTS trips AS
      select VendorID, passenger_count, trip_distance from sample_data.nyc.taxi limit 10;
    assets:
      inputs:
        - id: sample_data.nyc.taxi
      outputs:
        - id: trips
          namespace: "{{flow.namespace}}"
          type: io.kestra.plugin.ee.assets.Table
          metadata:
            model_layer: staging
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values:
      - passenger_count
      - trip_distance
    tasks:
      - id: create_mart_layer_asset
        type: io.kestra.plugin.jdbc.duckdb.Query
        sql: SELECT AVG({{taskrun.value}}) AS avg_{{taskrun.value}} FROM trips;
        assets:
          inputs:
            - id: trips
          outputs:
            - id: avg_{{taskrun.value}}
              type: io.kestra.plugin.ee.assets.Table
              namespace: "{{flow.namespace}}"
              metadata:
                model_layer: mart
pluginDefaults:
  - type: io.kestra.plugin.jdbc.duckdb
    values:
      url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
      fetchType: STORE

What’s happening in this pipeline:

  1. External Source Tracking: The create_staging_layer_asset task references sample_data.nyc.taxi as an input asset, even though it’s managed outside this workflow. This establishes lineage to external data sources.

  2. Staging Layer: The trips table is created and registered with model_layer: staging metadata. This becomes an intermediate asset that mart layers will consume.

  3. Dynamic Mart Creation: The ForEach task generates two mart tables:

    • avg_passenger_count
    • avg_trip_distance

    Both declare trips as an input, creating a clear dependency chain.

  4. Complete Lineage Graph: Kestra automatically builds the dependency graph.

Benefits of this approach:

  • Impact Analysis: If sample_data.nyc.taxi changes, you can instantly see that it affects 3 downstream assets
  • Layer Organization: Filter assets by model_layer to view only staging or mart tables
  • Dependency Tracking: Know exactly which tables depend on others before making schema changes
  • Audit Trail: Track which workflows created each table and when

Infrastructure Use Case: Team Bucket Provisioning

Advanced: infrastructure provisioning

Assets are particularly valuable for infrastructure management scenarios. This example demonstrates how a DevOps team can provision cloud resources and track their usage across different teams.

Scenario: Your DevOps team needs to create dedicated S3 buckets for multiple teams (Business, Data, Finance, Product). By registering these buckets as assets during provisioning, you establish a clear lineage of which workflows and executions interact with each infrastructure component.

The following flow creates S3 buckets for selected teams and registers them as assets:

id: infra_assets
namespace: kestra.company.infra
inputs:
  - id: teams
    type: MULTISELECT
    values:
      - Business
      - Data
      - Finance
      - Product
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ inputs.teams }}"
    tasks:
      - id: create_bucket
        type: io.kestra.plugin.aws.cli.AwsCLI
        commands:
          - aws s3 mb s3://kestra-{{ taskrun.value | slugify }}-bucket
        assets:
          outputs:
            - id: kestra-{{ taskrun.value | slugify }}-bucket
              type: AWS_BUCKET
              metadata:
                provider: s3
                address: s3://kestra-{{ taskrun.value | slugify }}-bucket
pluginDefaults:
  - type: io.kestra.plugin.aws
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ secret('AWS_REGION') }}"
      allowFailure: true

This flow dynamically creates buckets (e.g., kestra-data-bucket, kestra-finance-bucket) and registers each as an AWS_BUCKET asset with relevant metadata.

Once the infrastructure is provisioned, teams can reference these assets in their workflows. Here’s how the Data team uses their bucket:

id: upload_file
namespace: kestra.company.data
tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/jaffle-csv/raw_customers.csv
  - id: aws_upload
    type: io.kestra.plugin.aws.s3.Upload
    bucket: kestra-data-bucket
    from: '{{ outputs.download.uri }}'
    key: raw_customer.csv
    assets:
      inputs:
        - id: kestra-data-bucket
      outputs:
        - id: raw_customer
          type: io.kestra.plugin.ee.assets.File
          metadata:
            owner: data
pluginDefaults:
  - type: io.kestra.plugin.aws
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ secret('AWS_REGION') }}"

In this workflow:

  • The aws_upload task declares kestra-data-bucket as an input asset, linking it to the infrastructure provisioned earlier
  • It also creates an output asset (raw_customer) representing the uploaded file
  • This establishes a complete lineage chain: infrastructure creation → data upload → file asset

Benefits: With this approach, you can easily answer questions like:

  • Which teams are using which buckets?
  • What files have been uploaded to each bucket?
  • Which workflows and executions have interacted with a specific infrastructure component?
  • When was this infrastructure resource created and by which flow?
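
One way to put the registered buckets to work is to offer them as choices in a flow input. The sketch below assumes the buckets were registered with the free-form type AWS_BUCKET, as in the provisioning flow above, and that assets() accepts a type filter as documented in the next section. The flow ID, input ID, and task IDs are hypothetical.

```yaml
id: pick_team_bucket              # hypothetical flow ID
namespace: kestra.company.data
inputs:
  - id: buckets
    type: MULTISELECT
    # Offer the IDs of all assets registered with the AWS_BUCKET type
    expression: '{{ assets(type="AWS_BUCKET") | jq(".[].id") }}'
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ inputs.buckets }}"
    tasks:
      - id: log_bucket
        type: io.kestra.plugin.core.log.Log
        message: "Selected bucket: {{ taskrun.value }}"
```

Because the list is built from the asset inventory, newly provisioned buckets should appear in the dropdown without editing the flow.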

Populate dropdowns and app inputs

The assets() Pebble function allows you to query and retrieve assets dynamically in your workflows. This is particularly useful for populating dropdown inputs or dynamically selecting resources based on filters.

Function signature

assets(type: string, namespace: string, metadata: map)

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | No | Filter assets by type (e.g., "io.kestra.plugin.ee.assets.Table"). If omitted, returns assets of all types. |
| namespace | string | No | Filter assets by namespace. |
| metadata | map | No | Filter assets by metadata key-value pairs (e.g., {"key": "value"}). |

Return value

Returns an array of asset objects. Each asset object contains the following properties:

  • tenantId - The tenant ID where the asset is created
  • namespace - The namespace the asset belongs to
  • id - The asset identifier
  • type - The asset type
  • metadata - Map of custom metadata key-value pairs
  • created - ISO 8601 timestamp when the asset was created
  • updated - ISO 8601 timestamp when the asset was last updated
  • deleted - Boolean indicating if the asset has been deleted

Examples

Populate a multiselect dropdown with table assets:

id: select_assets
namespace: company.team
inputs:
  - id: assets
    type: MULTISELECT
    expression: '{{ assets(type="io.kestra.plugin.ee.assets.Table") | jq(".[].id") }}'
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{inputs.assets}}"
    tasks:
      - id: log
        type: io.kestra.plugin.core.log.Log
        message: "{{taskrun.value}}"

Filter assets by namespace:

inputs:
  - id: staging_tables
    type: MULTISELECT
    expression: '{{ assets(type="io.kestra.plugin.ee.assets.Table", namespace="company.team") | jq(".[].id") }}'

Filter assets by metadata:

inputs:
  - id: mart_tables
    type: MULTISELECT
    expression: '{{ assets(metadata={"model_layer": "mart"}) | jq(".[].id") }}'
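
Combine filters:

Since all three parameters are optional, they can presumably be passed together in one call. The sketch below assumes that an asset must match every supplied filter; the input ID is hypothetical, and the type string follows the Kestra-defined Table type listed earlier on this page.

```yaml
inputs:
  - id: staging_tables_in_team    # hypothetical input ID
    type: MULTISELECT
    # Table assets in company.team whose metadata marks them as staging models
    expression: '{{ assets(type="io.kestra.plugin.ee.assets.Table", namespace="company.team", metadata={"model_layer": "staging"}) | jq(".[].id") }}'
```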

Get all assets and extract metadata:

id: list_assets_metadata
namespace: company.team
tasks:
  - id: list_all_assets
    type: io.kestra.plugin.core.log.Log
    message: "{{ assets() | jq('.[] | {id: .id, type: .type, metadata: .metadata}') }}"

Export assets with AssetShipper

The AssetShipper task allows you to export asset metadata to external systems for lineage tracking, monitoring, or integration with data catalogs. You can ship assets to files or to lineage providers like OpenLineage.

Export assets to file

Export asset metadata to a file in either ION or JSON format. This is useful for archiving, auditing, or importing into other systems.

id: ship_asset_to_file
namespace: kestra.company.data
tasks:
  - id: export_assets
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: file_exporter
        type: io.kestra.plugin.ee.assets.FileAssetExporter
        format: ION

You can change the format property to JSON if you prefer a more widely compatible format.
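
For instance, the same export with JSON output could look like the sketch below. Only the flow ID (hypothetical) and the format value differ from the example above.

```yaml
id: ship_asset_to_json_file       # hypothetical flow ID
namespace: kestra.company.data
tasks:
  - id: export_assets
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: file_exporter
        type: io.kestra.plugin.ee.assets.FileAssetExporter
        format: JSON              # switched from ION
```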

Export assets to OpenLineage

Ship asset metadata to an OpenLineage-compatible lineage provider. This requires mapping Kestra asset fields to OpenLineage conventions.

id: ship_asset_to_openlineage
namespace: kestra.company.data
tasks:
  - id: export_to_lineage
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: openlineage_exporter
        type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
        uri: http://host.docker.internal:5000
        mappings:
          io.kestra.plugin.ee.assets.Table:
            namespace: namespace

The mappings property defines how Kestra asset metadata fields map to OpenLineage dataset facets. Each asset type can have its own mapping configuration. For more information about OpenLineage dataset facets and available fields, see the OpenLineage Dataset Facets documentation.

Purge assets and lineage (retention)

Use the io.kestra.plugin.ee.assets.PurgeAssets task to enforce asset retention without touching executions or logs. By default, this task purges assets, asset usage events (execution view), and asset lineage events (for asset exporters) matching the filters. You can configure it to only purge specific types of records.

Filters:

| Property | Description |
| --- | --- |
| namespace | Filter by namespace. Supports prefix matching (e.g., company.data matches company.data.staging). |
| assetId | Filter by a specific asset ID. |
| assetType | Filter by one or more asset types (e.g., io.kestra.plugin.ee.assets.Table). |
| metadataQuery | Filter by metadata key-value pairs. |
| endDate | (Required) Purge records created or updated before this date (ISO 8601). |

Purge scope:

| Property | Default | Description |
| --- | --- | --- |
| purgeAssets | true | Whether to purge the asset records themselves. |
| purgeAssetUsages | true | Whether to purge asset usage events (execution view). |
| purgeAssetLineages | true | Whether to purge asset lineage events. |

Outputs: purgedAssetsCount, purgedAssetUsagesCount, purgedAssetLineagesCount.

Example: purge old VM assets on a monthly schedule.

id: asset_retention_policy
namespace: company.infra
triggers:
  - id: monthly_cleanup
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 1 * *"
tasks:
  - id: purge_old_vms
    type: io.kestra.plugin.ee.assets.PurgeAssets
    assetType:
      - io.kestra.plugin.ee.assets.VM
    endDate: "{{ now() | dateAdd(-180, 'DAYS') }}"
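
The purge scope properties can also narrow a cleanup to event history only. The sketch below is a variation under stated assumptions: the flow ID, trigger ID, task ID, weekly schedule, and 90-day window are hypothetical, and the property names come from the tables above.

```yaml
id: asset_event_retention         # hypothetical flow ID
namespace: company.data
triggers:
  - id: weekly_cleanup
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * 0"             # every Sunday at midnight
tasks:
  - id: purge_old_events
    type: io.kestra.plugin.ee.assets.PurgeAssets
    namespace: company.data       # prefix match, so company.data.staging is included
    purgeAssets: false            # keep the asset records themselves
    endDate: "{{ now() | dateAdd(-90, 'DAYS') }}"
```

With purgeAssetUsages and purgeAssetLineages left at their default of true, this should remove usage and lineage events older than the cutoff while leaving the assets in the inventory.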
