RealtimeTrigger
RealtimeTrigger
type: "io.kestra.plugin.nats.RealtimeTrigger"
Consume a message in real-time from a NATS subject on a JetStream-enabled NATS server and create one execution per message.
If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.nats.Trigger instead.
Examples
Subscribe to a NATS subject, getting every message from the beginning of the subject on first trigger execution.
id: nats
namespace: dev
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.data }}"
triggers:
- id: watch
type: io.kestra.plugin.nats.RealtimeTrigger
url: nats://localhost:4222
username: kestra
password: k3stra
subject: kestra.trigger
durableId: natsTrigger
deliverPolicy: All
Properties
batchSize
- Type: integer
- Dynamic: ❌
- Required: ✔️
- Default:
10
- Minimum:
>= 1
Messages are fetched by batch of given size
deliverPolicy
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
All
- Possible Values:
All
Last
New
ByStartSequence
ByStartTime
LastPerSubject
The point in the stream to receive messages from. Either All, Last, New, StartSequence, StartTime, or LastPerSubject
subject
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
Subject to subscribe to
url
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
URL to connect to NATS server
The format is (nats://)server_url:port. You can also provide a connection token like so: nats://token@server_url:port
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions in order to limit the flow trigger.
durableId
- Type: string
- Dynamic: ✔️
- Required: ❌
ID used to attach the subscription to a durable one, allowing the subscription to start back from a previous position
password
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication password
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Minimum message timestamp to start consumption from
By default, we consume all messages from the subjects starting from beginning of logs or depending on the current durable id position. You can also provide an arbitrary start time to get all messages since this date for a new durable id. Note that if you don't provide a durable id, you will retrieve all messages starting from this date even after subsequent usage of this task.Must be a valid iso 8601 date.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
username
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication username
Outputs
data
- Type: string
- Dynamic: ❓
- Required: ❌
headers
- Type: object
- SubType: array
- Dynamic: ❓
- Required: ❌
subject
- Type: string
- Dynamic: ❓
- Required: ❌
timestamp
- Type: string
- Dynamic: ❓
- Required: ❌
- Format:
date-time
Definitions
Was this page helpful?