
Consume
Please note that the server you run the task against must have JetStream enabled for task to work.
The server should also have a stream configured to match the given subject.
Please note that the server you run the task against must have JetStream enabled for task to work. The server should also have a stream configured to match the given subject.
Consume messages from a NATS subject on a JetStream-enabled NATS server.
Please note that the server you run the task against must have JetStream enabled for task to work. The server should also have a stream configured to match the given subject.
type: "io.kestra.plugin.nats.core.Consume"Examples
Consume messages from any topic subject matching the kestra.> wildcard, using user password authentication.
id: nats_consume_messages
namespace: company.team
tasks:
- id: consume
type: io.kestra.plugin.nats.core.Consume
url: nats://localhost:4222
username: nats_user
password: nats_password
subject: kestra.>
durableId: someDurableId
pollDuration: PT5S
Properties
subject*Requiredstring
1Subject to subscribe to
url*Requiredstring
1URL to connect to NATS server
The format is (nats://)server_url: port. You can also provide a connection token like so: nats://token@server_url: port
batchSizeinteger
10>= 1Messages are fetched by batch of given size.
credsstring
Credentials files authentification
deliverPolicystring
AllAllLastNewByStartSequenceByStartTimeLastPerSubjectThe point in the stream to receive messages from.
Possible settings are:
All: The default policy. The consumer will start receiving from the earliest available message.Last: When first consuming messages, the consumer will start receiving messages with the last message added to the stream, or the last message in the stream that matches the consumer's filter subject if defined.New: When first consuming messages, the consumer will only start receiving messages that were created after the consumer was created.ByStartSequence: When first consuming messages, start at the first message having the sequence number or the next one available.ByStartTime: When first consuming messages, start with messages on or after this time. The consumer is required to specifysincewhich defines this start time.LastPerSubject: When first consuming messages, start with the latest one for each filtered subject currently in the stream.
durableIdstring
ID used to attach the subscription to a durable one, allowing the subscription to start back from a previous position
maxDurationstring
durationThe max duration before stopping the message polling
It's not an hard limit and is evaluated every second
maxRecordsintegerstring
The max number of rows to fetch before stopping
passwordstring
Plaintext authentication password
pollDurationstring
PT2SdurationPolling duration before processing message
If no messages are available, define the max duration to wait for new messages
sincestring
Minimum message timestamp to start consumption from.
By default, we consume all messages from the subjects starting from beginning of logs or depending on the current durable id position. You can also provide an arbitrary start time to get all messages since this date for a new durable id. Note that if you don't provide a durable id, you will retrieve all messages starting from this date even after subsequent usage of this task.Must be a valid iso 8601 date.
tokenstring
Token authentification
usernamestring
Plaintext authentication username
Outputs
messagesCountinteger
Number of messages consumed.
uristring
uriURI of a Kestra internal storage file.