Consume Consume

type: "io.kestra.plugin.kafka.Consume"

Consume messages from Kafka topic(s)

# Examples

id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO

Connect to a cluster with ssl

id: "consume"
type: "io.kestra.plugin.kafka.Consume"
properties:
  security.protocol: SSL
  bootstrap.servers: server.aivencloud.com:12835
  ssl.key.password: my-ssl-password
  ssl.keystore.type: PKCS12
  ssl.keystore.location: my-base64-encoded-keystore
  ssl.keystore.password: my-ssl-password
  ssl.truststore.location: my-base64-encoded-truststore
  ssl.truststore.password: my-ssl-password
topic:
- kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING

# Properties

# groupId

  • Type: string
  • Dynamic: ✔️
  • Required:

The consumer group

Using consumer group, we will fetch only records not already consumed

# keyDeserializer

  • Type: object
  • Dynamic:
  • Required:

# maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The max duration waiting for new rows

It's not an hard limit and is evaluated every second

# maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The max number of rows to fetch before stopping

It's not an hard limit and is evaluated every second

# pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: 2.000000000
  • Format: duration

Duration waiting for record to be polled

If no records are available, the max wait to wait for a new records.

# properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Connection properties

bootstrap.servers is a minimal required configuration.
Can be any valid Consumer Configs (opens new window) or Producer Configs
(opens new window)

If you want to pass a truststore or a keystore, you must provide a base64 encoded string for : - ssl.keystore.location - ssl.truststore.location

# serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Serializer configuration

Configuration that will be passed to serializer or deserializer, you typically may need to use ``
avro.use.logical.type.converters is always passed with true value.

# since

  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of message to start with

By default, we consume all messages from the topics with no consumer group or depending on configuration auto.offset.reset with consumer group, but you can provide a arbitrary start time.
This property is ignore if a consumer group is used.
Must be a valid iso 8601 date.

# topic

  • Type: object
  • Dynamic: ✔️
  • Required:

Kafka topic(s) where to consume message

Can be a string or a List of string to consume from multiple topic

# valueDeserializer

  • Type: object
  • Dynamic:
  • Required:

# Outputs

# messagesCount

  • Type: integer

Number of message produced

# uri

  • Type: string

URI of a kestra internal storage file