
Produce
Send a message to a Kafka topic.
Each message must be passed as a document with the following keys: key, value, topic, partition, timestamp, headers.
type: "io.kestra.plugin.kafka.Produce"
Examples
Send a string to a Kafka topic
id: kafka_producer
namespace: company.team

tasks:
  - id: kafka_producer
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: localhost:9092
    topic: example_topic
    from:
      key: "{{ execution.id }}"
      value: "Hello, World!"
      timestamp: "{{ execution.startDate }}"
      headers:
        x-header: some value
    keySerializer: STRING
    valueSerializer: STRING
    serdeProperties:
      schema.registry.url: http://localhost:8085

Read a CSV file, transform it and send it to Kafka.
id: send_message_to_kafka
namespace: company.team

inputs:
  - id: file
    type: FILE
    # Quoted because the text contains ": ", which is not valid in a plain YAML scalar
    description: "A CSV file with columns: id, username, tweet, and timestamp."

tasks:
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: ion_to_avro_schema
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result

  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: example_topic
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO

Properties
properties
Required. Type: object.
Kafka connection properties.
The bootstrap.servers property is the minimal configuration required to connect to a Kafka cluster.
This property accepts any valid Kafka Consumer Configs or Producer Configs as key-value pairs.
To pass a truststore or a keystore, provide its content as a base64-encoded string in ssl.truststore.location or ssl.keystore.location.
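As a rough sketch of the truststore case, the fragment below passes SSL settings through properties. The broker address and the secret names KAFKA_TRUSTSTORE_B64 and KAFKA_TRUSTSTORE_PASSWORD are hypothetical, and it assumes the secrets are read with Kestra's secret() function. security.protocol and ssl.truststore.password are standard Kafka client settings that a given cluster may or may not need.

```yaml
tasks:
  - id: kafka_producer_ssl
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: broker.example.com:9093   # hypothetical broker address
      security.protocol: SSL
      # Base64-encoded truststore content rather than a file path (hypothetical secret name)
      ssl.truststore.location: "{{ secret('KAFKA_TRUSTSTORE_B64') }}"
      ssl.truststore.password: "{{ secret('KAFKA_TRUSTSTORE_PASSWORD') }}"
    topic: example_topic
    from:
      key: "{{ execution.id }}"
      value: "Hello over TLS"
    keySerializer: STRING
    valueSerializer: STRING
```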
from
Type: string, array, or object.
Structured data items, either as a map, a list of maps, a URI, or a JSON string.
Structured data items can be defined in the following ways:
- A single item as a map (a document).
- A list of items as a list of maps (a list of documents).
- A URI; supported schemes are kestra for internal storage files, file for host local files, and nsfile for namespace files.
- A JSON string that will then be serialized either as a single item or a list of items.
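To illustrate the list form of from, here is a minimal sketch in which each entry of the list is one message. The task id, keys, values, and header name are made up for the example. The URI form is the one used by the CSV example above, where from points at the output of a previous task.

```yaml
tasks:
  - id: produce_batch
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: localhost:9092
    topic: example_topic
    keySerializer: STRING
    valueSerializer: STRING
    # A list of maps: each entry is sent as one Kafka message
    from:
      - key: "user-1"
        value: "first message"
      - key: "user-2"
        value: "second message"
        headers:
          x-source: batch   # illustrative header
```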
keyAvroSchema
Type: string.
Avro schema to use when keySerializer is set to AVRO.
keySerializer
Type: string. Default: STRING.
Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
The serializer used for the key.
serdeProperties
Type: object. Default: {}.
Serializer configuration.
Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed when you have any values set to true.
topic
Type: string.
Kafka topic to which the message should be sent.
The topic can also be set per message inside the from property, using the key topic.
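The per-message form might look like the sketch below, where each document in from names its own destination. The topic names orders and audit are hypothetical. It also assumes the task-level topic can be left out when every message carries its own, which is consistent with topic not being marked as required here but is not otherwise confirmed.

```yaml
tasks:
  - id: produce_to_many_topics
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: localhost:9092
    keySerializer: STRING
    valueSerializer: STRING
    # No task-level topic: each message carries its destination in the topic key
    from:
      - topic: orders          # hypothetical topic name
        key: "order-42"
        value: "created"
      - topic: audit           # hypothetical topic name
        key: "order-42"
        value: "order created by user-7"
```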
transactional
Type: boolean or string. Default: true.
Whether the producer should be transactional.
valueAvroSchema
Type: string.
Avro schema to use when valueSerializer is set to AVRO.
valueSerializer
Type: string. Default: STRING.
Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
The serializer used for the value.
Outputs
messagesCount
Type: integer.
Number of messages sent to the Kafka topic.
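A downstream task can read this output through the outputs expression. The sketch below assumes the core Log task (io.kestra.plugin.core.log.Log) is available in your Kestra version; the flow id and the task id log_count are illustrative.

```yaml
id: kafka_producer_with_log
namespace: company.team

tasks:
  - id: kafka_producer
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: localhost:9092
    topic: example_topic
    from:
      key: "{{ execution.id }}"
      value: "Hello, World!"
    keySerializer: STRING
    valueSerializer: STRING

  # Reads the messagesCount output of the task above
  - id: log_count
    type: io.kestra.plugin.core.log.Log
    message: "Sent {{ outputs.kafka_producer.messagesCount }} message(s) to Kafka"
```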
Metrics
records
Type: counter. Unit: records.
Number of records sent to the Kafka topic.