Reader
type: "io.kestra.plugin.pulsar.Reader"
Read messages from one or more Pulsar topics without a subscription
# Examples
id: "reader"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
# Properties
# authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token
An authentication token may be required by some providers, such as Clever Cloud.
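As a rough sketch of how the token might be supplied, the task below reads it from a secret instead of hard-coding it. The secret name `PULSAR_TOKEN` and the hostname are hypothetical, and the snippet assumes the Kestra `secret()` expression function is available in your installation.

```yaml
id: "reader_with_token"
type: "io.kestra.plugin.pulsar.Reader"
# Hypothetical host; replace with your provider's service URL
uri: pulsar+ssl://pulsar.example.com:6651
topic: test_kestra
deserializer: JSON
# Assumption: a secret named PULSAR_TOKEN has been configured
authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```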
# deserializer
- Type: object
- Dynamic: ❓
- Required: ❌
# maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format: duration
The maximum duration to wait for new rows
This is not a hard limit; it is evaluated every second.
# maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of rows to fetch before stopping
This is not a hard limit; it is evaluated every second.
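The two stop conditions can be combined, as in the sketch below. The values are illustrative only, and the duration is written in ISO 8601 notation on the assumption that this is what the duration format expects. Because both limits are checked periodically, the task may read slightly more than 100 rows or run slightly longer than 30 seconds.

```yaml
id: "reader_bounded"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
# Stop after roughly 30 seconds (assumed ISO 8601 duration syntax)
maxDuration: PT30S
# Stop after roughly 100 rows, whichever limit is reached first
maxRecords: 100
```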
# messageId
- Type: string
- Dynamic: ✔️
- Required: ❌
Position the reader on a particular message.
The first message read will be the one immediately after the specified message.
If neither "since" nor "messageId" is provided, reading starts at the beginning of the topic.
# pollDuration
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default: 2.000000000
- Format: duration
Duration to wait for records to be polled
If no records are available, this is the maximum time to wait for new records.
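A minimal sketch of overriding the poll wait, assuming ISO 8601 duration syntax; the 5-second value is arbitrary and chosen only for illustration.

```yaml
id: "reader_slow_poll"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
# Wait up to 5 seconds for new records on each poll (illustrative value)
pollDuration: PT5S
```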
# since
- Type: string
- Dynamic: ✔️
- Required: ❌
- Format: duration
Position the reader at a point in time by providing a rollback duration.
The broker finds the latest message published before the given duration and sets the initial position on that message ID. For example, a rollback duration of 5 minutes asks the broker to find the message published 5 minutes ago and to start reading from there.
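The 5-minute rollback described above might be expressed as follows. This is a sketch that assumes the duration is given in ISO 8601 notation.

```yaml
id: "reader_last_5_minutes"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
# Start from the message published about 5 minutes ago (assumed ISO 8601 syntax)
since: PT5M
```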
# tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication
You need to use "pulsar+ssl://" in the service URL (the "uri" property) to enable TLS support.
# topic
- Type: object
- Dynamic: ✔️
- Required: ❌
Pulsar topic(s) to consume messages from
Can be a string or a list of strings to consume from multiple topics.
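To illustrate the list form, the sketch below reads from two topics in one task. The topic names `orders` and `payments` are hypothetical.

```yaml
id: "reader_multi_topic"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:6650
deserializer: JSON
# Hypothetical topic names; a single string is also accepted
topic:
  - orders
  - payments
```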
# uri
- Type: string
- Dynamic: ✔️
- Required: ❌
Connection URLs
You need to specify a Pulsar protocol URL.
- Example of localhost: pulsar://localhost:6650
- If you have multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652
- If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
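As an illustration of the multi-broker form, the task below lists several brokers in a single comma-separated URL. The hostnames `broker1` to `broker3` are placeholders, not real endpoints.

```yaml
id: "reader_multi_broker"
type: "io.kestra.plugin.pulsar.Reader"
# Placeholder hostnames; list each broker separated by commas
uri: pulsar://broker1:6650,broker2:6650,broker3:6650
topic: test_kestra
deserializer: JSON
```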
# Outputs
# messagesCount
- Type: integer
Number of messages consumed
# uri
- Type: string
URI of a Kestra internal storage file
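A downstream task can reference these outputs through expressions. The sketch below assumes a flow whose reader task has the id `reader`, and it uses the core Log task under the type name `io.kestra.plugin.core.log.Log`, which may differ in older Kestra versions.

```yaml
tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON

  # Assumption: the Log task type name matches your Kestra version
  - id: log_result
    type: io.kestra.plugin.core.log.Log
    message: "Read {{ outputs.reader.messagesCount }} messages into {{ outputs.reader.uri }}"
```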
# Definitions
# TlsOptions
# ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate
Must be a PEM file, base64-encoded.
# cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a PEM file, base64-encoded.
# key
- Type: string
- Dynamic: ❓
- Required: ❌
The private key for the client certificate
Must be a PEM file, base64-encoded.
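Putting the three fields together, a TLS-enabled reader might look like the sketch below. The angle-bracket values are placeholders for your own base64-encoded PEM content. They are written as literals because the fields above do not confirm that expressions are rendered in them.

```yaml
id: "reader_tls"
type: "io.kestra.plugin.pulsar.Reader"
# The pulsar+ssl:// scheme is needed for TLS support
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
tlsOptions:
  # Placeholders: replace with base64-encoded PEM content
  ca: "<base64-encoded CA certificate PEM>"
  cert: "<base64-encoded client certificate PEM>"
  key: "<base64-encoded client key PEM>"
```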