Trigger
type: "io.kestra.plugin.pulsar.Trigger"
Wait for messages on Pulsar topics
Examples
id: "trigger"
type: "io.kestra.plugin.pulsar.Trigger"
interval: PT10S
topic: tu_trigger
uri: pulsar://localhost:26650
deserializer: JSON
subscriptionName: tu_trigger_sub
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
initialPosition
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Earliest
- Possible Values:
Latest
Earliest
Add all the properties in the provided map to the consumer.
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
Duration waiting for record to be polled
If no records are available, the max wait to wait for a new records.
subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The subscription name
Using subscription name, we will fetch only records not already consumed
subscriptionType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Exclusive
- Possible Values:
Exclusive
Shared
Failover
Key_Shared
Add all the properties in the provided map to the consumer.
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) where to consume message
Can be a string or a List of string to consume from multiple topic
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs
You need to specify a Pulsar protocol URL
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6550,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
ssl.truststore.location
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication Token
Authentication Token that can be necessary with some providers such as Clever Cloud!
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of Conditions in order to limit the flow trigger.
consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Set the consumer name.
consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add public encryption key, used by producer to decrypt the data key.
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
60.000000000
- Format:
duration
Interval between polling
The interval between 2 different test of schedule, this can avoid to overload the remote system with too many call. For most of trigger that depend on external system, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval value
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration waiting for new rows
It's not an hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
It's not an hard limit and is evaluated every second
tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS Authentication
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
Outputs
messagesCount
- Type: integer
Number of message consumed
uri
- Type: string
URI of a kestra internal storage file
Definitions
TlsOptions
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The ca certificate
Must be a pem file as base64.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a pem file as base64.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate
Must be a pem file as base64.