Produce

```yaml
type: "io.kestra.plugin.kafka.Produce"
```

Produce messages to a Kafka topic

Examples

Read a CSV file, transform each row into the expected message format, and produce the result to Kafka

```yaml
id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    name: file

tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.fileTransform.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: local:9092
    serdeProperties:
      schema.registry.url: http://local:8085
    topic: test_kestra
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO
```

Properties

from

  • Type: string, array, or object
  • Dynamic: ✔️
  • Required: ✔️

Source of the messages to send

Can be an internal storage URI, a map, or a list. Each message uses the following fields: key, value, partition, timestamp, headers
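As a minimal sketch of the inline form, the task below passes from as a list of two messages rather than an internal storage URI. The task id, keys, values, and header are illustrative. It assumes that STRING is accepted for valueSerializer as it is for keySerializer, and that unused fields (partition, timestamp) may be omitted.

```yaml
tasks:
  - id: produceInline   # hypothetical task id
    type: io.kestra.plugin.kafka.Produce
    # Each list entry is one message; field names follow the format described above.
    from:
      - key: "user-1"
        value: "first message"
        headers:
          source: "inline-example"
      - key: "user-2"
        value: "second message"
    keySerializer: STRING
    valueSerializer: STRING   # assumption: STRING is valid for values as well
    properties:
      bootstrap.servers: local:9092
    topic: test_kestra
```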

keySerializer

  • Type: object
  • Dynamic:
  • Required: ✔️

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Connection properties

bootstrap.servers is the minimum required configuration. Any valid property from the Kafka Consumer Configs or Producer Configs can be set.

To pass a truststore or a keystore, provide a base64-encoded string for:

  • ssl.keystore.location
  • ssl.truststore.location
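The truststore case might look like the following fragment. This is a sketch only: security.protocol and ssl.truststore.password are standard Kafka client settings that this page does not mention, and the base64 string and password are placeholders to replace with your own values.

```yaml
properties:
  bootstrap.servers: local:9092
  security.protocol: SSL
  # Placeholder: the base64-encoded content of the truststore file, not a file path.
  ssl.truststore.location: "<base64-encoded truststore>"
  ssl.truststore.password: "<truststore password>"
```

With GNU coreutils, a command such as `base64 -w 0 truststore.jks` (file name hypothetical) is one way to produce the encoded string on a single line.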

topic

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka topic to send messages to

valueSerializer

  • Type: object
  • Dynamic:
  • Required: ✔️

keyAvroSchema

  • Type: string
  • Dynamic: ✔️
  • Required: ❌

Avro schema to use when the key serializer is AVRO

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ❌

Serializer configuration

Configuration passed to the serializer or deserializer; the example above, for instance, sets schema.registry.url here. avro.use.logical.type.converters is always passed with the value true.

valueAvroSchema

  • Type: string
  • Dynamic: ✔️
  • Required: ❌

Avro schema to use when the value serializer is AVRO
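To illustrate how the Avro-related properties fit together, the sketch below serializes both the key and the value as AVRO. The key schema (twitter_key with a single id field), the task id, and the inline message are hypothetical; the value schema, registry URL, and topic are taken from the example at the top of this page.

```yaml
tasks:
  - id: produceAvro   # hypothetical task id
    type: io.kestra.plugin.kafka.Produce
    # A single message given as a map; key and value match the schemas below.
    from:
      key:
        id: "42"
      value:
        username: "kestra"
        tweet: "hello"
    keySerializer: AVRO
    keyAvroSchema: |
      {"type":"record","name":"twitter_key","namespace":"io.kestra.examples","fields":[{"name":"id","type":"string"}]}
    valueSerializer: AVRO
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    serdeProperties:
      schema.registry.url: http://local:8085
    properties:
      bootstrap.servers: local:9092
    topic: test_kestra
```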

Outputs

messagesCount

  • Type: integer

Number of messages produced
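A later task can read this count through the same outputs expression syntax that the example uses for outputs.csvReader.uri. In this sketch, produce is the id of the Produce task from that example, while the logCount id and the Log task type are assumptions; the available logging task depends on your Kestra version.

```yaml
tasks:
  # ... csvReader, fileTransform, and produce tasks from the example ...
  - id: logCount   # hypothetical task id
    type: io.kestra.core.tasks.log.Log   # assumption: adjust to the Log task in your version
    message: "Produced {{ outputs.produce.messagesCount }} messages"
```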