Consume Consume

type: "io.kestra.plugin.pulsar.Consume"

Consume messages from Pulsar topic(s)

# Examples

id: "consume"
type: "io.kestra.plugin.pulsar.Consume"
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow

# Properties

# authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication Token

Authentication Token that can be necessary with some providers such as Clever Cloud!

# consumerName

  • Type: string
  • Dynamic: ✔️
  • Required:

Set the consumer name.

# consumerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the consumer.

# deserializer

  • Type: object
  • Dynamic:
  • Required:

# encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Add public encryption key, used by producer to decrypt the data key.

# initialPosition

  • Type: string

  • Dynamic:

  • Required:

  • Default: Earliest

  • Possible Values:

    • Latest
    • Earliest

Add all the properties in the provided map to the consumer.

# maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The max duration waiting for new rows

It's not an hard limit and is evaluated every second

# maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The max number of rows to fetch before stopping

It's not an hard limit and is evaluated every second

# pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: 2.000000000
  • Format: duration

Duration waiting for record to be polled

If no records are available, the max wait to wait for a new records.

# subscriptionName

  • Type: string
  • Dynamic: ✔️
  • Required:

The subscription name

Using subscription name, we will fetch only records not already consumed

# subscriptionType

  • Type: string

  • Dynamic:

  • Required:

  • Default: Exclusive

  • Possible Values:

    • Exclusive
    • Shared
    • Failover
    • Key_Shared

Add all the properties in the provided map to the consumer.

# tlsOptions

TLS Authentication

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.

# topic

  • Type: object
  • Dynamic: ✔️
  • Required:

Pulsar topic(s) where to consume message

Can be a string or a List of string to consume from multiple topic

# uri

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection URLs

You need to specify a Pulsar protocol URL

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
  • ssl.truststore.location

# Outputs

# messagesCount

  • Type: integer

Number of message consumed

# uri

  • Type: string

URI of a kestra internal storage file

# Definitions

# TlsOptions

# ca

  • Type: string
  • Dynamic:
  • Required:

The ca certificate

Must be a pem file as base64.

# cert

  • Type: string
  • Dynamic:
  • Required:

The client certificate

Must be a pem file as base64.

# key

  • Type: string
  • Dynamic:
  • Required:

The key certificate

Must be a pem file as base64.