# Assets in Kestra – Track Lineage and Metadata
Track and manage the resources your workflows create and use.
## Track workflow assets and lineage
Assets keep a live inventory of the resources your workflows interact with. These resources can be database tables, virtual machines, files, or any other external system you work with.
Assets are captured automatically when tasks declare `assets.inputs` or `assets.outputs`; you can also add them manually from the **Assets** tab. Once created, you can view asset details, check which workflow runs created or modified them, and see how assets connect to each other across your workflows.
This feature enables:
- Shipping metadata to lineage providers (e.g., OpenLineage).
- Populating dropdowns or Pebble inputs with live assets (e.g., available VMs).
- Monitoring assets and their state.
## Asset definition
Define assets directly on any task using the `assets` property. Each task can declare input assets (resources it reads) and output assets (resources it creates or modifies).
Every asset includes these fields:

| Field | Description |
|---|---|
| `id` | Unique within a tenant. |
| `namespace` | Optional namespace associated with the asset, used for filtering and RBAC management. |
| `type` | A predefined Kestra type such as `io.kestra.plugin.ee.assets.Table`, or any custom string value. |
| `displayName` | Optional human-readable name. |
| `description` | Markdown-supported documentation. |
| `metadata` | Map of key-value pairs attaching custom metadata to the given asset. |
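Putting the fields together, a task-level declaration that sets all of them might look like the following sketch (the task, asset ID, and metadata keys are illustrative, not a prescribed schema):

```yaml
id: asset_fields_demo
namespace: company.team

tasks:
  - id: load_orders
    type: io.kestra.plugin.core.log.Log
    message: "Loading orders"
    assets:
      outputs:
        - id: orders                              # unique within the tenant
          namespace: company.team                 # optional, for filtering and RBAC
          type: io.kestra.plugin.ee.assets.Table  # Kestra-defined or any custom string
          displayName: Orders Table
          description: "**Orders** table, loaded daily."  # markdown supported
          metadata:                               # free-form key-value pairs
            owner: data-team
            environment: prod
```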
### Asset Identifier
An asset is uniquely identified by its `id` and the tenant (`tenantId`) where you create it. You can attach a `namespace` to an asset to improve filtering and to restrict visibility so that only users or groups with the appropriate RBAC permissions can access the asset.
### Asset Type
Asset types fall into two categories:
- **Kestra-defined asset types**: These predefined types use the `io.kestra.core.models.assets` model and provide structured metadata fields specific to each asset type. In future iterations of the Assets feature, Kestra plugins will be able to automatically generate assets with these types and populate their metadata fields during task execution. For example, a database plugin could automatically create a `Table` asset with the system, database, and schema fields filled in based on the connection details.
The current Kestra-defined asset types are the following:
- `io.kestra.plugin.ee.assets.Dataset`: Represents a dataset asset managed by Kestra.
  - Metadata: `system`, `location`, `format`
- `io.kestra.plugin.ee.assets.File`: Represents a file asset, such as documents, logs, or other file-based outputs.
  - Metadata: `system`, `path`
- `io.kestra.plugin.ee.assets.Table`: Represents a database table asset with schema and data location metadata.
  - Metadata: `system`, `database`, `schema`
- `io.kestra.plugin.ee.assets.VM`: Represents a virtual machine asset, including attributes like IP address and provider.
  - Metadata: `provider`, `region`, `state`
- `io.kestra.core.models.assets.External`: Represents an external asset that exists outside of Kestra's managed resources.
  - This type is automatically assigned when you reference an asset in `assets.inputs` that doesn't already exist in Kestra. You don't need to set the type explicitly; Kestra creates the asset with the `External` type automatically.
  - This is useful for tracking dependencies on resources managed outside your workflows, such as external database tables, third-party APIs, or manually provisioned infrastructure.
- **Free-form asset types**: You can define asset types using any custom string value to represent asset categories that fit your organization's needs. This lets you create and manage your own asset taxonomies, giving you the flexibility to describe resources not covered by Kestra's standard models. These assets require manual definition and are not auto-generated by plugins.
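As a minimal sketch of the `External` behavior described above: listing a resource under `assets.inputs` without a type is enough to register it. The flow ID and table name below are illustrative.

```yaml
id: external_asset_demo
namespace: company.team

tasks:
  - id: read_source
    type: io.kestra.plugin.core.log.Log
    message: "Reading from an externally managed table"
    assets:
      inputs:
        # No type specified: if this asset doesn't already exist in Kestra,
        # it is created with the io.kestra.core.models.assets.External type.
        - id: legacy_warehouse.public.customers
```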
## Quick start: minimal asset flow
A small example that registers one output asset and logs its ID:
```yaml
id: hello_assets
namespace: company.team

tasks:
  - id: write_file
    type: io.kestra.plugin.core.log.Log
    message: "Created report.csv"
    assets:
      outputs:
        - id: report.csv
          type: io.kestra.plugin.ee.assets.File
          metadata:
            path: s3://company/reports/report.csv

  - id: confirm
    type: io.kestra.plugin.core.log.Log
    message: "Asset recorded: {{ assets() | jq('.[] | {id: .id, type: .type, metadata: .metadata}') }}"
```

## Auto-generated assets
Some plugins support automatic asset generation when `assets.enableAuto: true` is set on a task. This removes the need to manually declare `assets.inputs` and `assets.outputs`: the plugin inspects its execution context and emits assets automatically.
- **JDBC Query**: detects `CREATE TABLE` statements and emits a single `io.kestra.plugin.ee.assets.Table` output; the JDBC URL populates `system` and `database`.
- **Ansible CLI**: parses `inventory` hosts as `inputs` of type `io.kestra.core.models.assets.External`, marking the infrastructure targets the playbook runs against.
- **dbt CLI**: parses `manifest.json` to emit each model as an `io.kestra.plugin.ee.assets.Table` output with `database`, `schema`, `name`, and lineage edges based on `depends_on`.
### JDBC Query auto-generated assets
```yaml
id: jdbc_create_trips
namespace: company.team

tasks:
  - id: create_trips_table
    type: io.kestra.plugin.jdbc.sqlite.Query
    url: jdbc:sqlite:myfile.db
    outputDbFile: true
    sql: |
      CREATE TABLE IF NOT EXISTS trips (
        VendorID INTEGER,
        passenger_count INTEGER,
        trip_distance REAL
      );
    assets:
      enableAuto: true
```

### Ansible CLI auto-generated assets
```yaml
id: ansible_playbook
namespace: company.team

tasks:
  - id: ansible_task
    type: io.kestra.plugin.ansible.cli.AnsibleCLI
    inputFiles:
      inventory.ini: |
        localhost ansible_connection=local
      myplaybook.yml: |
        ---
        - hosts: localhost
          tasks:
            - name: Print Hello World
              debug:
                msg: "Hello, World!"
    assets:
      enableAuto: true
    commands:
      - ansible-playbook -i inventory.ini myplaybook.yml
```

### dbt CLI auto-generated assets
```yaml
id: dbt_build_duckdb
namespace: company.team

tasks:
  - id: dbt
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/dbt-example
        branch: main

      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        commands:
          - dbt deps
          - dbt build
          - dbt run
        profiles: |
          my_dbt_project:
            outputs:
              dev:
                type: duckdb
                path: ":memory:"
                fixed_retries: 1
                threads: 16
                timeout_seconds: 300
            target: dev
        assets:
          enableAuto: true
```

## Operational automation
Assets go beyond lineage: you can manage lifecycle, react to events, and automate remediation directly from flows:
- Imperative lifecycle tasks to create/update, list, and delete assets (`Set`, `List`, `Delete`).
- Event-based triggers with `EventTrigger` that react to asset lifecycle events (`CREATED`, `UPDATED`, `DELETED`, `USED`).
- Freshness monitoring with `FreshnessTrigger` to detect stale assets and launch workflows automatically.
- Flexible scoping by asset ID, namespace, type, and metadata filters.
- Actionable trigger context (`event`, `eventTime`, `lastUpdated`, `staleDuration`, `checkTime`) to drive alerts, routing, and recovery.
### Trigger use mapping
| Trigger | Primary use |
|---|---|
| `EventTrigger` | React instantly to asset lifecycle events (`CREATED`, `UPDATED`, `DELETED`, `USED`). |
| `FreshnessTrigger` | Poll assets on an interval to detect staleness and launch remediation. |
### Operational controls and triggers
Use asset tasks and triggers to automate lifecycle, governance, and freshness checks directly from flows.
### Advanced: event-driven automation
```yaml
id: asset_event_driven_pipeline
namespace: company.data

tasks:
  - id: transform_to_mart
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.data
    flowId: create_mart_tables
    inputs:
      source_asset_id: "{{ trigger.asset.id }}"
      source_event: "{{ trigger.asset.event }}"
      event_time: "{{ trigger.asset.eventTime }}"

triggers:
  - id: staging_table_event
    type: io.kestra.plugin.ee.assets.EventTrigger
    namespace: company.data
    assetType: io.kestra.plugin.ee.assets.Table
    events:
      - CREATED
      - UPDATED
    metadataQuery:
      - field: model_layer
        type: EQUAL_TO
        value: staging
```

### Advanced: audit deletions
```yaml
id: audit_asset_deletions
namespace: company.security

tasks:
  - id: log_deletion
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO audit_log (asset_id, asset_type, namespace, event, event_time)
      VALUES (
        '{{ trigger.asset.id }}',
        '{{ trigger.asset.type }}',
        '{{ trigger.asset.namespace }}',
        '{{ trigger.asset.event }}',
        '{{ trigger.asset.eventTime }}'
      )

triggers:
  - id: asset_deletion_event
    type: io.kestra.plugin.ee.assets.EventTrigger
    events:
      - DELETED
```

### Advanced: freshness monitoring
```yaml
id: stale_assets_monitor
namespace: company.monitoring

tasks:
  - id: log_stale
    type: io.kestra.plugin.core.log.Log
    message: >
      Found {{ trigger.assets | length }} stale assets.
      First asset: {{ trigger.assets[0].id ?? 'n/a' }}.
      Stale for: {{ trigger.assets[0].staleDuration ?? 'n/a' }}.

triggers:
  - id: stale_assets
    type: io.kestra.plugin.ee.assets.FreshnessTrigger
    maxStaleness: PT24H
    interval: PT1H
```

### Advanced: scoped freshness checks
```yaml
id: prod_assets_freshness
namespace: company.monitoring

tasks:
  - id: trigger_remediation
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.data
    flowId: refresh_marts
    inputs:
      asset_id: "{{ trigger.assets[0].id }}"
      last_updated: "{{ trigger.assets[0].lastUpdated }}"
      stale_duration: "{{ trigger.assets[0].staleDuration }}"

triggers:
  - id: stale_prod_marts
    type: io.kestra.plugin.ee.assets.FreshnessTrigger
    namespace: company.data
    assetType: TABLE
    maxStaleness: PT6H
    interval: PT30M
    metadataQuery:
      - field: environment
        type: EQUAL_TO
        value: prod
      - field: model_layer
        type: EQUAL_TO
        value: mart
```

### Advanced: lifecycle tasks
```yaml
id: asset_lifecycle_ops
namespace: company.data

tasks:
  - id: upsert_asset
    type: io.kestra.plugin.ee.assets.Set
    namespace: assets.data
    assetId: customers_by_country
    assetType: TABLE
    displayName: Customers by Country
    assetDescription: Customer distribution by country
    metadata:
      owner: data-team
      environment: prod

  - id: list_assets
    type: io.kestra.plugin.ee.assets.List
    namespace: assets.data
    types:
      - TABLE
    metadataQuery:
      - field: owner
        type: EQUAL_TO
        value: data-team
    fetchType: FETCH

  - id: delete_asset
    type: io.kestra.plugin.ee.assets.Delete
    assetId: customers_by_country
```

## Data Pipeline Use Cases
### Advanced: data pipeline examples
Assets are essential for tracking data lineage in analytics and data engineering workflows. The following examples demonstrate how to use assets for simple table creation and complex multi-layer data pipelines.
#### Example 1: Simple Table Creation
Scenario: You’re creating a new database table from scratch. This is a foundational asset with no upstream dependencies.
```yaml
id: pipeline_with_assets
namespace: company.team

tasks:
  - id: create_trips_table
    type: io.kestra.plugin.jdbc.sqlite.Queries
    url: jdbc:sqlite:myfile.db
    outputDbFile: true
    sql: |
      CREATE TABLE IF NOT EXISTS trips (
        VendorID INTEGER,
        passenger_count INTEGER,
        trip_distance REAL
      );

      INSERT INTO trips (VendorID, passenger_count, trip_distance)
      VALUES
        (1, 1, 1.5),
        (1, 2, 2.3),
        (2, 1, 0.8),
        (2, 3, 3.1);
    assets:
      outputs:
        - id: trips
          namespace: "{{ flow.namespace }}"
          type: io.kestra.plugin.ee.assets.Table
          metadata:
            database: sqlite
            table: trips
```

Key points:
- There are no `inputs` assets, as this is a source table with no dependencies.
- The `trips` table is registered as an output asset that downstream workflows can reference.
- Metadata captures the database type and table name for easier discovery.
#### Example 2: Multi-Layer Data Pipeline
Scenario: You’re building a modern data stack with staging and mart layers. The staging layer reads from an external source, and the mart layer creates aggregated analytics tables.
```yaml
id: data_pipeline_assets
namespace: kestra.company.data

tasks:
  - id: create_staging_layer_asset
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      CREATE TABLE IF NOT EXISTS trips AS
      select VendorID, passenger_count, trip_distance
      from sample_data.nyc.taxi limit 10;
    assets:
      inputs:
        - id: sample_data.nyc.taxi
      outputs:
        - id: trips
          namespace: "{{flow.namespace}}"
          type: io.kestra.plugin.ee.assets.Table
          metadata:
            model_layer: staging

  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values:
      - passenger_count
      - trip_distance
    tasks:
      - id: create_mart_layer_asset
        type: io.kestra.plugin.jdbc.duckdb.Query
        sql: SELECT AVG({{taskrun.value}}) AS avg_{{taskrun.value}} FROM trips;
        assets:
          inputs:
            - id: trips
          outputs:
            - id: avg_{{taskrun.value}}
              type: io.kestra.plugin.ee.assets.Table
              namespace: "{{flow.namespace}}"
              metadata:
                model_layer: mart

pluginDefaults:
  - type: io.kestra.plugin.jdbc.duckdb
    values:
      url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
      fetchType: STORE
```

What's happening in this pipeline:
1. **External Source Tracking**: The `create_staging_layer_asset` task references `sample_data.nyc.taxi` as an input asset, even though it's managed outside this workflow. This establishes lineage to external data sources.
2. **Staging Layer**: The `trips` table is created and registered with `model_layer: staging` metadata. This becomes an intermediate asset that the mart layer consumes.
3. **Dynamic Mart Creation**: The `ForEach` task generates two mart tables, `avg_passenger_count` and `avg_trip_distance`. Both declare `trips` as an input, creating a clear dependency chain.
4. **Complete Lineage Graph**: Kestra automatically builds the dependency graph from these declarations.
Benefits of this approach:
- **Impact Analysis**: If `sample_data.nyc.taxi` changes, you can instantly see that it affects 3 downstream assets.
- **Layer Organization**: Filter assets by `model_layer` to view only staging or mart tables.
- **Dependency Tracking**: Know exactly which tables depend on others before making schema changes.
- **Audit Trail**: Track which workflows created each table and when.
## Infrastructure Use Case: Team Bucket Provisioning
### Advanced: infrastructure provisioning
Assets are particularly valuable for infrastructure management scenarios. This example demonstrates how a DevOps team can provision cloud resources and track their usage across different teams.
Scenario: Your DevOps team needs to create dedicated S3 buckets for multiple teams (Business, Data, Finance, Product). By registering these buckets as assets during provisioning, you establish a clear lineage of which workflows and executions interact with each infrastructure component.
The following flow creates S3 buckets for selected teams and registers them as assets:
```yaml
id: infra_assets
namespace: kestra.company.infra

inputs:
  - id: teams
    type: MULTISELECT
    values:
      - Business
      - Data
      - Finance
      - Product

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ inputs.teams }}"
    tasks:
      - id: create_bucket
        type: io.kestra.plugin.aws.cli.AwsCLI
        commands:
          - aws s3 mb s3://kestra-{{ taskrun.value | slugify }}-bucket
        assets:
          outputs:
            - id: kestra-{{ taskrun.value | slugify }}-bucket
              type: AWS_BUCKET
              metadata:
                provider: s3
                address: s3://kestra-{{ taskrun.value | slugify }}-bucket

pluginDefaults:
  - type: io.kestra.plugin.aws
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ secret('AWS_REGION') }}"
      allowFailure: true
```

This flow dynamically creates buckets (e.g., `kestra-data-bucket`, `kestra-finance-bucket`) and registers each as an `AWS_BUCKET` asset with relevant metadata.
Once the infrastructure is provisioned, teams can reference these assets in their workflows. Here’s how the Data team uses their bucket:
```yaml
id: upload_file
namespace: kestra.company.data

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/jaffle-csv/raw_customers.csv

  - id: aws_upload
    type: io.kestra.plugin.aws.s3.Upload
    bucket: kestra-data-bucket
    from: '{{ outputs.download.uri }}'
    key: raw_customer.csv
    assets:
      inputs:
        - id: kestra-data-bucket
      outputs:
        - id: raw_customer
          type: io.kestra.plugin.ee.assets.File
          metadata:
            owner: data

pluginDefaults:
  - type: io.kestra.plugin.aws
    values:
      accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ secret('AWS_REGION') }}"
```

In this workflow:
- The `aws_upload` task declares `kestra-data-bucket` as an input asset, linking it to the infrastructure provisioned earlier.
- It also creates an output asset (`raw_customer`) representing the uploaded file.
- This establishes a complete lineage chain: infrastructure creation → data upload → file asset.
Benefits: With this approach, you can easily answer questions like:
- Which teams are using which buckets?
- What files have been uploaded to each bucket?
- Which workflows and executions have interacted with a specific infrastructure component?
- When was this infrastructure resource created and by which flow?
## Populate dropdowns and app inputs
The `assets()` Pebble function allows you to query and retrieve assets dynamically in your workflows. This is particularly useful for populating dropdown inputs or dynamically selecting resources based on filters.
### Function signature
```
assets(type: string, namespace: string, metadata: map)
```

### Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `type` | string | No | Filter assets by type (e.g., `"io.kestra.core.models.assets.Table"`). If omitted, returns all assets. |
| `namespace` | string | No | Filter assets by namespace. |
| `metadata` | map | No | Filter assets by metadata key-value pairs (e.g., `{"key": "value"}`). |
### Return value
Returns an array of asset objects. Each asset object contains the following properties:
- `tenantId`: The tenant ID where the asset was created
- `namespace`: The namespace the asset belongs to
- `id`: The asset identifier
- `type`: The asset type
- `metadata`: Map of custom metadata key-value pairs
- `created`: ISO 8601 timestamp of when the asset was created
- `updated`: ISO 8601 timestamp of when the asset was last updated
- `deleted`: Boolean indicating whether the asset has been deleted
### Examples
Populate a multiselect dropdown with table assets:
```yaml
id: select_assets
namespace: company.team

inputs:
  - id: assets
    type: MULTISELECT
    expression: '{{ assets(type="io.kestra.core.models.assets.Table") | jq(".[].id") }}'

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{inputs.assets}}"
    tasks:
      - id: log
        type: io.kestra.plugin.core.log.Log
        message: "{{taskrun.value}}"
```

Filter assets by namespace:
```yaml
inputs:
  - id: staging_tables
    type: MULTISELECT
    expression: '{{ assets(type="io.kestra.core.models.assets.Table", namespace="company.team") | jq(".[].id") }}'
```

Filter assets by metadata:
```yaml
inputs:
  - id: mart_tables
    type: MULTISELECT
    expression: '{{ assets(metadata={"model_layer": "mart"}) | jq(".[].id") }}'
```

Get all assets and extract metadata:
```yaml
id: list_assets_metadata
namespace: company.team

tasks:
  - id: list_all_assets
    type: io.kestra.plugin.core.log.Log
    message: "{{ assets() | jq('.[] | {id: .id, type: .type, metadata: .metadata}') }}"
```

## Export assets with AssetShipper
The `AssetShipper` task allows you to export asset metadata to external systems for lineage tracking, monitoring, or integration with data catalogs. You can ship assets to files or to lineage providers like OpenLineage.
### Export assets to file
Export asset metadata to a file in either ION or JSON format. This is useful for archiving, auditing, or importing into other systems.
```yaml
id: ship_asset_to_file
namespace: kestra.company.data

tasks:
  - id: export_assets
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: file_exporter
        type: io.kestra.plugin.ee.assets.FileAssetExporter
        format: ION
```

You can change the `format` property to `JSON` if you prefer a more widely compatible format.
### Export assets to OpenLineage
Ship asset metadata to an OpenLineage-compatible lineage provider. This requires mapping Kestra asset fields to OpenLineage conventions.
```yaml
id: ship_asset_to_openlineage
namespace: kestra.company.data

tasks:
  - id: export_to_lineage
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: openlineage_exporter
        type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
        uri: http://host.docker.internal:5000
        mappings:
          io.kestra.plugin.ee.assets.Table:
            namespace: namespace
```

The `mappings` property defines how Kestra asset metadata fields map to OpenLineage dataset facets. Each asset type can have its own mapping configuration. For more information about OpenLineage dataset facets and available fields, see the OpenLineage Dataset Facets documentation.
## Purge assets and lineage (retention)
Use the `io.kestra.plugin.ee.assets.PurgeAssets` task to enforce asset retention without touching executions or logs. By default, this task purges assets, asset usage events (execution view), and asset lineage events (for asset exporters) matching the filters. You can configure it to purge only specific types of records.
Filters:
| Property | Description |
|---|---|
| `namespace` | Filter by namespace. Supports prefix matching (e.g., `company.data` matches `company.data.staging`). |
| `assetId` | Filter by a specific asset ID. |
| `assetType` | Filter by one or more asset types (e.g., `io.kestra.plugin.ee.assets.Table`). |
| `metadataQuery` | Filter by metadata key-value pairs. |
| `endDate` | (required) Purge records created or updated before this date (ISO 8601). |
Purge scope:
| Property | Default | Description |
|---|---|---|
| `purgeAssets` | `true` | Whether to purge the asset records themselves. |
| `purgeAssetUsages` | `true` | Whether to purge asset usage events (execution view). |
| `purgeAssetLineages` | `true` | Whether to purge asset lineage events. |
Outputs: `purgedAssetsCount`, `purgedAssetUsagesCount`, `purgedAssetLineagesCount`.
Example: purge old VM assets on a monthly schedule.
```yaml
id: asset_retention_policy
namespace: company.infra

triggers:
  - id: monthly_cleanup
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 1 * *"

tasks:
  - id: purge_old_vms
    type: io.kestra.plugin.ee.assets.PurgeAssets
    assetType:
      - io.kestra.plugin.ee.assets.VM
    endDate: "{{ now() | dateAdd(-180, 'DAYS') }}"
```