LogShipper
Certified

Forward workflow execution logs to one or more desired destinations.

The Log Shipper task extracts logs from the Kestra backend and loads them into the configured destinations, including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.

The task works incrementally in batches:

  1. Determines the starting timestamp using either:
    • The last successfully processed log's timestamp (persisted in KV Store using the offsetKey)
    • Current time minus lookbackPeriod duration if no previous state exists
  2. Retrieves logs from that timestamp onward and sends them through the configured logExporters
  3. Stores the timestamp of the last processed log to maintain state between executions
  4. Subsequent runs continue from the last stored timestamp

This incremental approach ensures reliable log forwarding without gaps or duplicates.
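
The steps above map onto a small scheduled flow. The following is a minimal sketch, assuming an hourly schedule; the flow id, the offsetKey value, and the cron expression are illustrative choices, while the property names are the ones used in the example flow on this page. On the first run no offset exists, so logs from the last day are fetched; later runs should resume from the timestamp stored under the offset key.

```yaml
id: incrementalLogShipper   # hypothetical flow id
namespace: system

tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    # Used only when no offset has been stored yet: start at now minus one day
    lookbackPeriod: P1D
    # KV Store key (prefix) under which the last processed log timestamp is persisted
    offsetKey: incrementalLogShipperOffset
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter

triggers:
  - id: hourly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 * * * *"   # assumption: run every hour
```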

yaml
type: "io.kestra.plugin.ee.core.log.LogShipper"

Ship logs to multiple destinations

yaml
id: logShipper
namespace: system

tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: INFO
    lookbackPeriod: P1D
    offsetKey: logShipperOffset
    delete: false
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter

      - id: awsCloudWatch
        type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: us-east-1
        logGroupName: kestra
        logStreamName: production

      - id: S3LogExporter
        type: io.kestra.plugin.ee.aws.s3.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: "{{ vars.region }}"
        format: JSON
        bucket: logbucket
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000

      - id: googleOperationalSuite
        type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
        projectId: my-gcp-project

      - id: gcs
        type: io.kestra.plugin.ee.gcp.gcs.LogExporter
        projectId: myProjectId
        format: JSON
        maxLinesPerFile: 10000
        bucket: my-bucket
        logFilePrefix: kestra-log-file
        chunk: 1000

      - id: azureMonitor
        type: io.kestra.plugin.ee.azure.monitor.LogExporter
        endpoint: https://endpoint-host.ingest.monitor.azure.com
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
        streamName: kestraLogs

      - id: azureBlobStorage
        type: io.kestra.plugin.ee.azure.storage.LogExporter
        endpoint: https://myblob.blob.core.windows.net/
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        containerName: logs
        format: JSON
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
        chunk: 1000

      - id: datadog
        type: io.kestra.plugin.ee.datadog.LogExporter
        basePath: https://http-intake.logs.datadoghq.eu
        apiKey: "{{ secret('DATADOG_API_KEY') }}"

      - id: elasticsearch
        type: io.kestra.plugin.ee.elasticsearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200

      - id: opensearch
        type: io.kestra.plugin.ee.opensearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200

      - id: newRelic
        type: io.kestra.plugin.ee.newrelic.LogExporter
        basePath: https://log-api.newrelic.com
        apiKey: "{{ secret('NEWRELIC_API_KEY') }}"

      - id: openTelemetry
        type: io.kestra.plugin.ee.opentelemetry.LogExporter
        otlpEndpoint: http://otel-collector:4318/v1/logs
        authorizationHeaderName: Authorization
        authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"

triggers:
  - id: dailySchedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
    disabled: true
Properties
logExporters
Min items1

List of log exporters

The list of log exporters to use for sending logs.

Definitions
File log exporter (io.kestra.plugin.ee.core.log.FileLogExporter)
id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
formatstring
DefaultION
Possible Values
ION, JSON

This property defines the format of the exported files.

logFilePrefixstring
Defaultkestra-log-file

This property sets the prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion.

maxLinesPerFileintegerstring

This property specifies the maximum number of lines per log file.
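
As a rough illustration of the three file properties above, the fragment below writes JSON files and caps each file at 50,000 lines. It assumes these properties belong to the FileLogExporter used in the example flow; the prefix and the line limit are arbitrary values.

```yaml
logExporters:
  - id: file
    type: io.kestra.plugin.ee.core.log.FileLogExporter
    format: JSON                   # default is ION
    logFilePrefix: kestra-audit    # files are named kestra-audit-<localDateTime>.json
    maxLinesPerFile: 50000         # illustrative limit
```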

OpenSearch log exporter (io.kestra.plugin.ee.opensearch.LogExporter)
connection*Required
hosts*Requiredarray
SubTypestring

List of HTTP OpenSearch servers.

Must be a URI such as https://opensearch.com:9200, including scheme and port.

basicAuth
passwordstring
usernamestring
headersarray
SubTypestring
pathPrefixstring

For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint. In essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful when OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and should not be supplied in other scenarios.

strictDeprecationModebooleanstring
trustAllSslbooleanstring
id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
indexName*Requiredstring
type*Requiredobject
chunkintegerstring
Default1000

Azure Monitor log exporter (io.kestra.plugin.ee.azure.monitor.LogExporter)
endpoint*Requiredstring

URL of the Data Collection Endpoint

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
ruleId*Requiredstring

ID of the Data Collection Rule

streamName*Requiredstring

Name of the stream

type*Requiredobject
chunkintegerstring
Default1000
clientIdstring
clientSecretstring
pemCertificatestring
tenantIdstring

Azure Blob Storage log exporter (io.kestra.plugin.ee.azure.storage.LogExporter)
containerName*Requiredstring

Name of the container

Name of the container in the blob storage

endpoint*Requiredstring

URL of the Blob Storage

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
chunkintegerstring
Default1000
clientIdstring

Client ID

Client ID of the Azure service principal. If you don't have a service principal, refer to the Azure documentation on creating a service principal with Azure CLI.

clientSecretstring

Client Secret

Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.

connectionStringstring

Connection string of the Storage Account.

formatstring
DefaultJSON
Possible Values
ION, JSON
logFilePrefixstring
Defaultkestra-log-file
maxLinesPerFileintegerstring
Default100000
pemCertificatestring

PEM Certificate

Your stored PEM certificate.
The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.
sasTokenstring

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

sharedKeyAccountAccessKeystring

Shared Key access key for authenticating requests.

sharedKeyAccountNamestring

Shared Key account name for authenticating requests.

tenantIdstring

Tenant ID
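
The schema above lists several ways to authenticate besides a service principal. Below is a minimal sketch that uses a connection string instead, assuming a secret named AZURE_STORAGE_CONNECTION_STRING exists; the secret name, endpoint, and container are placeholders.

```yaml
logExporters:
  - id: azureBlobStorage
    type: io.kestra.plugin.ee.azure.storage.LogExporter
    endpoint: https://myblob.blob.core.windows.net/
    containerName: logs
    # Alternative to tenantId / clientId / clientSecret
    connectionString: "{{ secret('AZURE_STORAGE_CONNECTION_STRING') }}"
    format: JSON
```

The sasToken property, or the sharedKeyAccountName and sharedKeyAccountAccessKey pair, would presumably be supplied in the same position.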

New Relic log exporter (io.kestra.plugin.ee.newrelic.LogExporter)
apiKey*Requiredstring

Authentication key

API key or license key used to send logs to the New Relic instance

basePath*Requiredstring

New Relic base path

Base path of the New Relic instance to send logs to

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
chunkintegerstring
Default1000
options
allowFailedbooleanstring
Defaultfalse
allowedResponseCodesarray
SubTypeinteger
auth
type*Requiredobject
passwordstring
usernamestring
type*Requiredobject
tokenstring
basicAuthPasswordDeprecatedstring
basicAuthUserDeprecatedstring
connectTimeoutDeprecatedstring
Formatduration
connectionPoolIdleTimeoutDeprecatedstring
Formatduration
defaultCharsetstring
DefaultUTF-8
followRedirectsbooleanstring
Defaulttrue
logLevelDeprecatedstring
Possible Values
ALL, TRACE, DEBUG, INFO, WARN, ERROR, OFF, NOT_SPECIFIED
logsarray
SubTypestring
Possible Values
REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY
maxContentLengthDeprecatedinteger
proxy
addressstring
passwordstring
portintegerstring
typestring
DefaultDIRECT
Possible Values
DIRECT, HTTP, SOCKS
usernamestring
proxyAddressDeprecatedstring
proxyPasswordDeprecatedstring
proxyPortDeprecatedinteger
proxyTypeDeprecatedstring
Possible Values
DIRECT, HTTP, SOCKS
proxyUsernameDeprecatedstring
readIdleTimeoutDeprecatedstring
Formatduration
readTimeoutDeprecatedstring
Formatduration
ssl
insecureTrustAllCertificatesbooleanstring
timeout
connectTimeoutstring
Formatduration
readIdleTimeoutstring
DefaultPT5M
Formatduration

OpenTelemetry log exporter (io.kestra.plugin.ee.opentelemetry.LogExporter)
id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
otlpEndpoint*Requiredstring

OTLP endpoint

URL of the OTLP endpoint to export logs to

type*Requiredobject
authorizationHeaderNamestring

Authentication header name

Name of the authorization header

authorizationHeaderValuestring

Authentication header value

Value of the authorization header

chunkintegerstring
Default1000

Google Cloud Storage log exporter (io.kestra.plugin.ee.gcp.gcs.LogExporter)
bucket*Requiredstring

GCS bucket to upload log files to.

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
chunkintegerstring
Default1000
formatstring
DefaultJSON
Possible Values
ION, JSON
impersonatedServiceAccountstring
logFilePrefixstring
Defaultkestra-log-file
maxLinesPerFileintegerstring
Default100000
projectIdstring
scopesarray
SubTypestring
Default["https://www.googleapis.com/auth/cloud-platform"]
serviceAccountstring

Google Operational Suite log exporter (io.kestra.plugin.ee.gcp.operationalsuite.LogExporter)
id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
chunkintegerstring
Default1000
impersonatedServiceAccountstring

The GCP service account to impersonate.

projectIdstring

The GCP project ID.

scopesarray
SubTypestring
Default["https://www.googleapis.com/auth/cloud-platform"]

The GCP scopes to be used.

serviceAccountstring

The GCP service account key.

AWS CloudWatch log exporter (io.kestra.plugin.ee.aws.cloudwatch.LogExporter)
id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
logGroupName*Requiredstring

The name of the log group.

logStreamName*Requiredstring

The name of the log stream

region*Requiredstring
type*Requiredobject
accessKeyIdstring
chunkintegerstring
Default1000
endpointOverridestring
secretKeyIdstring
sessionTokenstring
stsEndpointOverridestring
stsRoleArnstring
stsRoleExternalIdstring
stsRoleSessionDurationstring
DefaultPT15M
Formatduration
stsRoleSessionNamestring

AWS S3 log exporter (io.kestra.plugin.ee.aws.s3.LogExporter)
bucket*Requiredstring

S3 bucket to upload log files to.

The bucket to which log files are exported.

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
region*Requiredstring

AWS region with which the SDK should communicate.

type*Requiredobject
accessKeyIdstring

Access Key Id in order to connect to AWS.

chunkintegerstring
Default1000
endpointOverridestring

The endpoint with which the SDK should communicate.

This property allows you to use a different S3 compatible storage backend.

formatstring
DefaultJSON
Possible Values
ION, JSON

Format of the exported files

The format of the exported files

logFilePrefixstring
Defaultkestra-log-file

Prefix of the log files

The prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion.

maxLinesPerFileintegerstring
Default100000

Maximum number of lines per file

The maximum number of lines per file

secretKeyIdstring

Secret Key Id in order to connect to AWS.

sessionTokenstring

AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.

If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

stsEndpointOverridestring

The AWS STS endpoint with which the SDKClient should communicate.

stsRoleArnstring

AWS STS Role.

The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.

stsRoleExternalIdstring

AWS STS External Id.

A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.

stsRoleSessionDurationstring
DefaultPT15M
Formatduration

AWS STS Session duration.

The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.

stsRoleSessionNamestring

AWS STS Session name.

This property is only used when an stsRoleArn is defined.
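
The STS properties above combine as follows. This is a sketch only: the role ARN, session name, and secret name are hypothetical, and it assumes that omitting accessKeyId and secretKeyId leaves the default credentials provider chain as the base identity, as the descriptions above suggest.

```yaml
logExporters:
  - id: s3AssumeRole
    type: io.kestra.plugin.ee.aws.s3.LogExporter
    region: us-east-1
    bucket: logbucket
    # Hypothetical role to assume via StsAssumeRoleCredentialsProvider
    stsRoleArn: arn:aws:iam::123456789012:role/kestra-log-shipper
    # The three properties below are only used because stsRoleArn is set
    stsRoleSessionName: kestra-log-shipper
    stsRoleSessionDuration: PT30M   # default is PT15M
    stsRoleExternalId: "{{ secret('AWS_STS_EXTERNAL_ID') }}"
```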

Splunk log exporter
host*Requiredstring

Splunk host

URL of the Splunk host to export logs to

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
token*Requiredstring

Splunk token

Token used to authenticate to Splunk API

type*Requiredobject
chunkintegerstring
Default1000
options
allowFailedbooleanstring
Defaultfalse
allowedResponseCodesarray
SubTypeinteger
auth
type*Requiredobject
passwordstring
usernamestring
type*Requiredobject
tokenstring
basicAuthPasswordDeprecatedstring
basicAuthUserDeprecatedstring
connectTimeoutDeprecatedstring
Formatduration
connectionPoolIdleTimeoutDeprecatedstring
Formatduration
defaultCharsetstring
DefaultUTF-8
followRedirectsbooleanstring
Defaulttrue
logLevelDeprecatedstring
Possible Values
ALL, TRACE, DEBUG, INFO, WARN, ERROR, OFF, NOT_SPECIFIED
logsarray
SubTypestring
Possible Values
REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY
maxContentLengthDeprecatedinteger
proxy
addressstring
passwordstring
portintegerstring
typestring
DefaultDIRECT
Possible Values
DIRECT, HTTP, SOCKS
usernamestring
proxyAddressDeprecatedstring
proxyPasswordDeprecatedstring
proxyPortDeprecatedinteger
proxyTypeDeprecatedstring
Possible Values
DIRECT, HTTP, SOCKS
proxyUsernameDeprecatedstring
readIdleTimeoutDeprecatedstring
Formatduration
readTimeoutDeprecatedstring
Formatduration
ssl
insecureTrustAllCertificatesbooleanstring
timeout
connectTimeoutstring
Formatduration
readIdleTimeoutstring
DefaultPT5M
Formatduration
sourcestring
DefaultKestra
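
The example flow on this page has no Splunk entry, so here is a hedged sketch built from the properties listed above. The type name is an assumption inferred from the naming pattern of the other exporters and is not confirmed by this page; the host and the secret name are placeholders.

```yaml
logExporters:
  - id: splunk
    # Assumption: type name follows the io.kestra.plugin.ee.<vendor>.LogExporter pattern
    type: io.kestra.plugin.ee.splunk.LogExporter
    host: https://example.splunkcloud.com:8088   # placeholder host
    token: "{{ secret('SPLUNK_TOKEN') }}"        # hypothetical secret name
    source: Kestra                               # default value
    chunk: 1000                                  # default value
```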

Datadog log exporter (io.kestra.plugin.ee.datadog.LogExporter)
apiKey*Requiredstring

API key

API key used to authenticate with the Datadog instance

basePath*Requiredstring

Datadog base path

Base path of the Datadog instance

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
type*Requiredobject
chunkintegerstring
Default1000
options

The HTTP client configuration.

allowFailedbooleanstring
Defaultfalse

If true, allow a failed response code (response code >= 400)

allowedResponseCodesarray
SubTypeinteger

List of response codes allowed for this request.

auth

The authentication to use.

type*Requiredobject
passwordstring

The password for HTTP basic authentication.

usernamestring

The username for HTTP basic authentication.

type*Requiredobject
tokenstring

The token for bearer token authentication.

basicAuthPasswordDeprecatedstring

The password for HTTP basic authentication. Deprecated, use auth property with a BasicAuthConfiguration instance instead.

basicAuthUserDeprecatedstring

The username for HTTP basic authentication. Deprecated, use auth property with a BasicAuthConfiguration instance instead.

connectTimeoutDeprecatedstring
Formatduration
connectionPoolIdleTimeoutDeprecatedstring
Formatduration

The time an idle connection can remain in the client's connection pool before being closed.

defaultCharsetstring
DefaultUTF-8

The default charset for the request.

followRedirectsbooleanstring
Defaulttrue

Whether redirects should be followed automatically.

logLevelDeprecatedstring
Possible Values
ALL, TRACE, DEBUG, INFO, WARN, ERROR, OFF, NOT_SPECIFIED

The log level for the HTTP client.

logsarray
SubTypestring
Possible Values
REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY

The enabled log.

maxContentLengthDeprecatedinteger

The maximum content length of the response.

proxy

The proxy configuration.

addressstring
passwordstring
portintegerstring
typestring
DefaultDIRECT
Possible Values
DIRECT, HTTP, SOCKS
usernamestring
proxyAddressDeprecatedstring

The address of the proxy server.

proxyPasswordDeprecatedstring

The password for proxy authentication.

proxyPortDeprecatedinteger

The port of the proxy server.

proxyTypeDeprecatedstring
Possible Values
DIRECT, HTTP, SOCKS

The type of proxy to use.

proxyUsernameDeprecatedstring

The username for proxy authentication.

readIdleTimeoutDeprecatedstring
Formatduration
readTimeoutDeprecatedstring
Formatduration

The maximum time allowed for reading data from the server before failing.

ssl

The SSL request options

insecureTrustAllCertificatesbooleanstring

Whether to disable checking of the remote SSL certificate.

Only applies if no trust store is configured. Note: This makes the SSL connection insecure and should only be used for testing. If you are using a self-signed certificate, set up a trust store instead.

timeout

The timeout configuration.

connectTimeoutstring
Formatduration

The time allowed to establish a connection to the server before failing.

readIdleTimeoutstring
DefaultPT5M
Formatduration

The time allowed for a read connection to remain idle before closing it.

servicestring
DefaultLogExporter

Log sending service

Name of the service that sends logs

sourcestring
DefaultKestra

Log source

The source of the logs
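
The HTTP client options above can be combined with the Datadog exporter roughly as follows. This is a sketch under assumptions: the nesting of timeout and proxy under options is inferred from the order of the property listing, and the proxy address, port, service name, and timeout values are illustrative.

```yaml
logExporters:
  - id: datadog
    type: io.kestra.plugin.ee.datadog.LogExporter
    basePath: https://http-intake.logs.datadoghq.eu
    apiKey: "{{ secret('DATADOG_API_KEY') }}"
    service: kestra-prod          # illustrative; default is LogExporter
    source: Kestra                # default value
    options:
      timeout:
        connectTimeout: PT10S     # time allowed to establish the connection
        readIdleTimeout: PT1M     # default is PT5M
      proxy:
        type: HTTP                # one of DIRECT, HTTP, SOCKS
        address: proxy.internal.example.com   # placeholder address
        port: 3128
```

The non-deprecated timeout and proxy blocks are used here rather than the deprecated top-level properties such as connectTimeout or proxyAddress.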

Elasticsearch log exporter (io.kestra.plugin.ee.elasticsearch.LogExporter)
connection*Required

The connection properties.

hosts*Requiredarray
SubTypestring
Min items1

List of HTTP Elasticsearch servers.

Must be a URI such as https://elasticsearch.com:9200, including scheme and port.

basicAuth

Basic auth configuration.

passwordstring

Basic auth password.

usernamestring

Basic auth username.

headersarray
SubTypestring

List of HTTP headers to be sent on every request.

Each header must be a string with the key and value separated by a colon, e.g. Authorization: Token XYZ.

pathPrefixstring

Sets the path's prefix for every request used by the HTTP client.

For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint. In essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and should not be supplied in other scenarios.

strictDeprecationModebooleanstring

Whether the REST client should return any response containing at least one warning header as a failure.

trustAllSslbooleanstring

Trust all SSL CA certificates.

Use this if the server is using a self-signed SSL certificate.

id*Requiredstring
Validation RegExp^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length1
indexName*Requiredstring

The name of the index to send logs to

type*Requiredobject
chunkintegerstring
Default1000

The chunk size for every bulk request.
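
To make the connection properties above concrete, the fragment below places Elasticsearch behind a proxy base path and adds a static header. It is a sketch: the /es prefix and the chunk size are hypothetical, and the header value reuses the placeholder from the description above.

```yaml
logExporters:
  - id: elasticsearch
    type: io.kestra.plugin.ee.elasticsearch.LogExporter
    indexName: kestra-logs
    chunk: 500                    # illustrative; default is 1000
    connection:
      hosts:
        - https://elastic.example.com:9200
      # Every request endpoint is prefixed with this path (hypothetical proxy base path)
      pathPrefix: /es
      headers:
        - "Authorization: Token XYZ"
```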

delete

Delete logs after export

The log shipper will delete the exported logs

Execution to search

The execution ID to use to filter logs

Flow to search

The flow ID to use to filter logs

logLevelFilter
DefaultINFO

Log level to send

This property specifies the minimum log level to send.

lookbackPeriod
DefaultP1D
Formatduration

Starting duration before now

If no previous execution or state exists, the fetch start date is set to the current time minus this duration
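
A short sketch of the level filter and the lookback window together, assuming WARN is accepted as a level in the same way as the default INFO; the task id and the twelve-hour window are illustrative.

```yaml
tasks:
  - id: shipWarnings
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: WARN    # minimum level to send (assumed valid value)
    lookbackPeriod: PT12H   # ISO 8601 duration; default is P1D
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter
```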

Maximum size of messages in logs

Set the maximum size of the message inside logs (in number of characters). If set, it will truncate messages that are longer than the maximum message size.

Namespace to search

The namespace to use to filter logs

offsetKey

Prefix of the KVStore key

The prefix of the KVStore key that contains the last execution's end fetched date

Formatdate-time

The zoned date-time of the last fetched log, used as the starting date for the next execution

The outputs generated by each log shipper

Definitions

The number of logs fetched.