Forward workflow execution logs to one or more desired destinations.
The Log Shipper task extracts logs from the Kestra backend and loads them to desired destinations including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.
The task works incrementally in batches:
- Determines the starting timestamp using either:
- The last successfully processed log's timestamp (persisted in KV Store using the
offsetKey
) - Current time minus
lookbackPeriod
duration if no previous state exists
- The last successfully processed log's timestamp (persisted in KV Store using the
- Sends retrieved logs through configured
logExporters
- Stores the timestamp of the last processed log to maintain state between executions
- Subsequent runs continue from the last stored timestamp
This incremental approach ensures reliable log forwarding without gaps or duplicates.
type: "io.kestra.plugin.ee.core.log.LogShipper"
Examples
Ship logs to multiple destinations
id: logShipper
namespace: system
tasks:
- id: shipLogs
type: io.kestra.plugin.ee.core.log.LogShipper
logLevelFilter: INFO
lookbackPeriod: P1D
offsetKey: logShipperOffset
delete: false
logExporters:
- id: file
type: io.kestra.plugin.ee.core.log.FileLogExporter
- id: awsCloudWatch
type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: us-east-1
logGroupName: kestra
logStreamName: production
- id: S3LogExporter
type: io.kestra.plugin.ee.aws.s3.LogExporter
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: "{{ vars.region }}"
format: JSON
bucket: logbucket
logFilePrefix: kestra-log-file
maxLinesPerFile: 1000000
- id: googleOperationalSuite
type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
projectId: my-gcp-project
- id: gcs
type: io.kestra.plugin.ee.gcp.gcs.LogExporter
projectId: myProjectId
format: JSON
maxLinesPerFile: 10000
bucket: my-bucket
logFilePrefix: kestra-log-file
chunk: 1000
- id: azureMonitor
type: io.kestra.plugin.ee.azure.monitor.LogExporter
endpoint: https://endpoint-host.ingest.monitor.azure.com
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
streamName: kestraLogs
- id: azureBlobStorage
type: io.kestra.plugin.ee.azure.storage.LogExporter
endpoint: https://myblob.blob.core.windows.net/
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
containerName: logs
format: JSON
logFilePrefix: kestra-log-file
maxLinesPerFile: 1000000
chunk: 1000
- id: datadog
type: io.kestra.plugin.ee.datadog.LogExporter
basePath: https://http-intake.logs.datadoghq.eu
apiKey: "{{ secret('DATADOG_API_KEY') }}"
- id: elasticsearch
type: io.kestra.plugin.ee.elasticsearch.LogExporter
indexName: kestra-logs
connection:
basicAuth:
password: "{{ secret('ES_PASSWORD') }}"
username: kestra_user
hosts:
- https://elastic.example.com:9200
- id: opensearch
type: io.kestra.plugin.ee.opensearch.LogExporter
indexName: kestra-logs
connection:
basicAuth:
password: "{{ secret('ES_PASSWORD') }}"
username: kestra_user
hosts:
- https://elastic.example.com:9200
- id: newRelic
type: io.kestra.plugin.ee.newrelic.LogExporter
basePath: https://log-api.newrelic.com
apiKey: "{{ secret('NEWRELIC_API_KEY') }}"
- id: openTelemetry
type: io.kestra.plugin.ee.opentelemetry.LogExporter
otlpEndpoint: http://otel-collector:4318/v1/logs
authorizationHeaderName: Authorization
authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"
triggers:
- id: dailySchedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 0 * * *"
disabled: true
Properties
logExporters *RequiredNon-dynamicarray
1
List of log shippers
The list of log shippers to use for sending logs
delete booleanstring
Delete logs after export
The log shipper will delete the exported logs
executionId string
Execution to search
The executionID to use to filter logs
flowId string
Flow to search
The flow ID to use to filter logs
logLevelFilter string
INFO
Log level to send
This property specifies the minimum log level to send.
lookbackPeriod string
P1D
duration
Starting duration before now
If no previous execution or state exists, the fetch start date is set to the current time minus this duration
namespace string
Namespace to search
The namespace to use to filter logs
offsetKey string
Prefix of the KVStore key
The prefix of the KVStore key that contains the last execution's end fetched date
Outputs
Definitions
io.kestra.core.http.client.configurations.TimeoutConfiguration
connectTimeout string
duration
The time allowed to establish a connection to the server before failing.
readIdleTimeout string
PT5M
duration
The time allowed for a read connection to remain idle before closing it.
Ship logs to Elasticsearch
connection *RequiredElasticsearchConnection
The connection properties.
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
indexName *Requiredstring
The name of the index to send logs to
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
io.kestra.core.http.client.configurations.ProxyConfiguration
address string
The address of the proxy server.
password string
The password for proxy authentication.
port integerstring
The port of the proxy server.
type string
DIRECT
DIRECT
HTTP
SOCKS
The type of proxy to use.
username string
The username for proxy authentication.
io.kestra.plugin.ee.opensearch.OpensearchConnection
hosts *Requiredarray
List of HTTP OpenSearch servers.
Must be an URI like https://opensearch.com: 9200
with scheme and port.
basicAuth OpensearchConnection-BasicAuth
Basic auth configuration.
headers array
List of HTTP headers to be send on every request.
Must be a string with key value separated with :
, ex: Authorization: Token XYZ
.
pathPrefix string
Sets the path's prefix for every request used by the HTTP client.
For example, if this is set to /my/path
, then any client request will become /my/path/
+ endpoint.
In essence, every request's endpoint is prefixed by this pathPrefix
.
The path prefix is useful for when OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.
strictDeprecationMode booleanstring
Whether the REST client should return any response containing at least one warning header as a failure.
trustAllSsl booleanstring
Trust all SSL CA certificates.
Use this if the server is using a self signed SSL certificate.
Export logs to S3
bucket *Requiredstring
S3 Bucket to upload logs files.
The bucket where log files are going to be imported
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
region *Requiredstring
AWS region with which the SDK should communicate.
type *Requiredobject
accessKeyId string
Access Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
chunk integerstring
1000
The chunk size for every bulk request.
endpointOverride string
The endpoint with which the SDK should communicate.
This property allows you to use a different S3 compatible storage backend.
format string
JSON
ION
JSON
Format of the exported files
The format of the exported files
logFilePrefix string
kestra-log-file
Prefix of the log files
The prefix of the log files name. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integerstring
100000
Maximum number of lines per file
The maximum number of lines per file
secretKeyId string
Secret Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
sessionToken string
AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsEndpointOverride string
The AWS STS endpoint with which the SDKClient should communicate.
stsRoleArn string
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider
. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsRoleExternalId string
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn
is defined.
stsRoleSessionDuration string
PT15M
duration
AWS STS Session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn
is defined.
stsRoleSessionName string
AWS STS Session name.
This property is only used when an stsRoleArn
is defined.
Export logs to Azure Monitor
endpoint *Requiredstring
Url of the Data Collection Endpoint
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
ruleId *Requiredstring
Id of the Data Collection Rule
streamName *Requiredstring
Name of the stream
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
clientId string
Client ID
Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.
clientSecret string
Client Secret
Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.
pemCertificate string
PEM Certificate
Your stored PEM certificate.
The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.
tenantId string
Tenant ID
java.nio.charset.Charset
io.kestra.core.http.client.configurations.HttpConfiguration
allowFailed booleanstring
false
If true, allow a failed response code (response code >= 400)
allowedResponseCodes array
List of response code allowed for this request
auth BasicAuthConfigurationBearerAuthConfiguration
The authentification to use.
defaultCharset Charsetstring
UTF-8
The default charset for the request.
followRedirects booleanstring
true
Whether redirects should be followed automatically.
logs array
REQUEST_HEADERS
REQUEST_BODY
RESPONSE_HEADERS
RESPONSE_BODY
The enabled log.
proxy ProxyConfiguration
The proxy configuration.
ssl SslOptions
The SSL request options
timeout TimeoutConfiguration
The timeout configuration.
io.kestra.core.models.tasks.Output
Export logs to AWS CloudWatch
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
logGroupName *Requiredstring
The name of the log group.
logStreamName *Requiredstring
The name of the log stream
region *Requiredstring
AWS region with which the SDK should communicate.
type *Requiredobject
accessKeyId string
Access Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
chunk integerstring
1000
The chunk size for every bulk request.
endpointOverride string
The endpoint with which the SDK should communicate.
This property allows you to use a different S3 compatible storage backend.
secretKeyId string
Secret Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
sessionToken string
AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsEndpointOverride string
The AWS STS endpoint with which the SDKClient should communicate.
stsRoleArn string
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider
. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsRoleExternalId string
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn
is defined.
stsRoleSessionDuration string
PT15M
duration
AWS STS Session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn
is defined.
stsRoleSessionName string
AWS STS Session name.
This property is only used when an stsRoleArn
is defined.
io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection
hosts *Requiredarray
1
List of HTTP ElasticSearch servers.
Must be an URI like https://elasticsearch.com: 9200
with scheme and port.
basicAuth ElasticsearchConnection-BasicAuth
Basic auth configuration.
headers array
List of HTTP headers to be send on every request.
Must be a string with key value separated with :
, ex: Authorization: Token XYZ
.
pathPrefix string
Sets the path's prefix for every request used by the HTTP client.
For example, if this is set to /my/path
, then any client request will become /my/path/
+ endpoint.
In essence, every request's endpoint is prefixed by this pathPrefix
.
The path prefix is useful for when ElasticSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.
strictDeprecationMode booleanstring
Whether the REST client should return any response containing at least one warning header as a failure.
trustAllSsl booleanstring
Trust all SSL CA certificates.
Use this if the server is using a self signed SSL certificate.
io.kestra.core.http.client.configurations.BearerAuthConfiguration
type *Requiredobject
token string
The token for bearer token authentication.
Export logs to Splunk
host *Requiredstring
Splunk host
Url of the Splunk host to export logs to
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
token *Requiredstring
Splunk token
Token used to authenticate to Splunk API
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
options HttpConfiguration
The http client configuration
source string
Kestra
Log source
The source of the logs
Export logs to a Google Cloud Storage
bucket *Requiredstring
GCS Bucket to upload logs files.
The bucket where log files are going to be imported
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
format string
JSON
ION
JSON
Format of the exported files
The format of the exported files
logFilePrefix string
kestra-log-file
Prefix of the log files
The prefix of the log files name. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integerstring
100000
Maximum number of lines per file
The maximum number of lines per file
projectId string
The GCP project ID.
scopes array
["https://www.googleapis.com/auth/cloud-platform"]
The GCP scopes to be used.
serviceAccount string
The GCP service account key.
io.kestra.core.http.client.configurations.BasicAuthConfiguration
type *Requiredobject
password string
The password for HTTP basic authentication.
username string
The username for HTTP basic authentication.
Export logs to an Opentelemetry collector
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
otlpEndpoint *Requiredstring
OTLP endpoint
Url of the OTLP endpoint to export logs to
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
Export logs to a Google Operational Suite
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
projectId string
The GCP project ID.
scopes array
["https://www.googleapis.com/auth/cloud-platform"]
The GCP scopes to be used.
serviceAccount string
The GCP service account key.
io.kestra.core.http.client.configurations.SslOptions
insecureTrustAllCertificates booleanstring
Whether to disable checking of the remote SSL certificate.
Only applies if no trust store is configured. Note: This makes the SSL connection insecure and should only be used for testing. If you are using a self-signed certificate, set up a trust store instead.
Ship logs to a file inside Kestra's internal storage.
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
format string
ION
ION
JSON
Format of the exported files
This property defines the format of the exported files.
logFilePrefix string
kestra-log-file
Prefix of the log files
This property sets the prefix of the log files name. The full file name will be logFilePrefix-localDateTime.json/ion.
maxLinesPerFile integerstring
Maximum number of lines per file
This property specifies the maximum number of lines per log file.
Ship logs to a Datadog instance.
apiKey *Requiredstring
Api key
Api key used to log in the Datadog instance
basePath *Requiredstring
Datadog base path
Base path of the Datadog instance
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
options HttpConfiguration
The http client configuration
service string
LogExporter
Log sending service
Name of the service that send logs
source string
Kestra
Log source
The source of the logs
io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection-BasicAuth
password string
Basic auth password.
username string
Basic auth username.
io.kestra.plugin.ee.opensearch.OpensearchConnection-BasicAuth
password string
Basic auth password.
username string
Basic auth username.
Ship logs to New Relic
apiKey *Requiredstring
Authentication key
Api key or License key used to log to the New Relic instance
basePath *Requiredstring
New Relic base path
Base path of the new relic instance to send logs to
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
options HttpConfiguration
The http client configuration
Export logs to Azure Blob Storage
containerName *Requiredstring
Name of the container
Name of the container in the blob storage
endpoint *Requiredstring
Url of the Blob Storage
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.
clientId string
Client ID
Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.
clientSecret string
Client Secret
Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.
connectionString string
Connection string of the Storage Account.
format string
JSON
ION
JSON
Format of the exported files
The format of the exported files
logFilePrefix string
kestra-log-file
Prefix of the log files
The prefix of the log files name. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integerstring
100000
Maximum number of lines per file
The maximum number of lines per file
pemCertificate string
PEM Certificate
Your stored PEM certificate.
The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.
sasToken string
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
tenantId string
Tenant ID
Ship logs to Opensearch
connection *RequiredOpensearchConnection
The connection properties.
id *Requiredstring
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
indexName *Requiredstring
The name of the index to send logs to
type *Requiredobject
chunk integerstring
1000
The chunk size for every bulk request.