Produce Produce
Produce Certified

yaml
# Value to put in a task's `type` field to select this plugin task.
type: 'io.kestra.plugin.kafka.Produce'
yaml
# Flow with a single task that sends one inline record to a Kafka topic.
id: kafka_producer
namespace: company.team

tasks:
  - id: kafka_producer
    type: io.kestra.plugin.kafka.Produce
    topic: example_topic

    # Connection settings for the producer client.
    properties:
      bootstrap.servers: 'localhost:9092'

    # Key and value both use the STRING serializer.
    keySerializer: STRING
    valueSerializer: STRING
    serdeProperties:
      schema.registry.url: 'http://localhost:8085'

    # The record itself, declared inline; key and timestamp are templated
    # from the current execution.
    from:
      key: '{{ execution.id }}'
      value: 'Hello, World!'
      timestamp: '{{ execution.startDate }}'
      headers:
        x-header: 'some value'

yaml
# Flow that takes an uploaded file, reshapes each row into a record with
# key / value / timestamp / headers, and sends the records to a Kafka topic
# with the value serialized as AVRO.
id: send_message_to_kafka
namespace: company.team

inputs:
  - id: file
    type: FILE
    # Quoted on purpose: the text contains ": ", which is a mapping indicator
    # inside a plain (unquoted) scalar and would make the document unparseable.
    description: "A CSV file with columns: id, username, tweet, and timestamp."

tasks:
  # Step 1: pass the uploaded input file to the CsvToIon task.
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  # Step 2: rewrite every row of the previous task's output. The script
  # replaces `row` with an object holding key, value, timestamp and headers;
  # `value` carries only username and tweet, the two fields declared in the
  # valueAvroSchema of the next task.
  - id: ion_to_avro_schema
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result

  # Step 3: send the reshaped rows to Kafka. Keys use the STRING serializer,
  # values use AVRO with the schema given inline below.
  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: example_topic
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO
Properties
SubType: string
Default: STRING
Possible Values:
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
SubType: string
Default: {}
Default: true
Default: STRING
Possible Values:
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
Unit: records