
Consume
Consume messages from one or more Pulsar topics.
type: "io.kestra.plugin.pulsar.Consume"
Examples
id: pulsar_consume
namespace: company.team
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
Properties
deserializer *Required string
Default: STRING
Possible values: STRING, JSON, BYTES
Deserializer used for the value.
subscriptionName *Required string
The subscription name.
With a subscription name, only records that have not been consumed yet are fetched.
topic *Required object
Pulsar topic(s) to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
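As a minimal sketch of the list form described above, the flow below subscribes to two topics at once. The flow id and the topic names `orders` and `payments` are hypothetical placeholders, not values from the plugin documentation.

```yaml
id: pulsar_consume_multi
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    # topic accepts a single string or a list of strings;
    # the two names below are illustrative only
    topic:
      - orders
      - payments
    deserializer: JSON
    subscriptionName: kestra_flow
```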
uri *Required string
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost: pulsar://localhost:6650
- If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
authenticationToken string
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
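One way to supply the token without hard-coding it is to read it from a Kestra secret. This is a sketch under assumptions: the secret name `PULSAR_TOKEN` and the broker hostname are hypothetical, and it assumes `authenticationToken` is rendered as a dynamic expression.

```yaml
id: pulsar_consume_with_token
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    # hypothetical broker address
    uri: pulsar+ssl://pulsar.example.com:6651
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    # PULSAR_TOKEN is an assumed secret name, configured outside this flow
    authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```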
consumerName string
The consumer name.
consumerProperties object
Add all the properties in the provided map to the consumer.
encryptionKey string
Add a public encryption key to the producer/consumer.
initialPosition string
Default: Earliest
Possible values: Latest, Earliest
The position of a subscription to the topic.
maxDuration string
Format: duration
The maximum duration to wait for new records.
maxRecords integer | string
The maximum number of records to fetch before stopping.
It's not a hard limit and is evaluated every second.
pollDuration string
Default: PT2S
Format: duration
Duration to wait for records to be polled.
If no records are available, this is the maximum time to wait for a new record.
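The three limits above can be combined to bound how long a task runs. The values in this sketch (100 records, 30 seconds) are arbitrary illustrations; since `maxRecords` is evaluated periodically rather than enforced exactly, the task may return slightly more records than requested.

```yaml
id: pulsar_consume_bounded
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    # stop after roughly 100 records (soft limit) ...
    maxRecords: 100
    # ... or after 30 seconds, whichever comes first
    maxDuration: PT30S
    # wait at most 2 seconds per poll (this is also the default)
    pollDuration: PT2S
```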
schemaString string
JSON string of the topic's schema.
Required for connecting to topics with a defined schema and strict schema checking.
schemaType string
Default: NONE
Possible values: NONE, AVRO, JSON
The schema type of the topic.
Can be one of NONE, AVRO or JSON. NONE means no schema is enforced.
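For a topic with strict schema checking, the two schema properties are set together. The sketch below is illustrative only: the Avro record definition (`Message` with `title` and `message` fields) is a made-up example, and pairing it with the `JSON` deserializer is an assumption rather than documented behavior.

```yaml
id: pulsar_consume_with_schema
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    # assumed pairing; check which deserializer suits your schema
    deserializer: JSON
    subscriptionName: kestra_flow
    schemaType: AVRO
    # hypothetical schema; replace with the schema registered on the topic
    schemaString: '{"type":"record","name":"Message","namespace":"io.kestra","fields":[{"name":"title","type":"string"},{"name":"message","type":"string"}]}'
```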
subscriptionType string
Default: Exclusive
Possible values: Exclusive, Shared, Failover, Key_Shared
The subscription type.
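As a short illustration of overriding the subscription defaults, the sketch below uses a `Shared` subscription that starts from the latest position. The consumer name `kestra_worker_1` is a hypothetical value.

```yaml
id: pulsar_consume_shared
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    # override the Exclusive default so several consumers can share the subscription
    subscriptionType: Shared
    # start from new messages instead of the earliest available one
    initialPosition: Latest
    # hypothetical consumer name
    consumerName: kestra_worker_1
```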
tlsOptions Non-dynamic
TLS authentication options.
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
The CA certificate.
The client certificate.
The key certificate.
Must be a base64-encoded PEM file.
Outputs
messagesCount integer
Number of messages consumed.
uri string
Format: uri
URI of a Kestra internal storage file containing the consumed messages.
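Both outputs can be referenced from a downstream task through the `outputs` expression. The sketch below assumes a recent Kestra version in which the core log task is available as `io.kestra.plugin.core.log.Log`; the flow and task ids are illustrative.

```yaml
id: pulsar_consume_and_log
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow

  # reads the outputs of the task with id "consume"
  - id: log_result
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages, stored at {{ outputs.consume.uri }}"
```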