Consume

yaml
type: "io.kestra.plugin.pulsar.Consume"

Consume messages from Pulsar topic(s)

Examples

yaml
id: "consume"
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow

Properties

deserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

initialPosition

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: Earliest
  • Possible Values:
    • Latest
    • Earliest

The position from which a newly created subscription starts consuming.

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000 (2 seconds)
  • Format: duration

Maximum time to wait for records on each poll

If no records are available, this is the maximum time to wait for new records.

subscriptionName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The subscription name

With a subscription name, only records that have not already been consumed are fetched.

subscriptionType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: Exclusive
  • Possible Values:
    • Exclusive
    • Shared
    • Failover
    • Key_Shared

The Pulsar subscription type, which controls how messages are distributed among the consumers attached to the subscription.
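The subscription settings above can be combined as in the following sketch. It reuses the topic and subscription name from the example at the top of this page; the task id is hypothetical, and the comments describe standard Pulsar behavior rather than anything specific to this plugin.

```yaml
id: "consume_shared"   # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Shared: several consumers may attach to the same subscription
# and each message is delivered to only one of them.
subscriptionType: Shared
# Latest: when the subscription is first created, skip the existing
# backlog and start from new messages. The default is Earliest.
initialPosition: Latest
```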

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic(s) to consume messages from

Can be a string, or a list of strings to consume from multiple topics.
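As an illustration of the list form, the sketch below passes two topics to a single task. The task id and the second topic name are hypothetical.

```yaml
id: "consume_many"   # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:6650
# A list instead of a single string: one task consumes from every topic listed.
topic:
  - test_kestra
  - test_kestra_audit   # hypothetical second topic
deserializer: JSON
subscriptionName: kestra_flow
```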

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs

You need to specify a Pulsar protocol URL

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
  • ssl.truststore.location

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication Token

An authentication token, which some providers (such as Clever Cloud) require.
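Because this property is dynamic, the token does not have to be written into the flow. The sketch below assumes a secret named PULSAR_TOKEN (a hypothetical name) has been configured in your Kestra instance and reads it with the `secret()` function; the task id is also hypothetical.

```yaml
id: "consume_with_token"   # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
# Resolved at runtime; PULSAR_TOKEN is an assumed secret name.
authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```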

consumerName

  • Type: string
  • Dynamic: ✔️
  • Required:

Set the consumer name.

consumerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the consumer.

encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Add public encryption key, used by producer to decrypt the data key.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new rows

It's not a hard limit and is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of rows to fetch before stopping

It's not a hard limit and is evaluated every second.
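A minimal sketch of how the polling and stopping limits might be set together. It assumes duration properties accept ISO 8601 strings such as PT5S; the task id and the specific values are arbitrary.

```yaml
id: "consume_bounded"   # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:6650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
pollDuration: PT5S   # wait up to 5 seconds on each poll
maxDuration: PT1M    # soft limit: about one minute of consuming
maxRecords: 1000     # soft limit: about 1000 rows
```

Since both limits are soft and evaluated periodically, a run may slightly exceed either value before it stops.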

tlsOptions

TLS Authentication

You need to use "pulsar+ssl://" in the uri to enable TLS support.

Outputs

messagesCount

  • Type: integer

Number of messages consumed

uri

  • Type: string

URI of a Kestra internal storage file
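The outputs can be read from a later task in the same flow. The sketch below is one way to do that, under a few assumptions: the flow id and namespace are hypothetical, and the log task type shown is the name used in recent Kestra releases (older releases expose it as io.kestra.core.tasks.log.Log).

```yaml
id: pulsar_consume_and_log   # hypothetical flow id
namespace: company.team      # hypothetical namespace
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow

  # Reads both outputs of the task above through the outputs.<taskId> expression.
  - id: report
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages into {{ outputs.consume.uri }}"
```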

Definitions

TlsOptions

ca

  • Type: string
  • Dynamic:
  • Required:

The CA certificate

Must be a PEM file encoded as base64.

cert

  • Type: string
  • Dynamic:
  • Required:

The client certificate

Must be a PEM file encoded as base64.

key

  • Type: string
  • Dynamic:
  • Required:

The key certificate

Must be a PEM file encoded as base64.
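Putting the TlsOptions fields together, a TLS-enabled task might look like the sketch below. The task id is hypothetical and the three values are placeholders, not real data: replace each with the base64 encoding of the corresponding PEM file.

```yaml
id: "consume_tls"   # hypothetical task id
type: "io.kestra.plugin.pulsar.Consume"
# The pulsar+ssl:// scheme is required for TLS.
uri: pulsar+ssl://pulsar.us-west.example.com:6651
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
tlsOptions:
  ca: "BASE64_ENCODED_CA_PEM"             # placeholder
  cert: "BASE64_ENCODED_CLIENT_CERT_PEM"  # placeholder
  key: "BASE64_ENCODED_CLIENT_KEY_PEM"    # placeholder
```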