LogShipper

Forward workflow execution logs to one or more desired destinations.

The Log Shipper task extracts logs from the Kestra backend and loads them into destinations including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.

The task works incrementally in batches:

  1. Determines the starting timestamp using either:
    • The last successfully processed log's timestamp (persisted in KV Store using the offsetKey)
    • Current time minus lookbackPeriod duration if no previous state exists
  2. Sends retrieved logs through configured logExporters
  3. Stores the timestamp of the last processed log to maintain state between executions
  4. Subsequent runs continue from the last stored timestamp

This incremental approach ensures reliable log forwarding without gaps or duplicates.
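The incremental offset logic above can be sketched as follows. This is an illustrative Python sketch, not the task's actual implementation: the helper names and the dict standing in for the KV Store are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def resolve_start(kv_store: dict, offset_key: str, lookback: timedelta) -> datetime:
    """Return the persisted offset if one exists, else now minus the lookback period."""
    stored = kv_store.get(offset_key)
    if stored is not None:
        return stored
    return datetime.now(timezone.utc) - lookback

def ship_logs(kv_store, offset_key, lookback, fetch_logs, exporters):
    """One incremental run: fetch logs since the offset, export them, persist the new offset."""
    start = resolve_start(kv_store, offset_key, lookback)
    logs = fetch_logs(start)                      # logs newer than `start`
    for export in exporters:
        export(logs)                              # forward the batch to each destination
    if logs:
        # Store the newest timestamp so the next run resumes from here,
        # which is what prevents gaps and duplicates between executions.
        kv_store[offset_key] = max(log["timestamp"] for log in logs)
    return len(logs)
```

A second run then picks up from the stored offset instead of the lookback window, mirroring steps 1 and 4 above.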

```yaml
type: "io.kestra.plugin.ee.core.log.LogShipper"
```

Ship logs to multiple destinations

```yaml
id: logShipper
namespace: system

tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: INFO
    lookbackPeriod: P1D
    offsetKey: logShipperOffset
    delete: false
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter

      - id: awsCloudWatch
        type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: us-east-1
        logGroupName: kestra
        logStreamName: production

      - id: S3LogExporter
        type: io.kestra.plugin.ee.aws.s3.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: "{{ vars.region }}"
        format: JSON
        bucket: logbucket
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000

      - id: googleOperationalSuite
        type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
        projectId: my-gcp-project

      - id: gcs
        type: io.kestra.plugin.ee.gcp.gcs.LogExporter
        projectId: myProjectId
        format: JSON
        maxLinesPerFile: 10000
        bucket: my-bucket
        logFilePrefix: kestra-log-file
        chunk: 1000

      - id: azureMonitor
        type: io.kestra.plugin.ee.azure.monitor.LogExporter
        endpoint: https://endpoint-host.ingest.monitor.azure.com
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
        streamName: kestraLogs

      - id: azureBlobStorage
        type: io.kestra.plugin.ee.azure.storage.LogExporter
        endpoint: https://myblob.blob.core.windows.net/
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        containerName: logs
        format: JSON
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
        chunk: 1000

      - id: datadog
        type: io.kestra.plugin.ee.datadog.LogExporter
        basePath: https://http-intake.logs.datadoghq.eu
        apiKey: "{{ secret('DATADOG_API_KEY') }}"

      - id: elasticsearch
        type: io.kestra.plugin.ee.elasticsearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200

      - id: opensearch
        type: io.kestra.plugin.ee.opensearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200

      - id: newRelic
        type: io.kestra.plugin.ee.newrelic.LogExporter
        basePath: https://log-api.newrelic.com
        apiKey: "{{ secret('NEWRELIC_API_KEY') }}"

      - id: openTelemetry
        type: io.kestra.plugin.ee.opentelemetry.LogExporter
        otlpEndpoint: http://otel-collector:4318/v1/logs
        authorizationHeaderName: Authorization
        authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"

triggers:
  - id: dailySchedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
    disabled: true
```

Properties
Min items 1

List of log exporters

The list of log exporters to use for sending logs

Delete logs after export

If true, the Log Shipper deletes the exported logs

Execution to search

The execution ID used to filter logs

Flow to search

The flow ID used to filter logs

Default INFO

Log level to send

This property specifies the minimum log level to send.

Default P1D
Format duration

Starting duration before now

If no previous execution or state exists, the fetch start date is set to the current time minus this duration
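The lookback period is an ISO-8601 duration. A few illustrative values (the default shown in the example above is P1D):

```yaml
lookbackPeriod: P1D     # one day
# lookbackPeriod: PT6H  # six hours
# lookbackPeriod: P7D   # seven days
```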

Namespace to search

The namespace used to filter logs

Prefix of the KV Store key

The prefix of the KV Store key that stores the timestamp of the last log fetched by the previous execution

Format date-time

The zoned date-time of the last fetched log, used as the starting date for the next execution

SubType

The outputs generated by each log exporter

The number of logs fetched.

Format duration

The time allowed to establish a connection to the server before failing.

Default PT5M
Format duration

The time allowed for a read connection to remain idle before closing it.

The connection properties.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

The name of the index to send logs to

Default 1000

The chunk size for every bulk request.

The address of the proxy server.

The password for proxy authentication.

The port of the proxy server.

Default DIRECT
Possible Values
DIRECT, HTTP, SOCKS

The type of proxy to use.

The username for proxy authentication.

SubType string

List of HTTP OpenSearch servers.

Must be a URI like https://opensearch.com:9200, with scheme and port.

Basic auth configuration.

SubType string

List of HTTP headers to be sent on every request.

Must be a string with the key and value separated by a colon, e.g. Authorization: Token XYZ.

Sets the path's prefix for every request used by the HTTP client.

For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint. In essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful for when OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.
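For illustration, a connection using a path prefix might look like the following sketch (the host and prefix values are hypothetical):

```yaml
connection:
  hosts:
    - https://opensearch.example.com:9200
  pathPrefix: /my/path   # every request endpoint becomes /my/path/<endpoint>
```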

Whether the REST client should return any response containing at least one warning header as a failure.

Trust all SSL CA certificates.

Use this if the server is using a self-signed SSL certificate.

S3 bucket to upload log files.

The bucket to which log files will be uploaded

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

AWS region with which the SDK should communicate.

Access Key Id in order to connect to AWS.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

Default 1000

The chunk size for every bulk request.

The endpoint with which the SDK should communicate.

This property allows you to use a different S3 compatible storage backend.

Default JSON
Possible Values
ION, JSON

Format of the exported files

The format of the exported files

Default kestra-log-file

Prefix of the log files

The prefix of the log file name. The full file name will be logFilePrefix-localDateTime.json (or .ion)

Default 100000

Maximum number of lines per file

The maximum number of lines per file

Secret Key Id in order to connect to AWS.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

The AWS STS endpoint with which the SDKClient should communicate.

AWS STS Role.

The Amazon Resource Name (ARN) of the role to assume. If set, the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

AWS STS External Id.

A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.

Default PT15M
Format duration

AWS STS Session duration.

The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.

AWS STS Session name.

This property is only used when an stsRoleArn is defined.
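The STS properties above can be combined in an exporter definition; a sketch assuming role-based credentials (the account ID and role name are hypothetical):

```yaml
- id: s3ViaAssumedRole
  type: io.kestra.plugin.ee.aws.s3.LogExporter
  region: us-east-1
  bucket: logbucket
  # Assume a role instead of passing static access keys;
  # the task then uses the StsAssumeRoleCredentialsProvider.
  stsRoleArn: arn:aws:iam::123456789012:role/kestra-log-shipper
```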

URL of the Data Collection Endpoint

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

Id of the Data Collection Rule

Name of the stream

Default 1000

The chunk size for every bulk request.

Client ID

Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.

Client Secret

Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.

PEM Certificate

Your stored PEM certificate. The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.

Tenant ID

Default false

If true, allow a failed response code (response code >= 400)

SubType integer

List of response code allowed for this request

The authentication to use.

Default UTF-8

The default charset for the request.

Default true

Whether redirects should be followed automatically.

SubType string
Possible Values
REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY

The enabled log.

The proxy configuration.

The SSL request options

The timeout configuration.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

The name of the log group.

The name of the log stream

AWS region with which the SDK should communicate.

Access Key Id in order to connect to AWS.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

Default 1000

The chunk size for every bulk request.

The endpoint with which the SDK should communicate.

This property allows you to use a different S3 compatible storage backend.

Secret Key Id in order to connect to AWS.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

The AWS STS endpoint with which the SDKClient should communicate.

AWS STS Role.

The Amazon Resource Name (ARN) of the role to assume. If set, the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

AWS STS External Id.

A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.

Default PT15M
Format duration

AWS STS Session duration.

The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.

AWS STS Session name.

This property is only used when an stsRoleArn is defined.

SubType string
Min items 1

List of HTTP Elasticsearch servers.

Must be a URI like https://elasticsearch.com:9200, with scheme and port.

Basic auth configuration.

SubType string

List of HTTP headers to be sent on every request.

Must be a string with the key and value separated by a colon, e.g. Authorization: Token XYZ.

Sets the path's prefix for every request used by the HTTP client.

For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint. In essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful for when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.

Whether the REST client should return any response containing at least one warning header as a failure.

Trust all SSL CA certificates.

Use this if the server is using a self-signed SSL certificate.

The token for bearer token authentication.

Splunk host

URL of the Splunk host to export logs to

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

Splunk token

Token used to authenticate to the Splunk API

Default 1000

The chunk size for every bulk request.

The http client configuration

Default Kestra

Log source

The source of the logs

GCS bucket to upload log files.

The bucket to which log files will be uploaded

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default 1000

The chunk size for every bulk request.

Default JSON
Possible Values
ION, JSON

Format of the exported files

The format of the exported files

Default kestra-log-file

Prefix of the log files

The prefix of the log file name. The full file name will be logFilePrefix-localDateTime.json (or .ion)

Default 100000

Maximum number of lines per file

The maximum number of lines per file

The GCP project ID.

SubType string
Default ["https://www.googleapis.com/auth/cloud-platform"]

The GCP scopes to be used.

The GCP service account key.

The password for HTTP basic authentication.

The username for HTTP basic authentication.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

OTLP endpoint

URL of the OTLP endpoint to export logs to

Authentication header name

Name of the authorization header

Authentication header value

Value of the authorization header

Default 1000

The chunk size for every bulk request.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default 1000

The chunk size for every bulk request.

The GCP project ID.

SubType string
Default ["https://www.googleapis.com/auth/cloud-platform"]

The GCP scopes to be used.

The GCP service account key.

Whether to disable checking of the remote SSL certificate.

Only applies if no trust store is configured. Note: This makes the SSL connection insecure and should only be used for testing. If you are using a self-signed certificate, set up a trust store instead.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default ION
Possible Values
ION, JSON

Format of the exported files

This property defines the format of the exported files.

Default kestra-log-file

Prefix of the log files

This property sets the prefix of the log file name. The full file name will be logFilePrefix-localDateTime.json (or .ion).

Maximum number of lines per file

This property specifies the maximum number of lines per log file.

API key

API key used to authenticate to the Datadog instance

Datadog base path

Base path of the Datadog instance

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default 1000

The chunk size for every bulk request.

The http client configuration

Default LogExporter

Log sending service

Name of the service that sends logs

Default Kestra

Log source

The source of the logs

Basic auth password.

Basic auth username.

Basic auth password.

Basic auth username.

Authentication key

API key or license key used to authenticate to the New Relic instance

New Relic base path

Base path of the New Relic instance to send logs to

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default 1000

The chunk size for every bulk request.

The http client configuration

Name of the container

Name of the container in the blob storage

URL of the Blob Storage

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1
Default 1000

The chunk size for every bulk request.

Client ID

Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.

Client Secret

Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.

Connection string of the Storage Account.

Default JSON
Possible Values
ION, JSON

Format of the exported files

The format of the exported files

Default kestra-log-file

Prefix of the log files

The prefix of the log file name. The full file name will be logFilePrefix-localDateTime.json (or .ion)

Default 100000

Maximum number of lines per file

The maximum number of lines per file

PEM Certificate

Your stored PEM certificate. The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

Shared Key access key for authenticating requests.

Shared Key account name for authenticating requests.

Tenant ID

The connection properties.

Validation RegExp ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length 1

The name of the index to send logs to

Default 1000

The chunk size for every bulk request.