Reader
type: "io.kestra.plugin.pulsar.Reader"
Read messages from Pulsar topic(s) without subscription
Examples
id: "reader"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
Duration waiting for record to be polled
If no records are available, the max wait to wait for a new records.
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) where to consume message
Can be a string or a List of string to consume from multiple topic
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs
You need to specify a Pulsar protocol URL
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6550,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
ssl.truststore.location
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication Token
Authentication Token that can be necessary with some providers such as Clever Cloud!
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration waiting for new rows
It's not an hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
It's not an hard limit and is evaluated every second
messageId
- Type: string
- Dynamic: ✔️
- Required: ❌
Position the reader on a particular message.
The first message read will be the one immediately after the specified message
If no since
or messageId
are provide, we start at the beginning of the topic.
since
- Type: string
- Dynamic: ✔️
- Required: ❌
- Format:
duration
The initial reader positioning can be set at specific timestamp by providing total rollback duration.
So, broker can find a latest message that was published before given duration. eg: rollbackDuration in minute = 5 suggests broker to find message which was published 5 mins back and set the initial position on that messageId.
tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS Authentication
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
Outputs
messagesCount
- Type: integer
Number of message consumed
uri
- Type: string
URI of a kestra internal storage file
Definitions
TlsOptions
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The ca certificate
Must be a pem file as base64.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a pem file as base64.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate
Must be a pem file as base64.