Consume
type: "io.kestra.plugin.nats.Consume"
Consume messages from a NATS subject on a JetStream-enabled NATS server
Please note that the server you run it against must have JetStream enabled for it to work. It should also have a stream configured to match the given subject.
Examples
Consume messages from any topic subject matching the kestra.> wildcard, using user password authentication
id: "consume"
type: "io.kestra.plugin.nats.Consume"
url: nats://localhost:4222
username: nats_user
password: nats_passwd
subject: kestra.>
durableId: someDurableId
pollDuration: PT5S
Properties
batchSize
- Type: integer
- Dynamic: ❌
- Required: ✔️
- Default:
10
- Minimum:
>= 1
Messages are fetched by batch of given size
deliverPolicy
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
All
- Possible Values:
All
Last
New
ByStartSequence
ByStartTime
LastPerSubject
The point in the stream to receive messages from. Either All, Last, New, StartSequence, StartTime, or LastPerSubject
pollDuration
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
2.000000000
- Format:
duration
Polling duration before processing message
If no messages are available, define the max duration to wait for new messages
subject
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
Subject to subscribe to
url
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
URL to connect to NATS server
The format is (nats://)server_url:port. You can also provide a connection token like so: nats://token@server_url:port
durableId
- Type: string
- Dynamic: ✔️
- Required: ❌
ID used to attach the subscription to a durable one, allowing the subscription to start back from a previous position
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration before stopping the message polling
It's not an hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
password
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication password
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Minimum message timestamp to start consumption from
By default, we consume all messages from the subjects starting from beginning of logs or depending on the current durable id position. You can also provide an arbitrary start time to get all messages since this date for a new durable id. Note that if you don't provide a durable id, you will retrieve all messages starting from this date even after subsequent usage of this task.Must be a valid iso 8601 date.
username
- Type: string
- Dynamic: ✔️
- Required: ❌
Plaintext authentication username
Outputs
messagesCount
- Type: integer
Number of messages consumed
uri
- Type: string
URI of a Kestra internal storage file