Produce
Produce
yaml
type: "io.kestra.plugin.kafka.Produce"
Examples
yaml
# Example flow: publish one record, defined inline, to a Kafka topic.
id: kafka_producer
namespace: company.team

tasks:
  - id: kafka_producer
    type: io.kestra.plugin.kafka.Produce
    # Kafka client settings, passed as a string-to-string map.
    properties:
      bootstrap.servers: localhost:9092
    topic: example_topic
    # The record to send; key and timestamp are rendered from the
    # current execution.
    from:
      key: "{{ execution.id }}"
      value: "Hello, World!"
      timestamp: "{{ execution.startDate }}"
      headers:
        x-header: some value
    keySerializer: STRING
    valueSerializer: STRING
    serdeProperties:
      schema.registry.url: http://localhost:8085
yaml
# Example flow: read a CSV file, reshape each row into a Kafka record,
# and publish the records with an Avro-serialized value.
id: send_message_to_kafka
namespace: company.team

inputs:
  - id: file
    type: FILE
    # Quoted because the text contains ": ", which a plain scalar
    # cannot hold.
    description: "A CSV file with columns: id, username, tweet, and timestamp."

tasks:
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: ion_to_avro_schema
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    # Rebuilds each row as an object with key, value, timestamp and
    # headers fields, then assigns it back to `row`.
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result

  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: example_topic
    # Avro record schema for the value; its fields match the "value"
    # object built by the script above.
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO
Properties
properties *Required object
SubType string
from string | array | object
keyAvroSchema string
keySerializer string
Default
STRING
Possible Values
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
serdeProperties object
SubType string
Default
{}
topic string
transactional boolean | string
Default
true
valueAvroSchema string
valueSerializer string
Default
STRING
Possible Values
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
Outputs
messagesCount integer
Metrics
records counter
Unit
records