Produce a message in a Pulsar topic.

yaml
type: "io.kestra.plugin.pulsar.Produce"

Read a CSV file, transform it to the right format, and publish it to Pulsar topic.

yaml
id: produce
namespace: company.team

inputs:
  - type: FILE
    id: file

tasks:
  - id: csv_reader
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: file_transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: {{ outputs.csv_reader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "eventTime": row.timestamp,
        "properties": {
          "key": "value"
        }
      };
      row = result

  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.file_transform.uri }}"
    uri: pulsar://localhost:26650
    serializer: JSON
    topic: test_kestra
Properties

Source of the sent message.

Can be a Kestra internal storage URI, a map or a list in the following format: key, value, eventTime, properties, deliverAt, deliverAfter and sequenceId.

Default STRING
Possible Values
STRINGJSONBYTES

Serializer used for the value.

Pulsar topic to send a message to.

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar://localhost: 6650
  • If you have multiple brokers: pulsar://localhost: 6650,localhost: 6651,localhost: 6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com: 6651
Possible Values
SharedExclusiveExclusiveWithFencingWaitForExclusive

Configure the type of access mode that the producer requires on the topic.

Possible values are:

  • Shared: By default, multiple producers can publish to a topic.
  • Exclusive: Require exclusive access for producer. Fail immediately if there's already a producer connected.
  • WaitForExclusive: Producer creation is pending until it can acquire exclusive access.

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.

Possible Values
NONELZ4ZLIBZSTDSNAPPY

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

  • NONE: No compression (Default).
  • LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib.
  • ZLIB: Standard ZLib compression.
  • ZSTD Compress with Zstandard codec. Since Pulsar 2.3.
  • SNAPPY Compress with Snappy codec. Since Pulsar 2.4.

Add public encryption key, used by producer to encrypt the data key.

Specify a name for the producer.

SubType string

Add all the properties in the provided map to the producer.

JSON string of the topic's schema

Required for connecting with topics with a defined schema and strict schema checking

Default NONE
Possible Values
NONEAVROJSON

The schema type of the topic

Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.

TLS authentication options.

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.

Number of messages produced.

The ca certificate.

Must be a base64-encoded pem file.

The client certificate.

Must be a base64-encoded pem file.

The key certificate.

Must be a base64-encoded pem file.