Consume messages from one or more Kafka topics.

```yaml
type: "io.kestra.plugin.kafka.Consume"
```

Consume data from a Kafka topic.

```yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
```

Connect to a Kafka cluster with SSL.

```yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
```

Consume data from a Kafka topic and write it to a JSON file.

```yaml
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: "{{ outputs.consume.uri }}"
```
## Properties
SubType string

Kafka connection properties.

The bootstrap.servers property is a minimal required configuration to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. If you want to pass a truststore or a keystore, you must provide a base64 encoded string for ssl.keystore.location and ssl.truststore.location.

Kafka consumer group ID.

Using a consumer group, we will fetch only records that haven't been consumed yet.
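For example, assuming the consumer group is set through a `groupId` property (the topic name, broker address, and flow IDs below are illustrative), a minimal sketch:

```yaml
id: kafka_consume_grouped
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    properties:
      bootstrap.servers: localhost:9092
    # Records already consumed by this group are skipped on later runs
    # (property name assumed from the plugin's consumer options).
    groupId: kestra-orders-consumer
    keyDeserializer: STRING
    valueDeserializer: STRING
```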

Default `STRING`
Possible values: `STRING`, `INTEGER`, `FLOAT`, `DOUBLE`, `LONG`, `SHORT`, `BYTE_ARRAY`, `BYTE_BUFFER`, `BYTES`, `UUID`, `VOID`, `AVRO`, `JSON`

The deserializer used for the key.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

Format duration

The maximum duration to wait for new records before stopping the consumption process.

It's a soft limit evaluated every second.

The maximum number of records to fetch before stopping the consumption process.

It's a soft limit evaluated every second.

On serde error.

Sets the behavior when `valueDeserializer` is `JSON` and a serde error occurs:
- `SKIPPED`: all invalid messages are skipped
- `STORE`: invalid messages are ignored and stored as a file
- `DLQ`: invalid messages are ignored and sent to the DLQ specified in `topic`
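A hypothetical sketch of the `DLQ` behavior, assuming the option is configured via an `onSerdeError` block with `type` and `topic` fields (the field names and topic names are assumptions, not confirmed by this page):

```yaml
id: kafka_consume_dlq
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: events
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: JSON
    # Invalid JSON messages are routed to the dead letter queue topic
    # instead of failing the task (block and field names assumed).
    onSerdeError:
      type: DLQ
      topic: events_dlq
```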

SubType integer

Topic partitions to consume messages from.

Manually assign a list of partitions to the consumer.
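A sketch of manual partition assignment, assuming the list is passed through a `partitions` property (the property name and values are illustrative assumptions):

```yaml
id: kafka_consume_partitions
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: metrics
    properties:
      bootstrap.servers: localhost:9092
    # Manually assign partitions 0 and 2 instead of relying on
    # group-based assignment (property name assumed).
    partitions:
      - 0
      - 2
    keyDeserializer: STRING
    valueDeserializer: STRING
```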

Default PT5S
Format duration

How often to poll for records.

If no records are available, this is the maximum duration to wait for new records.

SubType string
Default {}

Serializer configuration.

Configuration that will be passed to the serializer or deserializer. The `avro.use.logical.type.converters` property is always passed when any value is set to `true`.

Timestamp of a message to start consuming messages from.

By default, all messages are consumed from the topics, either with no consumer group or according to the `auto.offset.reset` property. However, you can provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.
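A sketch of starting consumption from a fixed point in time, assuming the timestamp is provided via a `since` property (the property name and timestamp are illustrative assumptions):

```yaml
id: kafka_consume_since
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: audit_log
    properties:
      bootstrap.servers: localhost:9092
    # Only consume messages produced after this ISO 8601 timestamp;
    # ignored when a consumer group is configured (property name assumed).
    since: "2024-01-01T00:00:00Z"
    keyDeserializer: STRING
    valueDeserializer: STRING
```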

Kafka topic(s) to consume messages from.

It can be a string or a list of strings to consume from one or multiple topics.

Kafka topic pattern to consume messages from.

Consumer will subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
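A sketch of pattern-based subscription, assuming the pattern is set through a `topicPattern` property (the property name and pattern are illustrative assumptions):

```yaml
id: kafka_consume_pattern
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    # Subscribe to every topic whose name matches this regular
    # expression (property name assumed).
    topicPattern: "logs-.*"
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```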

Default `STRING`
Possible values: `STRING`, `INTEGER`, `FLOAT`, `DOUBLE`, `LONG`, `SHORT`, `BYTE_ARRAY`, `BYTE_BUFFER`, `BYTES`, `UUID`, `VOID`, `AVRO`, `JSON`

The deserializer used for the value.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

Number of messages consumed from a Kafka topic.

Format uri

URI of a file in Kestra's internal storage containing the messages.

Topic used when the `DLQ` behavior has been chosen.

Default `SKIPPED`
Possible values: `SKIPPED`, `DLQ`, `STORE`

Behavior in case of serde error.