Produce

yaml
type: "io.kestra.plugin.pulsar.Produce"

Produce a message to a Pulsar topic.

Examples

Read a CSV file, transform each row into the expected message format, and publish the result to a Pulsar topic.

yaml
id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    id: file

tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "eventTime": row.timestamp,
        "properties": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:26650
    serializer: JSON
    topic: test_kestra

Properties

from

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Source of the sent message.

Can be a Kestra internal storage URI, a map, or a list. Each message can carry the following fields: key, value, eventTime, properties, deliverAt, deliverAfter, and sequenceId.
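As a minimal sketch of the map form described above, the flow below passes a single message inline instead of a storage URI. The flow id, the message key, and the field values are hypothetical, and only a subset of the supported fields is shown.

```yaml
id: produce_inline
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Inline message (hypothetical values) instead of an internal storage URI
    from:
      key: "user-1"
      value:
        username: "jane"
        tweet: "Hello from Kestra"
      properties:
        source: "kestra"
```

To send several messages, the same structure can be written as a list of such maps.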

serializer

  • Type: object
  • Dynamic:
  • Required: ✔️

topic

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic to send a message to.

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651

accessMode

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • Shared
    • Exclusive
    • ExclusiveWithFencing
    • WaitForExclusive

Configure the type of access mode that the producer requires on the topic.

Possible values are:

  • Shared: By default, multiple producers can publish to a topic.
  • Exclusive: Require exclusive access for the producer. Fail immediately if a producer is already connected.
  • ExclusiveWithFencing: Acquire exclusive access for the producer. Any existing producer is removed and invalidated immediately.
  • WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
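The following sketch shows one way to request exclusive access, assuming a local broker and an inline message. The flow id and message content are hypothetical.

```yaml
id: produce_exclusive
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Fail immediately if another producer is already connected to the topic
    accessMode: Exclusive
    from:
      key: "user-1"
      value:
        username: "jane"
```

With WaitForExclusive instead, the task would wait for the current producer to disconnect rather than fail.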

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.
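Because this property is dynamic, the token can be templated rather than hard-coded. The sketch below assumes a secret named PULSAR_TOKEN is available to Kestra's secret() function; the secret name, flow id, and message content are hypothetical.

```yaml
id: produce_with_token
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Assumes a secret called PULSAR_TOKEN has been configured in Kestra
    authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
    from:
      key: "user-1"
      value:
        username: "jane"
```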

compressionType

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • NONE
    • LZ4
    • ZLIB
    • ZSTD
    • SNAPPY

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

  • NONE: No compression (Default).
  • LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib.
  • ZLIB: Standard ZLib compression.
  • ZSTD: Compress with Zstandard codec. Since Pulsar 2.3.
  • SNAPPY: Compress with Snappy codec. Since Pulsar 2.4.
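As an illustration, compression can be enabled by setting one of the values listed above. The flow id and message content in this sketch are hypothetical.

```yaml
id: produce_compressed
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Compress message payloads with LZ4 instead of the default NONE
    compressionType: LZ4
    from:
      key: "user-1"
      value:
        username: "jane"
        tweet: "Hello from Kestra"
```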

encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Public encryption key used by the producer to encrypt the data key.

producerName

  • Type: string
  • Dynamic: ✔️
  • Required:

Specify a name for the producer.

producerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the producer.
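A short sketch of naming the producer and attaching string properties to it. The producer name, property keys, and property values are hypothetical.

```yaml
id: produce_named
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    producerName: kestra-tweets-producer
    # Free-form string properties attached to the producer (hypothetical keys)
    producerProperties:
      team: "data-platform"
      origin: "kestra"
    from:
      key: "user-1"
      value:
        username: "jane"
```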

schemaString

  • Type: string
  • Dynamic: ✔️
  • Required:

JSON string of the topic's schema.

Required when connecting to topics that have a defined schema and strict schema checking.

schemaType

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: NONE
  • Possible Values:
    • NONE
    • AVRO
    • JSON

The schema type of the topic.

Can be one of NONE, AVRO, or JSON. NONE means no schema is enforced.
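The sketch below pairs schemaType with schemaString for a topic that enforces an Avro schema. The record name and fields are hypothetical and would need to match the schema actually registered on the topic. Keeping serializer set to JSON here is an assumption, since this page does not describe how the serializer interacts with a schema.

```yaml
id: produce_with_schema
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON   # assumption: still required when a schema is set
    topic: test_kestra
    schemaType: AVRO
    # Hypothetical Avro record; must match the topic's registered schema
    schemaString: |
      {
        "type": "record",
        "name": "Tweet",
        "fields": [
          {"name": "username", "type": "string"},
          {"name": "tweet", "type": "string"}
        ]
      }
    from:
      key: "user-1"
      value:
        username: "jane"
        tweet: "Hello from Kestra"
```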

tlsOptions

TLS authentication options.

Use the "pulsar+ssl://" scheme in the uri property to enable TLS support.
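A minimal sketch of a TLS-enabled producer, using the ca, cert, and key fields listed under Definitions. The three values are placeholders for base64-encoded PEM content, and the flow id and message content are hypothetical.

```yaml
id: produce_tls
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    # The pulsar+ssl scheme is what enables TLS
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    serializer: JSON
    topic: test_kestra
    tlsOptions:
      ca: "<base64-encoded CA certificate PEM>"
      cert: "<base64-encoded client certificate PEM>"
      key: "<base64-encoded client key PEM>"
    from:
      key: "user-1"
      value:
        username: "jane"
```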

Outputs

messagesCount

  • Type: integer
  • Dynamic:
  • Required:

Number of messages produced.
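As a sketch of how a downstream task might read this output, the flow below logs the count after producing. The Log task type shown is an assumption that depends on the Kestra version (older releases expose it under a different package), and the flow id and message content are hypothetical.

```yaml
id: produce_and_log
namespace: io.kestra.tests

tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    from:
      key: "user-1"
      value:
        username: "jane"

  - id: log_count
    # Assumed task type; adjust to the Log task available in your Kestra version
    type: io.kestra.plugin.core.log.Log
    message: "Produced {{ outputs.produce.messagesCount }} message(s)"
```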

Definitions

io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions

Properties

ca
  • Type: string
  • Dynamic:
  • Required:

The CA certificate.

Must be a base64-encoded PEM file.

cert
  • Type: string
  • Dynamic:
  • Required:

The client certificate.

Must be a base64-encoded PEM file.

key
  • Type: string
  • Dynamic:
  • Required:

The client key.

Must be a base64-encoded PEM file.
