AssetShipper
Certified

Forward asset lineage to one or more desired destinations.

The Asset Shipper task extracts asset lineage events from the Kestra backend and loads them into the configured destinations, such as OpenLineage and file storage.

The task works incrementally in batches:

  1. Determines the starting timestamp using either:
    • The last successfully processed asset lineage event's timestamp (persisted in KV Store using the offsetKey)
    • Current time minus lookbackPeriod duration if no previous state exists
  2. Sends retrieved asset lineage events through configured assetExporters
  3. Stores the timestamp of the last processed asset lineage event to maintain state between executions
  4. Subsequent runs continue from the last stored timestamp

This incremental approach ensures reliable asset lineage event forwarding without gaps or duplicates.
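As a rough illustration of the two state-related settings named in the steps above, a task definition might set lookbackPeriod and offsetKey explicitly. This is only a sketch: the task id, the exporter id, the offsetKey value, and the P7D look-back are made-up example values, not defaults.

```yaml
tasks:
  - id: shipLineage                     # hypothetical task id
    type: io.kestra.plugin.ee.assets.AssetShipper
    lookbackPeriod: P7D                 # used only when no previous state exists: start from now minus 7 days
    offsetKey: lineage-shipper-offset   # hypothetical KV Store key prefix for the last processed timestamp
    assetExporters:
      - id: openlineage                 # hypothetical exporter id
        type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
        uri: http://localhost:3000
```

With a configuration like this, the first run would fetch events from the past seven days, and later runs would resume from the timestamp stored under the offset key.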

```yaml
type: "io.kestra.plugin.ee.assets.AssetShipper"
```

Ship asset lineage events to OpenLineage

```yaml
id: assetShipper
namespace: company.team

triggers:
  - id: daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@daily"

tasks:
  - id: logSync
    type: io.kestra.plugin.ee.assets.AssetShipper
    assetExporters:
      - id: file
        type: io.kestra.plugin.ee.openlineage.OpenLineageAssetExporter
        uri: http://localhost:3000
```
Properties

assetExporters
Min items: 1

List of asset exporters

The list of asset exporters to use for sending asset lineage events.

Definitions
id (required, string)
Validation RegExp: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type (required, object)
format (string)
Default: ION
Possible values: ION, JSON

Format of the exported files

This property defines the format of the exported files.

logFilePrefix (string)
Default: kestra-log-file

Prefix of the log files

This property sets the prefix of the log file names. The full file name is logFilePrefix-localDateTime.json or logFilePrefix-localDateTime.ion, depending on the format.

maxLinesPerFile (integer or string)

Maximum number of lines per file

This property specifies the maximum number of lines per log file.
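The three optional properties of this exporter definition (format, logFilePrefix, maxLinesPerFile) could be combined as in the fragment below. It is a sketch under assumptions: the exporter id and all values are invented for illustration, and the type is left as a placeholder because this section does not state the type identifier for this exporter.

```yaml
assetExporters:
  - id: lineage-files                  # hypothetical exporter id
    type: "<exporter type>"            # placeholder: the type identifier is not given in this section
    format: JSON                       # one of ION (default) or JSON
    logFilePrefix: asset-lineage       # example prefix; the default is kestra-log-file
    maxLinesPerFile: 10000             # example limit on lines per exported file
```

Following the naming rule described above, files produced with these example values would start with asset-lineage- followed by the local date-time.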

id (required, string)
Validation RegExp: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type (required, object)
uri (required, string)

The URI to send the asset lineage events to.

mappings (object)
SubType: object

A mapping of asset types to metadata fields to use as namespace and name.

Asset namespace to search

The asset namespace to use to filter asset lineage

SubType: string

Asset types to search

The list of asset types to use to filter asset lineage

Delete asset usage after export

The asset shipper will delete the exported asset usage

Flow ID to search

The flow identifier to use to filter asset lineage

Flow namespace to search

The flow namespace to use to filter asset lineage

lookbackPeriod
Default: P1D
Format: duration

Starting duration before now

If no previous execution or state exists, the fetch start date is set to the current time minus this duration.

offsetKey

Prefix of the KVStore key

The prefix of the KVStore key that stores the end date fetched by the last execution.

Task ID to search

The task identifier to use to filter asset lineage

Outputs

Format: date-time

The zoned date-time of the last fetched event, used as the starting date for the next execution

The outputs generated by each asset lineage event exporter

Definitions

The number of asset lineage events fetched.