
AssetShipper
Forward asset lineage to one or more desired destinations.
Forward asset lineage to one or more desired destinations.
Forward asset lineage to one or more desired destinations.
The Asset Shipper task extracts asset lineage events from the Kestra backend and loads them to desired destinations including OpenLineage and file storage.
The task works incrementally in batches:
- Determines the starting timestamp using either:
- The last successfully processed assert lineage event's timestamp (persisted in KV Store using the
offsetKey) - Current time minus
lookbackPeriodduration if no previous state exists
- The last successfully processed assert lineage event's timestamp (persisted in KV Store using the
- Sends retrieved asset lineage events through configured
assetExporters - Stores the timestamp of the last processed asset lineage event to maintain state between executions
- Subsequent runs continue from the last stored timestamp
This incremental approach ensures reliable asset lineage event forwarding without gaps or duplicates
type: "io.kestra.plugin.ee.assets.AssetShipper"Examples
Ship asset lineage events to OpenLineage
id: assetShipper
namespace: company.team
triggers:
- id: daily
type: io.kestra.plugin.core.trigger.Schedule
cron: "@daily"
tasks:
- id: logSync
type: io.kestra.plugin.ee.assets.AssetShipper
assetExporters:
- id: file
type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
uri: http://localhost:3000DO
Properties
assetExporters*RequiredNon-dynamic
1List of asset exporters
The list of asset exporters to use for sending asset lineage
Ship asset lineage events to a file inside Kestra's internal storage.
^[a-zA-Z0-9][a-zA-Z0-9_-]*1IONIONJSONFormat of the exported files
This property defines the format of the exported files.
kestra-log-filePrefix of the log files
This property sets the prefix of the log files name. The full file name will be logFilePrefix-localDateTime.json/ion.
Maximum number of lines per file
This property specifies the maximum number of lines per log file.
Ship asset lineage events to an OpenLineage compatible destination.
^[a-zA-Z0-9][a-zA-Z0-9_-]*1The URI to send the asset lineage events to.
A mapping of asset types to metadata fields to use as namespace and name.
assetNamespacestring
Asset namespace to search
The asset namespace to use to filter asset lineage
assetTypesarray
Asset types to search
The list of asset types to use to filter asset lineage
deletebooleanstring
Delete asset usage after export
The asset shipper will delete the exported asset usage
flowIdstring
Flow ID to search
The flow identifier to use to filter asset lineage
flowNamespacestring
Flow namespace to search
The flow namespace to use to filter asset lineage
lookbackPeriodstring
P1DdurationStarting duration before now
If no previous execution or state exists, the fetch start date is set to the current time minus this duration
offsetKeystring
Prefix of the KVStore key
The prefix of the KVStore key that contains the last execution's end fetched date
taskIdstring
Task ID to search
The task identifier to use to filter asset lineage
Outputs
endFetchedDatestring
date-timeThe zoned date-time of the last fetched event, used as the starting date for the next execution
outputsobject
The outputs generated by each asset lineage event exporter
io.kestra.core.models.tasks.Output
sizeinteger
The number of asset lineage fetched.