Produce
Produce a message in a Pulsar topic.
type: "io.kestra.plugin.pulsar.Produce"
Examples
Read a CSV file, transform it to the right format, and publish it to Pulsar topic.
id: produce
namespace: company.team
inputs:
- type: FILE
id: file
tasks:
- id: csv_reader
type: io.kestra.plugin.serdes.csv.CsvToIon
from: "{{ inputs.file }}"
- id: file_transform
type: io.kestra.plugin.scripts.nashorn.FileTransform
from: {{ outputs.csv_reader.uri }}"
script: |
var result = {
"key": row.id,
"value": {
"username": row.username,
"tweet": row.tweet
},
"eventTime": row.timestamp,
"properties": {
"key": "value"
}
};
row = result
- id: produce
type: io.kestra.plugin.pulsar.Produce
from: "{{ outputs.file_transform.uri }}"
uri: pulsar://localhost:26650
serializer: JSON
topic: test_kestra
Properties
from *Requiredobject
Source of the sent message.
Can be a Kestra internal storage URI, a map or a list in the following format: key
, value
, eventTime
, properties
, deliverAt
, deliverAfter
and sequenceId
.
serializer *Requiredstring
STRING
STRING
JSON
BYTES
Serializer used for the value.
topic *Requiredstring
Pulsar topic to send a message to.
uri *Requiredstring
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar://localhost: 6650
- If you have multiple brokers:
pulsar://localhost: 6650,localhost: 6651,localhost: 6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com: 6651
accessMode string
Shared
Exclusive
ExclusiveWithFencing
WaitForExclusive
Configure the type of access mode that the producer requires on the topic.
Possible values are:
Shared
: By default, multiple producers can publish to a topic.Exclusive
: Require exclusive access for producer. Fail immediately if there's already a producer connected.WaitForExclusive
: Producer creation is pending until it can acquire exclusive access.
authenticationToken string
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
compressionType string
NONE
LZ4
ZLIB
ZSTD
SNAPPY
Set the compression type for the producer.
By default, message payloads are not compressed. Supported compression types are:
NONE
: No compression (Default).LZ4
: Compress with LZ4 algorithm. Faster but lower compression than ZLib.ZLIB
: Standard ZLib compression.ZSTD
Compress with Zstandard codec. Since Pulsar 2.3.SNAPPY
Compress with Snappy codec. Since Pulsar 2.4.
encryptionKey string
Add public encryption key, used by producer to encrypt the data key.
producerName string
Specify a name for the producer.
producerProperties object
Add all the properties in the provided map to the producer.
schemaString string
JSON string of the topic's schema
Required for connecting with topics with a defined schema and strict schema checking
schemaType string
NONE
NONE
AVRO
JSON
The schema type of the topic
Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.
tlsOptions Non-dynamicAbstractPulsarConnection-TlsOptions
TLS authentication options.
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
Outputs
messagesCount integer
Number of messages produced.
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
ca string
The ca certificate.
Must be a base64-encoded pem file.
cert string
The client certificate.
Must be a base64-encoded pem file.
key string
The key certificate.
Must be a base64-encoded pem file.