Produce
type: "io.kestra.plugin.pulsar.Produce"
Produce messages to a Pulsar topic
Examples
Read a CSV file, transform it to the right format, and produce it to Pulsar
id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    name: file
tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "eventTime": row.timestamp,
        "properties": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:26650
    serializer: JSON
    topic: test_kestra
Properties
from
- Type: object
- Dynamic: ✔️
- Required: ✔️
Source of the messages to send
Can be an internal storage URI, a map, or a list, with the following format: key, value, partition, timestamp, headers
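As a minimal sketch of the map form, the task below passes a single message inline instead of an internal storage URI. It only uses the key and value fields, which appear both in the format list and in the example above; the task id and the values are placeholders, not taken from the plugin documentation.

```yaml
tasks:
  - id: produceInline            # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Inline map instead of an internal storage URI (placeholder values)
    from:
      key: "user-1"
      value:
        username: "alice"
        tweet: "hello"
```

An internal storage URI, as in the example above, is likely the better fit when the messages come from an upstream task's output file.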
serializer
- Type: object
- Dynamic: ❓
- Required: ✔️
topic
- Type: string
- Dynamic: ✔️
- Required: ✔️
Pulsar topic to send messages to
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs
You need to specify a Pulsar protocol URL
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
ssl.truststore.location
accessMode
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
Shared
Exclusive
ExclusiveWithFencing
WaitForExclusive
Configure the type of access mode that the producer requires on the topic
Possible values are:
- Shared: By default, multiple producers can publish on a topic.
- Exclusive: Require exclusive access for the producer. Fail immediately if there's already a producer connected.
- WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
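The following sketch shows how accessMode might be set on the task so that production fails immediately when another producer is already connected to the topic. The task id is hypothetical, and the from expression assumes an upstream fileTransform task like the one in the example above.

```yaml
tasks:
  - id: produceExclusive         # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Not dynamic: must be one of the listed literal values
    accessMode: Exclusive
```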
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication Token
Authentication token that may be required by some providers, such as Clever Cloud.
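Because authenticationToken is dynamic, one way to avoid hard-coding the token is to pass it in as a flow input. This is only a sketch: the input name pulsarToken, the STRING input type, the flow id, and the TLS URI are assumptions chosen for illustration.

```yaml
id: produceWithToken             # hypothetical flow id
namespace: io.kestra.tests
inputs:
  - type: FILE
    name: file
  - type: STRING
    name: pulsarToken            # hypothetical input holding the token
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ inputs.file }}"
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    serializer: JSON
    topic: test_kestra
    # Rendered at runtime from the flow input
    authenticationToken: "{{ inputs.pulsarToken }}"
```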
compressionType
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
NONE
LZ4
ZLIB
ZSTD
SNAPPY
Set the compression type for the producer.
By default, message payloads are not compressed. Supported compression types are:
- NONE: No compression (default).
- LZ4: Compress with the LZ4 algorithm. Faster but lower compression than ZLib.
- ZLIB: Standard ZLib compression.
- ZSTD: Compress with the Zstandard codec. Since Pulsar 2.3.
- SNAPPY: Compress with the Snappy codec. Since Pulsar 2.4.
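As a brief illustration, compression could be enabled on the task as sketched below. LZ4 is picked only as an example, and the from expression assumes an upstream fileTransform task like the one in the example above.

```yaml
tasks:
  - id: produceCompressed        # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    # Not dynamic: one of NONE, LZ4, ZLIB, ZSTD, SNAPPY
    compressionType: LZ4
```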
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key, used by the producer to encrypt the data key.
producerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Specify a name for the producer.
producerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the producer.
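A sketch of how producerName and producerProperties might be combined on one task. The name and the property keys and values below are placeholders; producerProperties is documented as a map of strings, so every value is quoted.

```yaml
tasks:
  - id: produceNamed             # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
    producerName: kestra-producer    # placeholder name
    producerProperties:              # placeholder string-to-string map
      team: "data"
      source: "kestra"
```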
tlsOptions
- Type: TlsOptions
- Dynamic: ❌
- Required: ❌
TLS Authentication
You need to use "pulsar+ssl://" in the service URL (the uri property) to enable TLS support.
Outputs
messagesCount
- Type: integer
Number of messages produced
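The output can be read by a later task through the usual outputs expression. The sketch below assumes the Produce task has the id produce, as in the example above, and uses a core Log task whose type name (io.kestra.core.tasks.log.Log) is an assumption that may differ between Kestra versions.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:6650
    serializer: JSON
    topic: test_kestra
  - id: logCount                 # hypothetical follow-up task
    type: io.kestra.core.tasks.log.Log   # assumed type name, check your version
    message: "Produced {{ outputs.produce.messagesCount }} messages"
```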
Definitions
TlsOptions
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate
Must be a PEM file encoded as base64.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate
Must be a PEM file encoded as base64.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate
Must be a PEM file encoded as base64.
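Putting the TlsOptions fields together, a TLS-enabled task might look like the sketch below. The certificate values are placeholders standing in for base64-encoded PEM content, and the host name is the one used in the uri examples.

```yaml
tasks:
  - id: produceTls               # hypothetical task id
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    # The pulsar+ssl:// scheme is what enables TLS
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    serializer: JSON
    topic: test_kestra
    tlsOptions:
      ca: "<base64-encoded CA PEM>"            # placeholder
      cert: "<base64-encoded client PEM>"      # placeholder
      key: "<base64-encoded key PEM>"          # placeholder
```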