type: "io.kestra.plugin.nats.Trigger"
Consume messages periodically from a NATS subject on a JetStream-enabled NATS server and create one execution per batch.
If you would like to consume each message from a NATS subject in real-time and create one execution per message, you can use the io.kestra.plugin.nats.RealtimeTrigger instead.
Examples
Subscribe to a NATS subject, getting every message from the beginning of the subject on first trigger execution.
id: nats
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.data }}"
triggers:
- id: watch
type: io.kestra.plugin.nats.Trigger
url: nats://localhost:4222
username: nats_user
password: nats_password
subject: kestra.trigger
durableId: natsTrigger
deliverPolicy: All
maxRecords: 1
Properties
batchSize
- Type: integer
- Dynamic: ❌
- Required: ✔️
- Default:
10
- Minimum:
›= 1
Messages are fetched by batch of given size.
deliverPolicy
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
All
- Possible Values:
All
Last
New
ByStartSequence
ByStartTime
LastPerSubject
The point in the stream to receive messages from.
Possible settings are:
All
: The default policy. The consumer will start receiving from the earliest available message.Last
: When first consuming messages, the consumer will start receiving messages with the last message added to the stream, or the last message in the stream that matches the consumer's filter subject if defined.New
: When first consuming messages, the consumer will only start receiving messages that were created after the consumer was created.ByStartSequence
: When first consuming messages, start at the first message having the sequence number or the next one available.ByStartTime
: When first consuming messages, start with messages on or after this time. The consumer is required to specifysince
which defines this start time.LastPerSubject
: When first consuming messages, start with the latest one for each filtered subject currently in the stream.
pollDuration
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
2
- Format:
duration
Polling duration before processing message
If no messages are available, define the max duration to wait for new messages
subject
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
Subject to subscribe to
url
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
URL to connect to NATS server
The format is (nats://)server_url:port. You can also provide a connection token like so: nats://token@server_url:port
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions in order to limit the flow trigger.
creds
- Type: string
- Dynamic: ✔️
- Required: ❌
Credentials files authentification
durableId
- Type: string
- Dynamic: ✔️
- Required: ❌
ID used to attach the subscription to a durable one, allowing the subscription to start back from a previous position
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
60
- Format:
duration
Interval between polling.
The interval between 2 different polls of schedule, this can avoid to overload the remote system with too many calls. For most of the triggers that depend on external systems, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval values.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration before stopping the message polling
It's not an hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
password
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication password
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Minimum message timestamp to start consumption from.
By default, we consume all messages from the subjects starting from beginning of logs or depending on the current durable id position. You can also provide an arbitrary start time to get all messages since this date for a new durable id. Note that if you don't provide a durable id, you will retrieve all messages starting from this date even after subsequent usage of this task.Must be a valid iso 8601 date.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
token
- Type: string
- Dynamic: ✔️
- Required: ❌
Token authentification
username
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication username
Outputs
messagesCount
- Type: integer
- Required: ❌
uri
- Type: string
- Required: ❌
- Format:
uri