Consume messages from Pulsar topic(s).

```yaml
type: "io.kestra.plugin.pulsar.Consume"
```

```yaml
id: pulsar_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
```

Properties
Default STRING
Possible Values
STRING, JSON, BYTES

Deserializer used for the value.

The subscription name.

With a subscription name, only records that have not been consumed yet are fetched.

Pulsar topic(s) to consume messages from.

Can be a string or a list of strings to consume from multiple topics.
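
As a minimal sketch of the list form, the task from the example above could subscribe to two topics at once. The second topic name is a placeholder chosen for illustration, not a real topic.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    # topic accepts either a single string or a list of strings
    topic:
      - test_kestra
      - test_kestra_other   # hypothetical second topic
    deserializer: JSON
    subscriptionName: kestra_flow
```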

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
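
For illustration, the multi-broker form could be used in the `uri` property of the task as shown here. The host names and ports are the ones from the list above and stand in for a real cluster.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    # several brokers, comma-separated after a single scheme prefix
    uri: pulsar://localhost:6650,localhost:6651,localhost:6652
    topic: test_kestra
    subscriptionName: kestra_flow
```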

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.

The consumer name.

SubType string

Add all the properties in the provided map to the consumer.

Add a public encryption key to the producer/consumer.

Default Earliest
Possible Values
Latest, Earliest

The position of a subscription to the topic.

Format duration

The maximum duration to wait for new records.

It's not a hard limit and is evaluated every second.

The maximum number of records to fetch before stopping.

It's not a hard limit and is evaluated every second.

Default PT2S
Format duration

Duration to wait for a record to be polled.

If no records are available, this is the maximum time to wait for a new record.
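
The stopping conditions and the poll wait could be combined as in the sketch below. The property names `maxDuration`, `maxRecords` and `pollDuration` are assumptions, since the property names are not shown on this page; check them against the plugin schema. Durations are written in ISO-8601 form, matching the `PT2S` default.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    subscriptionName: kestra_flow
    # assumed property names; neither limit is hard, both are checked about once per second
    maxDuration: PT30S   # stop after roughly 30 seconds of waiting
    maxRecords: 100      # or after roughly 100 records, whichever comes first
    pollDuration: PT2S   # wait up to 2 seconds per poll (the documented default)
```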

JSON string of the topic's schema.

Required for connecting to topics with a defined schema and strict schema checking.

Default NONE
Possible Values
NONE, AVRO, JSON

The schema type of the topic.

Can be one of NONE, AVRO or JSON. NONE means that no schema is enforced.
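
A schema-aware consumer might look like the sketch below. The property names `schemaType` and `schemaString` are assumptions, and the Avro record is a made-up schema used only to show the shape of the JSON string.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    subscriptionName: kestra_flow
    # assumed property names; the schema below is hypothetical
    schemaType: AVRO
    schemaString: |
      {"type": "record", "name": "Message", "fields": [{"name": "id", "type": "string"}]}
```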

Default Exclusive
Possible Values
Exclusive, Shared, Failover, Key_Shared

The subscription type.
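
As a sketch, a consumer that shares a subscription with other consumers and starts from the newest messages could be configured as follows. The property names `subscriptionType` and `initialPosition` are assumptions; the values come from the possible values listed on this page.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    subscriptionName: kestra_flow
    # assumed property names
    subscriptionType: Shared   # default is Exclusive
    initialPosition: Latest    # default is Earliest
```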

TLS authentication options.

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.

Number of messages consumed.

Format uri

URI of a Kestra internal storage file containing the consumed messages.
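
A downstream task can read these outputs through Kestra's `outputs` expression. The sketch below assumes the outputs are named `messagesCount` and `uri`, which this page does not state explicitly, and uses the core `Log` task only to print them.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    subscriptionName: kestra_flow

  - id: log_result
    type: io.kestra.plugin.core.log.Log
    # assumed output names: messagesCount and uri
    message: "Consumed {{ outputs.consume.messagesCount }} messages into {{ outputs.consume.uri }}"
```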

The CA certificate.

Must be a base64-encoded PEM file.

The client certificate.

Must be a base64-encoded PEM file.

The key certificate.

Must be a base64-encoded PEM file.
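
Putting the TLS pieces together, a connection might be sketched as below. The property names `tlsOptions`, `ca`, `cert` and `key` are assumptions, and the secret names are placeholders; each secret is expected to hold a base64-encoded PEM file.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    # the pulsar+ssl:// scheme is required for TLS
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    topic: test_kestra
    subscriptionName: kestra_flow
    # assumed property names; secret names are hypothetical
    tlsOptions:
      ca: "{{ secret('PULSAR_CA') }}"
      cert: "{{ secret('PULSAR_CERT') }}"
      key: "{{ secret('PULSAR_KEY') }}"
```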