Produce
type: "io.kestra.plugin.pulsar.Produce"
Produce messages to a Pulsar topic.
Examples
Read a CSV file, transform it to the right format, and publish it to a Pulsar topic.
id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    id: file
tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "eventTime": row.timestamp,
        "properties": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.pulsar.Produce
    from: "{{ outputs.fileTransform.uri }}"
    uri: pulsar://localhost:26650
    serializer: JSON
    topic: test_kestra
Properties
from
- Type: object
- Dynamic: ✔️
- Required: ✔️
Source of the sent message. Can be a Kestra internal storage URI, a map, or a list in the following format: key, value, eventTime, properties, deliverAt, deliverAfter, and sequenceId.
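As a minimal sketch of the inline form, `from` can be given as a map instead of an internal storage URI. The key, payload fields, and property values below are made-up placeholders; the connection settings are copied from the example at the top of this page.

```yaml
- id: produce
  type: io.kestra.plugin.pulsar.Produce
  uri: pulsar://localhost:26650
  topic: test_kestra
  serializer: JSON
  # Inline message instead of an internal storage URI (illustrative values)
  from:
    key: "user-42"
    value:
      username: "alice"
      tweet: "Hello from Kestra"
    properties:
      source: "inline-example"
```

To send several messages at once, the same structure can be repeated as list items under `from`.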
serializer
- Type: object
- Dynamic: ❓
- Required: ✔️
topic
- Type: string
- Dynamic: ✔️
- Required: ✔️
Pulsar topic to send a message to.
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs. You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
accessMode
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
Shared
Exclusive
ExclusiveWithFencing
WaitForExclusive
Configure the type of access mode that the producer requires on the topic. Possible values are:
- Shared: By default, multiple producers can publish to a topic.
- Exclusive: Require exclusive access for the producer. Fail immediately if there's already a producer connected.
- WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token. Some providers, such as Clever Cloud, require it.
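Because `authenticationToken` is dynamic, the token does not have to be written in the flow itself. The sketch below assumes a secret named `PULSAR_TOKEN` (a hypothetical name) is already configured in your Kestra instance and reads it with the `secret()` function; the broker URL is the TLS example from the `uri` property.

```yaml
- id: produce
  type: io.kestra.plugin.pulsar.Produce
  from: "{{ outputs.fileTransform.uri }}"
  uri: pulsar+ssl://pulsar.us-west.example.com:6651
  topic: test_kestra
  serializer: JSON
  # PULSAR_TOKEN is an assumed secret name, not defined by the plugin
  authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```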
compressionType
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
NONE
LZ4
ZLIB
ZSTD
SNAPPY
Set the compression type for the producer. By default, message payloads are not compressed. Supported compression types are:
- NONE: No compression (default).
- LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib.
- ZLIB: Standard ZLib compression.
- ZSTD: Compress with Zstandard codec. Since Pulsar 2.3.
- SNAPPY: Compress with Snappy codec. Since Pulsar 2.4.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key, used by the producer to encrypt the data key.
producerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Specify a name for the producer.
producerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the producer.
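The producer-level options described above (`accessMode`, `compressionType`, `producerName`, `producerProperties`) can be combined on a single task. This is only a sketch: the producer name and the property key and value are illustrative, and the chosen access mode and codec are arbitrary picks from the listed possible values.

```yaml
- id: produce
  type: io.kestra.plugin.pulsar.Produce
  from: "{{ outputs.fileTransform.uri }}"
  uri: pulsar://localhost:26650
  topic: test_kestra
  serializer: JSON
  # Fail immediately if another producer is already connected to the topic
  accessMode: Exclusive
  # Compress payloads with Zstandard (requires Pulsar 2.3 or later)
  compressionType: ZSTD
  # Illustrative name and properties
  producerName: kestra-producer
  producerProperties:
    team: data-platform
```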
schemaString
- Type: string
- Dynamic: ✔️
- Required: ❌
JSON string of the topic's schema. Required for connecting to topics with a defined schema and strict schema checking.
schemaType
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
NONE
- Possible Values:
NONE
AVRO
JSON
The schema type of the topic. Can be one of NONE, AVRO, or JSON. NONE means no schema is enforced.
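A hedged sketch of how `schemaType` and `schemaString` might be set together for a topic with an Avro schema. The record name `Tweet` and its fields are assumptions modeled on the `username` and `tweet` values used in the example at the top of this page; the actual schema must match the one registered for your topic, and `serializer` is kept only because it is a required property.

```yaml
- id: produce
  type: io.kestra.plugin.pulsar.Produce
  from: "{{ outputs.fileTransform.uri }}"
  uri: pulsar://localhost:26650
  topic: test_kestra
  serializer: JSON
  schemaType: AVRO
  # Assumed Avro record; replace with the schema defined on your topic
  schemaString: |
    {
      "type": "record",
      "name": "Tweet",
      "fields": [
        {"name": "username", "type": "string"},
        {"name": "tweet", "type": "string"}
      ]
    }
```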
tlsOptions
- Type: io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication options. You need to use "pulsar+ssl://" in the connection URL (the uri property) to enable TLS support.
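A minimal sketch of a TLS-enabled task, assuming the three certificate values are already available as base64-encoded PEM strings. The uppercase values are placeholders to be replaced with real content, and the broker URL is the TLS example from the `uri` property.

```yaml
- id: produce
  type: io.kestra.plugin.pulsar.Produce
  from: "{{ outputs.fileTransform.uri }}"
  # The pulsar+ssl:// scheme is what enables TLS
  uri: pulsar+ssl://pulsar.us-west.example.com:6651
  topic: test_kestra
  serializer: JSON
  tlsOptions:
    # Placeholders: each value must be a base64-encoded PEM file
    ca: "BASE64_ENCODED_CA_PEM"
    cert: "BASE64_ENCODED_CLIENT_CERT_PEM"
    key: "BASE64_ENCODED_CLIENT_KEY_PEM"
```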
Outputs
messagesCount
- Type: integer
- Dynamic: ❓
- Required: ❌
Number of messages produced.
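The output can be read by a later task in the same flow. The sketch below assumes the producing task has the id `produce`, as in the example at the top of this page, and uses the core Log task under the type name found in recent Kestra versions; older versions may expose it under a different type.

```yaml
- id: logCount
  # Type name assumed for recent Kestra releases
  type: io.kestra.plugin.core.log.Log
  message: "Produced {{ outputs.produce.messagesCount }} messages"
```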
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
Properties
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate. Must be a base64-encoded PEM file.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate. Must be a base64-encoded PEM file.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The client key. Must be a base64-encoded PEM file.