Consume
type: "io.kestra.plugin.kafka.Consume"
Consume messages from Kafka topic(s)
Examples
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Connect to a cluster with SSL
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
properties:
  security.protocol: SSL
  bootstrap.servers: server.aivencloud.com:12835
  ssl.key.password: my-ssl-password
  ssl.keystore.type: PKCS12
  ssl.keystore.location: my-base64-encoded-keystore
  ssl.keystore.password: my-ssl-password
  ssl.truststore.location: my-base64-encoded-truststore
  ssl.truststore.password: my-ssl-password
topic:
  - kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
Properties
keyDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
How long to wait for records to be polled
If no records are available, the maximum time to wait for new records.
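As an illustration, the fragment below raises the poll wait above the default. It is a minimal sketch that assumes duration values are written in ISO 8601 notation (so PT5S means five seconds); the topic and broker address are reused from the first example.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Assumption: durations use ISO 8601 notation, so PT5S is 5 seconds
pollDuration: PT5S
```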
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Connection properties
bootstrap.servers is the minimal required configuration.
Any valid Kafka Consumer Configs or Producer Configs entry can be used.
To pass a truststore or a keystore, you must provide a base64-encoded string for:
- ssl.keystore.location
- ssl.truststore.location
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Kafka topic(s) to consume messages from
Can be a string, or a list of strings to consume from multiple topics.
valueDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer group
When a consumer group is used, only records that have not already been consumed are fetched.
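A consumer group could be set as in the sketch below. The group name kestra_consumer is hypothetical, and auto.offset.reset is a standard Kafka consumer setting added here only to show where the first run of a new group would start reading.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
  # Standard Kafka consumer config: where a group with no committed offset starts
  auto.offset.reset: earliest
# Hypothetical group name; later runs with the same group skip consumed records
groupId: kestra_consumer
keyDeserializer: STRING
valueDeserializer: STRING
```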
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new rows
It's not a hard limit and is evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of rows to fetch before stopping
It's not a hard limit and is evaluated every second.
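The two stopping conditions can be combined, as in this sketch. The values are arbitrary, and the ISO 8601 form of maxDuration is an assumption based on the duration format. Since neither limit is a hard one, the task may return somewhat more than 1000 records or run slightly longer than 30 seconds.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Stop after roughly 1000 records...
maxRecords: 1000
# ...or after roughly 30 seconds (assumed ISO 8601 notation)
maxDuration: PT30S
```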
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Serializer configuration
Configuration passed to the serializer or deserializer; the Avro example above, for instance, sets schema.registry.url here.
avro.use.logical.type.converters is always passed with the value true.
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Timestamp of the message to start with
By default, we consume all messages from the topics when no consumer group is set, or follow the auto.offset.reset configuration when a consumer group is set, but you can provide an arbitrary start time.
This property is ignored if a consumer group is used.
Must be a valid ISO 8601 date.
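For example, a start time might be given as follows. This is a sketch with an arbitrary timestamp; groupId is deliberately left out, because the property is ignored when a consumer group is used.

```yaml
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Arbitrary ISO 8601 timestamp: start from messages at or after this time
since: "2023-01-01T00:00:00Z"
```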
Outputs
messagesCount
- Type: integer
Number of messages consumed
uri
- Type: string
URI of a Kestra internal storage file
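The outputs can be read by a later task through expressions. The flow below is only a sketch: the flow id, namespace, and the report task are hypothetical, and it assumes the core Log task is available as io.kestra.core.tasks.log.Log (recent Kestra versions name it io.kestra.plugin.core.log.Log).

```yaml
id: kafka_consume_outputs   # hypothetical flow id
namespace: dev              # hypothetical namespace
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
  - id: report
    # Assumption: the type name of the core Log task depends on the Kestra version
    type: io.kestra.core.tasks.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages, stored at {{ outputs.consume.uri }}"
```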