Produce

yaml
type: "io.kestra.plugin.pulsar.Produce"

Produce messages to a Pulsar topic

Examples

Read a CSV file, transform it to the right format, and produce it to Pulsar

yaml
id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    name: file

tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "eventTime": row.timestamp,
        "properties": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:26650
    serializer: JSON
    topic: test_kestra

Properties

from

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Source of the messages to send

Can be an internal storage URI, a map, or a list, with the following format: key, value, partition, timestamp, headers
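As a minimal sketch, assuming `from` accepts an inline list of maps that use the same keys as the FileTransform script in the example above (`key`, `value`, `properties`), a task could send a couple of hard-coded messages without reading a file first. The task id, message contents, and broker address are illustrative only.

```yaml
tasks:
  - id: produceInline            # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    from:
      # Each list item is one message; keys mirror the FileTransform example
      - key: "1"
        value:
          username: alice
          tweet: "first message"
      - key: "2"
        value:
          username: bob
          tweet: "second message"
        properties:
          source: manual
```

Passing a single map instead of a list should send one message under the same assumption.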

serializer

  • Type: object
  • Dynamic:
  • Required: ✔️

topic

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic to send messages to

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs

You need to specify a Pulsar protocol URL:

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
  • ssl.truststore.location
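For illustration, a task pointing at several brokers could look like the sketch below. The broker hostnames are placeholders, and the remaining properties are copied from the example at the top of this page.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    # Comma-separated broker list; hostnames are placeholders
    uri: pulsar://broker1.example.com:6650,broker2.example.com:6650
    serializer: JSON
    topic: test_kestra
```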

accessMode

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • Shared
    • Exclusive
    • ExclusiveWithFencing
    • WaitForExclusive

Configure the type of access mode that the producer requires on the topic.

Possible values are:

  • Shared: By default, multiple producers can publish on a topic.
  • Exclusive: Require exclusive access for the producer. Fail immediately if there's already a producer connected.
  • ExclusiveWithFencing: Acquire exclusive access for the producer. Any existing producer is removed and invalidated immediately.
  • WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
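A minimal sketch of a task that requests exclusive access might look like this. The task id and producer name are hypothetical. With `Exclusive`, the task would be expected to fail if another producer is already connected to the topic.

```yaml
tasks:
  - id: produceExclusive         # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    accessMode: Exclusive
    # Optional: a name makes the producer easier to identify on the broker
    producerName: kestra-exclusive-producer
```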

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication Token

Authentication token that some providers, such as Clever Cloud, require.
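Because this property is dynamic, the token does not have to be hard-coded in the flow. The sketch below passes it through a flow input. The input name `pulsarToken` and the broker address are assumptions made for illustration.

```yaml
inputs:
  - type: STRING
    name: pulsarToken            # hypothetical input holding the token

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar+ssl://pulsar.us-west.example.com:6651   # placeholder broker
    serializer: JSON
    topic: test_kestra
    # Rendered at runtime from the input above
    authenticationToken: "{{ inputs.pulsarToken }}"
```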

compressionType

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • NONE
    • LZ4
    • ZLIB
    • ZSTD
    • SNAPPY

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

  • NONE: No compression (Default)
  • LZ4: Compress with the LZ4 algorithm. Faster but lower compression than ZLib.
  • ZLIB: Standard ZLib compression.
  • ZSTD: Compress with the Zstandard codec. Since Pulsar 2.3.
  • SNAPPY: Compress with the Snappy codec. Since Pulsar 2.4.
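For example, a task that compresses payloads with Zstandard could be sketched as follows. Only `compressionType` differs from the example at the top of this page.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # One of NONE, LZ4, ZLIB, ZSTD, SNAPPY
    compressionType: ZSTD
```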

encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Add a public encryption key, used by the producer to encrypt the data key.

producerName

  • Type: string
  • Dynamic: ✔️
  • Required:

Specify a name for the producer.

producerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the producer.
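As a rough sketch, the map is written as plain string key/value pairs. The keys `team` and `pipeline` below are made up for illustration and have no special meaning to Pulsar.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    producerProperties:
      # Hypothetical keys; values must be strings
      team: data-platform
      pipeline: tweets-import
```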

tlsOptions

TLS Authentication

You need to use "pulsar+ssl://" in the uri property (the Pulsar service URL) to enable TLS support.

Outputs

messagesCount

  • Type: integer

Number of messages produced
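A later task can read this output through the usual `outputs.<taskId>` expression. The sketch below assumes the producing task is named `produce`, as in the example at the top of this page, and that the core `Echo` task is available in your Kestra version.

```yaml
tasks:
  - id: logCount                 # hypothetical task id
    type: io.kestra.core.tasks.debugs.Echo
    # Prints the number of messages sent by the task with id "produce"
    format: "Produced {{ outputs.produce.messagesCount }} messages"
```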

Definitions

TlsOptions

ca

  • Type: string
  • Dynamic:
  • Required:

The CA certificate

Must be a PEM file encoded as base64.

cert

  • Type: string
  • Dynamic:
  • Required:

The client certificate

Must be a PEM file encoded as base64.

key

  • Type: string
  • Dynamic:
  • Required:

The key of the client certificate

Must be a PEM file encoded as base64.
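Putting the three fields together, a TLS-enabled task might be sketched as below. The bracketed values are placeholders for your own base64-encoded PEM content, and the broker address reuses the TLS example from the uri property. Since these fields are not marked as dynamic, the sketch uses literal values rather than template expressions.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    # TLS requires the pulsar+ssl:// scheme
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    serializer: JSON
    topic: test_kestra
    tlsOptions:
      ca: "<base64-encoded CA certificate PEM>"
      cert: "<base64-encoded client certificate PEM>"
      key: "<base64-encoded client key PEM>"
```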