
LogShipper
Forward workflow execution logs to one or more desired destinations.
The Log Shipper task extracts logs from the Kestra backend and loads them to desired destinations including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.
The task works incrementally in batches:
- Determines the starting timestamp using either:
  - the last successfully processed log's timestamp (persisted in the KV Store under the offsetKey), or
  - the current time minus the lookbackPeriod duration if no previous state exists.
- Sends retrieved logs through the configured logExporters.
- Stores the timestamp of the last processed log to maintain state between executions.
- Subsequent runs continue from the last stored timestamp.

This incremental approach ensures reliable log forwarding without gaps or duplicates.
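The stateful loop above needs no extra wiring in a flow. A minimal sketch (flow, task, and trigger ids are arbitrary) that ships the backlog to Kestra's internal storage once per day:

```yaml
id: minimal_log_shipper
namespace: system

tasks:
  - id: ship_logs
    type: io.kestra.plugin.ee.core.log.LogShipper
    lookbackPeriod: P1D        # first run only: start 24 hours back
    offsetKey: minimalOffset   # KV Store key persisting the last fetched timestamp
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter

triggers:
  - id: daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
```

On the second and later runs, the stored offset takes precedence over lookbackPeriod, so each execution picks up exactly where the previous one stopped.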
type: "io.kestra.plugin.ee.core.log.LogShipper"

Examples
Ship logs to multiple destinations:

```yaml
id: logShipper
namespace: system

tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: INFO
    lookbackPeriod: P1D
    offsetKey: logShipperOffset
    delete: false
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter
      - id: awsCloudWatch
        type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: us-east-1
        logGroupName: kestra
        logStreamName: production
      - id: S3LogExporter
        type: io.kestra.plugin.ee.aws.s3.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: "{{ vars.region }}"
        format: JSON
        bucket: logbucket
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
      - id: googleOperationalSuite
        type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
        projectId: my-gcp-project
      - id: gcs
        type: io.kestra.plugin.ee.gcp.gcs.LogExporter
        projectId: myProjectId
        format: JSON
        maxLinesPerFile: 10000
        bucket: my-bucket
        logFilePrefix: kestra-log-file
        chunk: 1000
      - id: azureMonitor
        type: io.kestra.plugin.ee.azure.monitor.LogExporter
        endpoint: https://endpoint-host.ingest.monitor.azure.com
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
        streamName: kestraLogs
      - id: azureBlobStorage
        type: io.kestra.plugin.ee.azure.storage.LogExporter
        endpoint: https://myblob.blob.core.windows.net/
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        containerName: logs
        format: JSON
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
        chunk: 1000
      - id: datadog
        type: io.kestra.plugin.ee.datadog.LogExporter
        basePath: https://http-intake.logs.datadoghq.eu
        apiKey: "{{ secret('DATADOG_API_KEY') }}"
      - id: elasticsearch
        type: io.kestra.plugin.ee.elasticsearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200
      - id: opensearch
        type: io.kestra.plugin.ee.opensearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200
      - id: newRelic
        type: io.kestra.plugin.ee.newrelic.LogExporter
        basePath: https://log-api.newrelic.com
        apiKey: "{{ secret('NEWRELIC_API_KEY') }}"
      - id: openTelemetry
        type: io.kestra.plugin.ee.opentelemetry.LogExporter
        otlpEndpoint: http://otel-collector:4318/v1/logs
        authorizationHeaderName: Authorization
        authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"

triggers:
  - id: dailySchedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
    disabled: true
```
Properties

logExporters (required, non-dynamic)
The list of log exporters to use for sending logs.

File
Ship logs to a file inside Kestra's internal storage.
- format: The format of the exported files, ION or JSON (default ION).
- logFilePrefix: The prefix of the log file names; the full file name will be logFilePrefix-localDateTime.json/ion (default kestra-log-file).
- maxLinesPerFile: The maximum number of lines per log file.
OpenSearch
Ship logs to OpenSearch.
- indexName: The name of the index to send logs to.
- chunk: The chunk size for every bulk request (default 1000).
- connection (io.kestra.plugin.ee.opensearch.OpensearchConnection):
  - hosts: List of HTTP OpenSearch servers. Must be a URI like https://opensearch.com:9200 with scheme and port.
  - basicAuth (io.kestra.plugin.ee.opensearch.OpensearchConnection-BasicAuth): Basic auth username and password.
  - pathPrefix: Sets the path prefix for every request used by the HTTP client. For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint; in essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful when OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and should not be supplied in other scenarios.
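When OpenSearch sits behind a proxy that serves it under a base path, the connection can carry that prefix. A sketch (host and prefix values are illustrative):

```yaml
- id: opensearch
  type: io.kestra.plugin.ee.opensearch.LogExporter
  indexName: kestra-logs
  connection:
    hosts:
      - https://opensearch.internal.example.com:9200
    pathPrefix: /my/path   # every request endpoint becomes /my/path/ + endpoint
```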
Azure Monitor
Export logs to Azure Monitor.
- endpoint: URL of the Data Collection Endpoint.
- ruleId: ID of the Data Collection Rule.
- streamName: Name of the stream.
- tenantId, clientId, clientSecret: Credentials of the Azure service principal.
- chunk: The chunk size for every bulk request (default 1000).
Azure Blob Storage
Export logs to Azure Blob Storage.
- containerName: Name of the container in the blob storage.
- endpoint: URL of the Blob Storage.
- tenantId: Tenant ID of the Azure service principal.
- clientId: Client ID of the Azure service principal. If you don't have a service principal, refer to "create a service principal with Azure CLI".
- clientSecret: Service principal client secret. The tenantId, clientId, and clientSecret of the service principal are required for this credential to acquire an access token.
- PEM certificate: Your stored PEM certificate. The tenantId, clientId, and clientCertificate of the service principal are required for this credential to acquire an access token.
- Connection string: Connection string of the Storage Account.
- SAS token: The SAS token to use for authenticating requests. This string should only be the query parameters (with or without a leading '?') and not a full URL.
- Shared Key: The Shared Key account name and access key for authenticating requests.
- format: Format of the exported files, JSON or ION (default JSON).
- logFilePrefix: Prefix of the log file names (default kestra-log-file).
- maxLinesPerFile: The maximum number of lines per file (default 100000).
- chunk: The chunk size for every bulk request (default 1000).
New Relic
Ship logs to New Relic.
- apiKey: Authentication key. API key or License key used to log to the New Relic instance.
- basePath: Base path of the New Relic instance to send logs to.
- chunk: The chunk size for every bulk request (default 1000).
- HTTP client configuration (io.kestra.core.http.client.configurations.HttpConfiguration): see the detailed HTTP client configuration properties below.

OpenTelemetry
Export logs to an OpenTelemetry collector.
- otlpEndpoint: URL of the OTLP endpoint to export logs to.
- authorizationHeaderName: Name of the authorization header.
- authorizationHeaderValue: Value of the authorization header.
- chunk: The chunk size for every bulk request (default 1000).

Google Cloud Storage
Export logs to a Google Cloud Storage bucket.
- bucket: GCS bucket to upload log files to.
- projectId: The GCP project ID.
- format: Format of the exported files, JSON or ION (default JSON).
- logFilePrefix: Prefix of the log file names (default kestra-log-file).
- maxLinesPerFile: The maximum number of lines per file (default 100000).
- chunk: The chunk size for every bulk request (default 1000).
- scopes: The GCP scopes to be used (default ["https://www.googleapis.com/auth/cloud-platform"]).

Google Operational Suite
Export logs to the Google Operational Suite.
- projectId: The GCP project ID.
- Service account key: The GCP service account key.
- Service account to impersonate: The GCP service account to impersonate.
- scopes: The GCP scopes to be used (default ["https://www.googleapis.com/auth/cloud-platform"]).
- chunk: The chunk size for every bulk request (default 1000).

AWS CloudWatch
Export logs to AWS CloudWatch.
- logGroupName: The name of the log group.
- logStreamName: The name of the log stream.
- chunk: The chunk size for every bulk request (default 1000).

S3
Export logs to S3.
- bucket: The S3 bucket where log files will be uploaded.
- region: AWS region with which the SDK should communicate.
- accessKeyId: Access Key Id in order to connect to AWS.
- secretKeyId: Secret Key Id in order to connect to AWS.
- Session token: AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource. If no credentials are defined, the default credentials provider chain is used to fetch credentials.
- Endpoint: The endpoint with which the SDK should communicate. This property allows you to use a different S3-compatible storage backend.
- format: The format of the exported files, JSON or ION (default JSON).
- logFilePrefix: The prefix of the log file names; the full file name will be logFilePrefix-localDateTime.json/ion (default kestra-log-file).
- maxLinesPerFile: The maximum number of lines per file (default 100000).
- chunk: The chunk size for every bulk request (default 1000).
- STS endpoint: The AWS STS endpoint with which the SDK client should communicate.
- stsRoleArn: AWS STS Role. The Amazon Resource Name (ARN) of the role to assume. If set, the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, the default credentials provider chain is used to fetch credentials.
- STS external id: A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.
- STS session duration: The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.
- STS session name: The AWS STS session name. This property is only used when an stsRoleArn is defined.
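A sketch of an S3 exporter that assumes an IAM role via STS instead of using static keys; the role ARN, region, and bucket are illustrative:

```yaml
- id: s3AssumeRole
  type: io.kestra.plugin.ee.aws.s3.LogExporter
  region: us-east-1
  bucket: logbucket
  format: JSON
  # With stsRoleArn set, the task uses the StsAssumeRoleCredentialsProvider;
  # base credentials come from the default credentials provider chain.
  stsRoleArn: arn:aws:iam::123456789012:role/kestra-log-shipper
```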
Splunk
Export logs to Splunk.
- Splunk host: URL of the Splunk host to export logs to.
- Splunk token: Token used to authenticate to the Splunk API.
- chunk: The chunk size for every bulk request (default 1000).
- HTTP client configuration (io.kestra.core.http.client.configurations.HttpConfiguration): see the detailed HTTP client configuration properties below.
Datadog
Ship logs to a Datadog instance.
- apiKey: API key used to log in to the Datadog instance.
- basePath: Base path of the Datadog instance.
- chunk: The chunk size for every bulk request (default 1000).
- The HTTP client configuration (io.kestra.core.http.client.configurations.HttpConfiguration):
  - Allow failed: If true, allow a failed response code (response code >= 400); default false.
  - Allowed response codes: List of response codes allowed for this request.
  - The authentication to use, either:
    - basic auth (io.kestra.core.http.client.configurations.BasicAuthConfiguration): the username and password for HTTP basic authentication.
    - bearer auth (io.kestra.core.http.client.configurations.BearerAuthConfiguration): the token for bearer token authentication.
  - Password and username for HTTP basic authentication: deprecated; use the auth property with a BasicAuthConfiguration instance instead.
  - Connection pool idle timeout (duration): The time an idle connection can remain in the client's connection pool before being closed.
  - Default charset (java.nio.charset.Charset): The default charset for the request (default UTF-8).
  - Follow redirects: Whether redirects should be followed automatically (default true).
  - Log level: The log level for the HTTP client (ALL, TRACE, DEBUG, INFO, WARN, ERROR, OFF, NOT_SPECIFIED).
  - Enabled logs: REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY.
  - Maximum content length: The maximum content length of the response.
  - The proxy configuration (io.kestra.core.http.client.configurations.ProxyConfiguration):
    - The address of the proxy server.
    - The port of the proxy server.
    - The type of proxy to use (DIRECT, HTTP, SOCKS; default DIRECT).
    - The username and password for proxy authentication.
  - Read idle timeout (duration): The maximum time allowed for reading data from the server before failing.
  - The SSL request options (io.kestra.core.http.client.configurations.SslOptions):
    - Insecure trust all certificates: Whether to disable checking of the remote SSL certificate. Only applies if no trust store is configured. Note: this makes the SSL connection insecure and should only be used for testing. If you are using a self-signed certificate, set up a trust store instead.
  - The timeout configuration (io.kestra.core.http.client.configurations.TimeoutConfiguration):
    - Connect timeout (duration): The time allowed to establish a connection to the server before failing.
    - Read idle timeout (duration, default PT5M): The time allowed for a read connection to remain idle before closing it.
- Log sending service: Name of the service that sends the logs (default LogExporter).
- Log source: The source of the logs (default Kestra).
Elasticsearch
Ship logs to Elasticsearch.
- indexName: The name of the index to send logs to.
- chunk: The chunk size for every bulk request (default 1000).
- connection (io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection): The connection properties.
  - hosts: List of HTTP Elasticsearch servers (at least one). Must be a URI like https://elasticsearch.com:9200 with scheme and port.
  - basicAuth (io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection-BasicAuth): Basic auth configuration, with the basic auth username and password.
  - headers: List of HTTP headers to be sent on every request. Must be a string with key and value separated by ':', for example "Authorization: Token XYZ".
  - pathPrefix: Sets the path's prefix for every request used by the HTTP client. For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint; in essence, every request's endpoint is prefixed by this pathPrefix. The path prefix is useful when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and should not be supplied in other scenarios.
  - Strict deprecation mode: Whether the REST client should return any response containing at least one warning header as a failure.
  - Trust all SSL: Trust all SSL CA certificates. Use this if the server is using a self-signed SSL certificate.
delete (boolean or string)
Delete logs after export. The log shipper will delete the exported logs.

executionId (string)
Execution to search. The execution ID to use to filter logs.

flowId (string)
Flow to search. The flow ID to use to filter logs.

logLevelFilter (string, default INFO)
Log level to send. This property specifies the minimum log level to send.

lookbackPeriod (string, duration, default P1D)
Starting duration before now. If no previous execution or state exists, the fetch start date is set to the current time minus this duration.

maximumMessageSize (integer or string)
Maximum size of messages in logs. Sets the maximum size of the message inside logs (in number of characters). If set, messages longer than the maximum message size will be truncated.

namespace (string)
Namespace to search. The namespace to use to filter logs.

offsetKey (string)
Prefix of the KV Store key. The prefix of the KV Store key that contains the last execution's end fetched date.
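The filter properties can be combined. A sketch that ships only WARN-and-above logs from a single namespace and truncates oversized messages (all values are illustrative):

```yaml
- id: shipWarnings
  type: io.kestra.plugin.ee.core.log.LogShipper
  namespace: production        # only logs from this namespace
  logLevelFilter: WARN         # minimum log level to send
  maximumMessageSize: 10000    # truncate messages longer than 10,000 characters
  lookbackPeriod: PT6H         # first run only: start 6 hours back
  offsetKey: warnShipperOffset
  logExporters:
    - id: file
      type: io.kestra.plugin.ee.core.log.FileLogExporter
```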
Outputs

endFetchedDate (string, date-time)
The zoned date-time of the last fetched log, used as the starting date for the next execution.

outputs (object, io.kestra.core.models.tasks.Output)
The outputs generated by each log shipper.

size (integer)
The number of logs fetched.
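Downstream tasks can read these outputs with Kestra's expression syntax. A sketch that reports on a LogShipper task with id shipLogs (task ids are arbitrary):

```yaml
- id: report
  type: io.kestra.plugin.core.log.Log
  message: "Shipped {{ outputs.shipLogs.size }} logs, up to {{ outputs.shipLogs.endFetchedDate }}"
```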