Produce
Certified

Send a message to a Kafka topic.

The message must be passed as a document with the following keys: key, value, topic, partition, timestamp, and headers.

```yaml
type: "io.kestra.plugin.kafka.Produce"
```

Send a string to a Kafka topic

```yaml
id: kafka_producer
namespace: company.team

tasks:
  - id: kafka_producer
    type: io.kestra.plugin.kafka.Produce
    properties:
      bootstrap.servers: localhost:9092
    topic: example_topic
    from:
      key: "{{ execution.id }}"
      value: "Hello, World!"
      timestamp: "{{ execution.startDate }}"
      headers:
        x-header: some value
    keySerializer: STRING
    valueSerializer: STRING
    serdeProperties:
      schema.registry.url: http://localhost:8085
```

Read a CSV file, transform it and send it to Kafka.

```yaml
id: send_message_to_kafka
namespace: company.team

inputs:
  - id: file
    type: FILE
    description: "A CSV file with columns: id, username, tweet, and timestamp."

tasks:
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: ion_to_avro_schema
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result;

  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: example_topic
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO
```
Properties

properties
SubType: string

Kafka connection properties.

The bootstrap.servers property is the minimal configuration required to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. To pass a truststore or a keystore, you must provide a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
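As a sketch, a task connecting over SSL with base64-encoded stores might look like the following (the broker address and secret names are placeholders, not values from this documentation):

```yaml
- id: produce_with_tls
  type: io.kestra.plugin.kafka.Produce
  properties:
    bootstrap.servers: broker-1:9093
    security.protocol: SSL
    # base64-encoded keystore/truststore contents, not file paths
    ssl.keystore.location: "{{ secret('KAFKA_KEYSTORE_B64') }}"
    ssl.truststore.location: "{{ secret('KAFKA_TRUSTSTORE_B64') }}"
  topic: example_topic
  from:
    key: "k1"
    value: "v1"
  keySerializer: STRING
  valueSerializer: STRING
```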

from

Structured data items, either as a map, a list of maps, a URI, or a JSON string.

Structured data items can be defined in the following ways:

  • A single item as a map (a document).
  • A list of items as a list of maps (a list of documents).
  • A URI; supported schemes are kestra for internal storage files, file for host-local files, and nsfile for namespace files.
  • A JSON string that will be deserialized into either a single item or a list of items.
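For illustration, the same records could be supplied inline as a list of maps, or via a URI (the upstream task name extract below is hypothetical):

```yaml
# As a list of maps (a list of documents):
from:
  - key: "user-1"
    value: "first message"
  - key: "user-2"
    value: "second message"

# Or as a URI pointing to an internal storage file:
# from: "{{ outputs.extract.uri }}"
```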

keyAvroSchema

Avro schema to use when the key serializer is set to the AVRO type.

keySerializer
Default: STRING
Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON

The serializer used for the key.

serdeProperties
SubType: string
Default: {}

Serializer configuration.

Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed when you have any values set to true.

topic

Kafka topic to which the message should be sent.

Could also be passed inside the from property using the key topic.
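A minimal sketch of setting the topic per record inside from rather than at the task level (the topic names and values here are placeholders):

```yaml
- id: produce_multi_topic
  type: io.kestra.plugin.kafka.Produce
  properties:
    bootstrap.servers: localhost:9092
  from:
    - topic: orders
      key: "order-1"
      value: "created"
    - topic: payments
      key: "payment-1"
      value: "captured"
  keySerializer: STRING
  valueSerializer: STRING
```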

transactional
Default: true

Whether the producer should be transactional.

valueAvroSchema

Avro schema to use when the value serializer is set to the AVRO type.

valueSerializer
Default: STRING
Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON

The serializer used for the value.

Outputs

messagesCount

Number of messages sent to the Kafka topic.

Metrics

records
Unit: records

Number of records sent to the Kafka topic.