Consume
type: "io.kestra.plugin.pulsar.Consume"
Consume messages from Pulsar topic(s)
# Examples
id: "consume"
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Properties
# authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication Token
Authentication Token that can be necessary with some providers such as Clever Cloud!
# consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Set the consumer name.
# consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
# deserializer
- Type: object
- Dynamic: ❓
- Required: ❌
# encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add public encryption key, used by producer to decrypt the data key.
# initialPosition
Type: string
Dynamic: ❌
Required: ❌
Default:
Earliest
Possible Values:
Latest
Earliest
Add all the properties in the provided map to the consumer.
# maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration waiting for new rows
It's not an hard limit and is evaluated every second
# maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
It's not an hard limit and is evaluated every second
# pollDuration
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
2.000000000
- Format:
duration
Duration waiting for record to be polled
If no records are available, the max wait to wait for a new records.
# subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ❌
The subscription name
Using subscription name, we will fetch only records not already consumed
# subscriptionType
Type: string
Dynamic: ❌
Required: ❌
Default:
Exclusive
Possible Values:
Exclusive
Shared
Failover
Key_Shared
Add all the properties in the provided map to the consumer.
# tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS Authentication
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
# topic
- Type: object
- Dynamic: ✔️
- Required: ❌
Pulsar topic(s) where to consume message
Can be a string or a List of string to consume from multiple topic
# uri
- Type: string
- Dynamic: ✔️
- Required: ❌
Connection URLs
You need to specify a Pulsar protocol URL
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6550,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
ssl.truststore.location
# Outputs
# messagesCount
- Type: integer
Number of message consumed
# uri
- Type: string
URI of a kestra internal storage file
# Definitions
# TlsOptions
# ca
- Type: string
- Dynamic: ❓
- Required: ❌
The ca certificate
Must be a pem file as base64.
# cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a pem file as base64.
# key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate
Must be a pem file as base64.