Consume

yaml
type: "io.kestra.plugin.kafka.Consume"

Consume messages from Kafka topic(s)

Examples

yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO

Connect to a cluster with SSL

yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
properties:
  security.protocol: SSL
  bootstrap.servers: server.aivencloud.com:12835
  ssl.key.password: my-ssl-password
  ssl.keystore.type: PKCS12
  ssl.keystore.location: my-base64-encoded-keystore
  ssl.keystore.password: my-ssl-password
  ssl.truststore.location: my-base64-encoded-truststore
  ssl.truststore.password: my-ssl-password
topic:
- kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING

Properties

keyDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000
  • Format: duration

Duration to wait for records to be polled

If no records are available, this is the maximum time to wait for new records.
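As a rough sketch of how pollDuration might be set, the fragment below raises the wait to five seconds. The value PT5S assumes that the duration format accepts an ISO 8601 duration string; the topic and broker address are placeholders reused from the examples above.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Assumed ISO 8601 duration: wait up to 5 seconds on each poll for new records
pollDuration: PT5S
```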

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Connection properties

bootstrap.servers is the minimum required configuration. Any valid property from the Kafka Consumer Configs or Producer Configs can be used.

To pass a truststore or a keystore, you must provide a base64-encoded string for:

  • ssl.keystore.location
  • ssl.truststore.location

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Kafka topic(s) to consume messages from

Can be a string, or a list of strings to consume from multiple topics.

valueDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

groupId

  • Type: string
  • Dynamic: ✔️
  • Required:

The consumer group

When a consumer group is used, only records that have not already been consumed by that group are fetched.
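A minimal sketch of a task that uses a consumer group follows. The group name is hypothetical, and auto.offset.reset is the standard Kafka consumer setting that decides where a group with no committed offsets starts reading; the topic and broker address are placeholders taken from the examples above.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
  # Standard Kafka consumer setting: where to start when the group has no committed offset
  auto.offset.reset: earliest
# Hypothetical consumer group name
groupId: kestra_consumer_group
keyDeserializer: STRING
valueDeserializer: STRING
```

Running the same task again with the same groupId should then pick up only the records produced since the previous run.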

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new records

This is not a hard limit; it is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of records to fetch before stopping

This is not a hard limit; it is evaluated every second.
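The two stop conditions can be combined, as in the sketch below. The values are illustrative only, and PT30S assumes that the duration format accepts an ISO 8601 duration string. Because both limits are soft and checked once per second, the task may fetch somewhat more than 1000 records or run a little longer than 30 seconds.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Soft limit on total run time (assumed ISO 8601 duration)
maxDuration: PT30S
# Soft limit on the number of records fetched
maxRecords: 1000
```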

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Serializer configuration

Configuration passed to the serializer or deserializer, such as schema.registry.url in the first example above. avro.use.logical.type.converters is always passed with the value true.

since

  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of the message to start from

By default, all messages from the topics are consumed when no consumer group is set; with a consumer group, the starting point depends on the auto.offset.reset configuration. You can instead provide an arbitrary start time. This property is ignored if a consumer group is used. Must be a valid ISO 8601 date.
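As an illustration, the fragment below starts consumption from a fixed point in time. The timestamp is an arbitrary example value, and groupId is deliberately left out because since is ignored when a consumer group is used.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Arbitrary example ISO 8601 timestamp; no groupId is set, otherwise since would be ignored
since: "2023-01-01T00:00:00Z"
```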

Outputs

messagesCount

  • Type: integer

Number of messages consumed

uri

  • Type: string

URI of a Kestra internal storage file
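A later task in the same flow can read these outputs through Kestra's expression syntax. The flow below is only a sketch: the flow id, namespace, and the second task are hypothetical, and the type name of the logging task is an assumption that differs between Kestra versions.

```yaml
# Hypothetical flow; id and namespace are placeholders
id: kafka_consume_example
namespace: dev
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
  - id: report
    # Assumption: the Log task's type name depends on the Kestra version in use
    type: io.kestra.core.tasks.log.Log
    # Reference the outputs of the task whose id is "consume"
    message: "Consumed {{ outputs.consume.messagesCount }} messages into {{ outputs.consume.uri }}"
```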