Consume
type: "io.kestra.plugin.kafka.Consume"
Consume messages from Kafka topic(s)
# Examples
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Connect to a cluster with SSL
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
properties:
  security.protocol: SSL
  bootstrap.servers: server.aivencloud.com:12835
  ssl.key.password: my-ssl-password
  ssl.keystore.type: PKCS12
  ssl.keystore.location: my-base64-encoded-keystore
  ssl.keystore.password: my-ssl-password
  ssl.truststore.location: my-base64-encoded-truststore
  ssl.truststore.password: my-ssl-password
topic:
  - kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
# Properties
# groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer group.
When using a consumer group, only records not already consumed by that group are fetched.
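For example, a minimal sketch (the broker address, topic, and group name are placeholders) of a consumer that sets a consumer group so repeated executions only pick up new records:
id: "consume_with_group"
type: "io.kestra.plugin.kafka.Consume"
properties:
  bootstrap.servers: localhost:9092
topic: test_kestra
groupId: kestra-consumer-group
keyDeserializer: STRING
valueDeserializer: STRING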
# keyDeserializer
- Type: object
- Dynamic: ❓
- Required: ❌
# maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new records.
This is not a hard limit; it is evaluated every second.
# maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of records to fetch before stopping.
This is not a hard limit; it is evaluated every second.
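As an illustrative sketch (the limit values are arbitrary, and the duration is written in the ISO 8601 notation used for Kestra duration properties), both stop conditions can be combined; consumption ends when either is reached, with the checks evaluated every second:
id: "consume_bounded"
type: "io.kestra.plugin.kafka.Consume"
properties:
  bootstrap.servers: localhost:9092
topic: test_kestra
maxRecords: 1000
maxDuration: PT30S
keyDeserializer: STRING
valueDeserializer: STRING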
# pollDuration
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
2.000000000
- Format:
duration
The duration to wait for records during each poll.
If no records are available, this is the maximum time to wait for new records.
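For instance, assuming the same ISO 8601 duration notation, the poll wait can be raised above the 2-second default for a low-traffic topic (values are placeholders):
id: "consume_slow_topic"
type: "io.kestra.plugin.kafka.Consume"
properties:
  bootstrap.servers: localhost:9092
topic: test_kestra
pollDuration: PT5S
keyDeserializer: STRING
valueDeserializer: STRING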
# properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Connection properties.
`bootstrap.servers` is the minimal required configuration.
Can be any valid Consumer Configs or Producer Configs.
If you want to pass a truststore or a keystore, you must provide a base64-encoded string for:
- `ssl.keystore.location`
- `ssl.truststore.location`
# serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Serializer configuration.
Configuration that will be passed to the serializer or deserializer (for example, `schema.registry.url` as in the first example above). The `avro.use.logical.type.converters` property is always passed with the value `true`.
# since
- Type: string
- Dynamic: ✔️
- Required: ❌
Timestamp of the message to start from.
By default, all messages from the topics are consumed when no consumer group is set, or consumption starts according to the `auto.offset.reset` configuration when a consumer group is used; you can instead provide an arbitrary start time.
This property is ignored if a consumer group is used.
Must be a valid ISO 8601 date.
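A sketch (the timestamp, broker address, and topic are placeholders) that starts from a given point in time; remember this only applies when no groupId is set:
id: "consume_since"
type: "io.kestra.plugin.kafka.Consume"
properties:
  bootstrap.servers: localhost:9092
topic: test_kestra
since: "2023-01-01T00:00:00Z"
keyDeserializer: STRING
valueDeserializer: STRING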
# topic
- Type: object
- Dynamic: ✔️
- Required: ❌
Kafka topic(s) to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
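For example (topic names are illustrative), either form is accepted:
# a single topic
topic: test_kestra
# multiple topics
topic:
  - kestra_logs
  - kestra_metrics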
# valueDeserializer
- Type: object
- Dynamic: ❓
- Required: ❌
# Outputs
# messagesCount
- Type: integer
Number of messages consumed
# uri
- Type: string
URI of a Kestra internal storage file containing the consumed messages
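For instance, downstream tasks can reference these outputs with Kestra's expression syntax; this sketch assumes the Consume task has the id consume and uses a Log task purely for illustration (the exact Log task type may differ across Kestra versions):
id: "log_result"
type: "io.kestra.core.tasks.log.Log"
message: "Consumed {{ outputs.consume.messagesCount }} messages, stored at {{ outputs.consume.uri }}"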