Produce
type: "io.kestra.plugin.kafka.Produce"
Produce message in a Kafka topic
Examples
Read a csv, transform it to the right format, and produce it to Kafka
id: produce
namespace: io.kestra.tests
inputs:
- type: FILE
name: file
tasks:
- id: csvReader
type: io.kestra.plugin.serdes.csv.CsvReader
from: "{{ inputs.file }}"
- id: fileTransform
type: io.kestra.plugin.scripts.nashorn.FileTransform
from: "{{ outputs.csvReader.uri }}"
script: |
var result = {
"key": row.id,
"value": {
"username": row.username,
"tweet": row.tweet
},
"timestamp": row.timestamp,
"headers": {
"key": "value"
}
};
row = result
- id: produce
type: io.kestra.plugin.kafka.Produce
from: "{{ outputs.fileTransform.uri }}"
keySerializer: STRING
properties:
bootstrap.servers: local:9092
serdeProperties:
schema.registry.url: http://local:8085
topic: test_kestra
valueAvroSchema: |
{"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
valueSerializer: AVRO
Properties
from
- Type:stringarrayobject
- Dynamic: ✔️
- Required: ✔️
Source of message send
Can be an internal storage uri, a map or a list.with the following format: key, value, partition, timestamp, headers
keySerializer
- Type: object
- Dynamic: ❓
- Required: ✔️
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Connection properties
bootstrap.servers
is a minimal required configuration.
Can be any valid Consumer Configs or Producer Configs
If you want to pass a truststore or a keystore, you must provide a base64 encoded string for : - ssl.keystore.location
- ssl.truststore.location
topic
- Type: string
- Dynamic: ✔️
- Required: ✔️
Kafka topic where to send message
valueSerializer
- Type: object
- Dynamic: ❓
- Required: ✔️
keyAvroSchema
- Type: string
- Dynamic: ✔️
- Required: ❌
Avro Schema if key is
AVRO
type
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Serializer configuration
Configuration that will be passed to serializer or deserializer, you typically may need to use ``
avro.use.logical.type.converters
is always passed with true
value.
valueAvroSchema
- Type: string
- Dynamic: ✔️
- Required: ❌
Avro Schema if value is
AVRO
type
Outputs
messagesCount
- Type: integer
Number of message produced