Consume
type: "io.kestra.plugin.pulsar.Consume"
Consume messages from Pulsar topic(s)
Examples
id: "consume"
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
initialPosition
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Earliest
- Possible Values:
Latest
Earliest
The position at which the subscription starts reading the topic when it is first created.
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
Duration to wait for records to be polled
If no records are available, the maximum time to wait for new records.
subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The subscription name
With a subscription name, only records that have not already been consumed are fetched.
subscriptionType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Exclusive
- Possible Values:
Exclusive
Shared
Failover
Key_Shared
The subscription type, which controls how messages are delivered when several consumers use the same subscription.
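As a minimal sketch, the task below sets both enumerated properties explicitly instead of relying on their defaults. The task id is hypothetical; the connection values are copied from the example at the top of this page, and the enum values come from the Possible Values lists above.

```yaml
id: "consume_shared"            # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Start from new messages only when the subscription is first created.
initialPosition: Latest
# Allow other consumers to attach to the same subscription.
subscriptionType: Shared
```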
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) to consume messages from
Can be a string, or a list of strings to consume from multiple topics.
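As a minimal sketch of the list form, the task below subscribes to two topics at once. The task id and the second topic name are hypothetical; the other values are copied from the example at the top of this page.

```yaml
id: "consume_many"              # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
# A list instead of a single string: both topics are consumed by this task.
topic:
  - test_kestra
  - test_kestra_other           # hypothetical second topic
deserializer: JSON
subscriptionName: kestra_flow
```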
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs
You need to specify a Pulsar protocol URL
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token
An authentication token, which some providers such as Clever Cloud require.
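Because this property is dynamic, the token does not have to be written in clear text in the flow. The sketch below assumes a Kestra secret named PULSAR_TOKEN exists and that the `secret()` expression function is available in your Kestra edition; both the secret name and the broker host are hypothetical.

```yaml
id: "consume_with_token"        # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Assumption: a secret called PULSAR_TOKEN has been configured in Kestra.
authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```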
consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Set the consumer name.
consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key, used by the producer to encrypt the data key.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new rows
It is not a hard limit and is evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of rows to fetch before stopping
It is not a hard limit and is evaluated every second.
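The two limits can be combined so that the task stops on whichever is reached first. The sketch below assumes durations are written as ISO 8601 strings (for example PT30S), which is the usual Kestra convention; the task id and the limit values are hypothetical.

```yaml
id: "consume_bounded"           # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Wait up to 2 seconds on each poll when no record is available.
pollDuration: PT2S
# Stop after roughly 30 seconds or roughly 1000 records.
# Both are soft limits, checked once per second.
maxDuration: PT30S
maxRecords: 1000
```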
tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS Authentication
You need to use the "pulsar+ssl://" scheme in uri to enable TLS support.
Outputs
messagesCount
- Type: integer
Number of messages consumed
uri
- Type: string
URI of a Kestra internal storage file
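A downstream task can read these outputs through Kestra expressions. The sketch below is one possible flow, not the plugin's own example: the flow id, namespace, and task ids are hypothetical, and the type name of the Log task is an assumption that may differ between Kestra versions.

```yaml
id: pulsar_consume_flow         # hypothetical flow id
namespace: dev                  # hypothetical namespace
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow

  # Assumption: the core Log task is exposed under this type name.
  - id: log_result
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages into {{ outputs.consume.uri }}"
```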
Definitions
TlsOptions
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate
Must be a PEM file encoded as base64.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a PEM file encoded as base64.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate
Must be a PEM file encoded as base64.
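Putting the TLS pieces together, a task using TLS authentication might look like the sketch below. The task id is hypothetical and the three values are placeholders, not real credentials; each one stands for the base64 encoding of the corresponding PEM file.

```yaml
id: "consume_tls"               # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
# The pulsar+ssl:// scheme is what enables TLS.
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
tlsOptions:
  ca: "<base64 of the CA certificate PEM>"        # placeholder
  cert: "<base64 of the client certificate PEM>"  # placeholder
  key: "<base64 of the key PEM>"                  # placeholder
```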