Reader

yaml
type: "io.kestra.plugin.pulsar.Reader"

Read messages from one or more Pulsar topics without a subscription

Examples

yaml
id: "reader"
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON

Properties

deserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000 (seconds)
  • Format: duration

Duration to wait for a record to be polled

If no records are available, the maximum time to wait for new records.

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic(s) to consume messages from

Can be a string, or a list of strings to consume from multiple topics.
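
As a minimal sketch of the list form described above, the task below reads from two topics at once. The second topic name and the task id are hypothetical and only there for illustration; the other values are taken from the example at the top of this page.

```yaml
# Sketch: one Reader task consuming from several topics.
id: "reader_multi"                # hypothetical task id
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic:
  - test_kestra
  - test_kestra_other             # hypothetical second topic
deserializer: JSON
```

A single topic can still be given as a plain string, as in the first example.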

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs

You need to specify a Pulsar protocol URL

  • Example for localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
  • ssl.truststore.location

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication Token

Authentication token that may be required by some providers, such as Clever Cloud.
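
Because this property is dynamic, the token does not have to be written in clear text in the flow. The sketch below assumes a Kestra secret named PULSAR_TOKEN has been configured; that name, the task id, and the use of the `secret()` function here are illustrative assumptions, not something this page prescribes.

```yaml
# Sketch: token-based authentication against a TLS endpoint.
id: "reader_token"                # hypothetical task id
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
# Assumption: a secret called PULSAR_TOKEN exists in your Kestra instance.
authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```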

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new rows

This is not a hard limit; it is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of rows to fetch before stopping

This is not a hard limit; it is evaluated every second.
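
The stopping conditions can be combined with pollDuration. The following is a sketch only: the values are arbitrary, the task id is hypothetical, and the durations are written in ISO 8601 form on the assumption that this is what the duration format expects.

```yaml
# Sketch: bound a read by record count and by elapsed time.
id: "reader_bounded"              # hypothetical task id
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
pollDuration: PT5S                # wait up to 5 seconds for new records
maxRecords: 100                   # stop after roughly 100 rows
maxDuration: PT30S                # or after roughly 30 seconds
```

Since both limits are soft and checked once per second, the task may return slightly more than 100 rows or run slightly longer than 30 seconds.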

messageId

  • Type: string
  • Dynamic: ✔️
  • Required:

Position the reader on a particular message.

The first message read will be the one immediately after the specified message. If neither since nor messageId is provided, reading starts at the beginning of the topic.

since

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Format: duration

The initial reader position can be set to a specific timestamp by providing a total rollback duration.

The broker then finds the latest message published before the given duration. For example, a rollback duration of 5 minutes asks the broker to find the message published 5 minutes ago and to set the initial position on that messageId.
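
To illustrate the five-minute rollback described above, a task could be written roughly as follows. The task id is hypothetical, and the ISO 8601 value PT5M is an assumption about how the duration format is expressed.

```yaml
# Sketch: start reading from messages published about 5 minutes ago.
id: "reader_since"                # hypothetical task id
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
since: PT5M                       # roll the initial position back by 5 minutes
```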

tlsOptions

TLS Authentication

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
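
A possible TLS configuration is sketched below, using the ca, cert, and key fields listed under Definitions. The secret names and the task id are hypothetical; each secret is assumed to hold a PEM file encoded as base64.

```yaml
# Sketch: TLS authentication with base64-encoded PEM material.
id: "reader_tls"                  # hypothetical task id
type: "io.kestra.plugin.pulsar.Reader"
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
tlsOptions:
  # Assumption: these secrets exist and contain base64-encoded PEM files.
  ca: "{{ secret('PULSAR_CA') }}"
  cert: "{{ secret('PULSAR_CLIENT_CERT') }}"
  key: "{{ secret('PULSAR_CLIENT_KEY') }}"
```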

Outputs

messagesCount

  • Type: integer

Number of messages consumed

uri

  • Type: string

URI of a Kestra internal storage file
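
Downstream tasks can refer to these outputs through Kestra expressions. The fragment below is a sketch under assumptions: the Log task type shown is the one used by recent Kestra versions and may differ in yours, and the task ids are hypothetical.

```yaml
# Sketch: use the Reader outputs in a following task.
tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON

  - id: log_result                # hypothetical task id
    # Assumption: the Log task type name depends on your Kestra version.
    type: io.kestra.plugin.core.log.Log
    message: "Read {{ outputs.reader.messagesCount }} messages into {{ outputs.reader.uri }}"
```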

Definitions

TlsOptions

ca

  • Type: string
  • Dynamic:
  • Required:

The CA certificate

Must be a PEM file encoded as base64.

cert

  • Type: string
  • Dynamic:
  • Required:

The client certificate

Must be a PEM file encoded as base64.

key

  • Type: string
  • Dynamic:
  • Required:

The client private key

Must be a PEM file encoded as base64.