Consume
type: "io.kestra.plugin.kafka.Consume"
Consume messages from one or more Kafka topics
Examples
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Connect to a cluster with SSL
id: "consume"
type: "io.kestra.plugin.kafka.Consume"
properties:
  security.protocol: SSL
  bootstrap.servers: server.aivencloud.com:12835
  ssl.key.password: my-ssl-password
  ssl.keystore.type: PKCS12
  ssl.keystore.location: my-base64-encoded-keystore
  ssl.keystore.password: my-ssl-password
  ssl.truststore.location: my-base64-encoded-truststore
  ssl.truststore.password: my-ssl-password
topic:
  - kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
Properties
keyDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default: 5.000000000 (5 seconds)
- Format: duration
How often to poll for a record
If no records are available, the maximum duration to wait for new records.
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Connection properties
The bootstrap.servers property is the minimal required configuration to connect to a Kafka topic.
This property can reference any valid Consumer Configs or Producer Configs as key-value pairs.
If you want to pass a truststore or a keystore, you must provide a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Kafka topic(s) to consume messages from
It can be a string or a list of strings to consume from one or multiple topics.
valueDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer group
Using a consumer group, we will fetch only records that haven't been consumed yet.
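As a minimal sketch of the consumer-group behavior described above, the task below sets groupId so that repeated executions fetch only records the group has not yet consumed. The group name kestra_consumer_group and the task id are hypothetical; the topic and broker address are borrowed from the examples on this page.

```yaml
id: consume_with_group
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
# Hypothetical group name; any stable identifier shared across runs should work.
groupId: kestra_consumer_group
keyDeserializer: STRING
valueDeserializer: STRING
```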
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format: duration
The maximum duration to wait for new records before stopping the consumption process.
It's a soft limit evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of records to fetch before stopping the consumption process.
It's a soft limit evaluated every second.
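The stopping conditions can be combined in one task. The sketch below assumes ISO 8601 duration strings (as the duration format suggests) and uses illustrative values: each poll waits up to 10 seconds, and consumption stops after roughly 1 minute or roughly 500 records, whichever comes first. Because both limits are soft, the actual run may slightly exceed them.

```yaml
id: consume_bounded
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# Illustrative values, not recommendations.
pollDuration: PT10S   # wait up to 10 seconds per poll
maxDuration: PT1M     # soft limit on total consumption time
maxRecords: 500       # soft limit on the number of records fetched
```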
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Serializer configuration
Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed when you have any values set to true.
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Timestamp of a message to start consuming messages from.
By default, when no consumer group is used, we consume all messages from the topics, or start from the position determined by the auto.offset.reset property. However, you can provide an arbitrary start time.
This property is ignored if a consumer group is used.
It must be a valid ISO 8601 date.
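A minimal sketch of starting from a given point in time: the task below omits groupId (since would be ignored otherwise) and passes an ISO 8601 timestamp. The date itself is an arbitrary illustrative value.

```yaml
id: consume_since
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
# No groupId here: "since" is ignored when a consumer group is used.
since: "2024-01-01T00:00:00Z"
```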
Outputs
messagesCount
- Type: integer
Number of messages consumed from a Kafka topic
uri
- Type: string
URI of a Kestra internal storage file containing the messages
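To illustrate how these outputs might be used, the sketch below wraps the task in a small flow and logs both values from a downstream task. The flow id, namespace, and the Log task type (io.kestra.plugin.core.log.Log) are assumptions and may differ depending on your Kestra version; only the messagesCount and uri output names come from this page.

```yaml
id: kafka_consume_outputs   # hypothetical flow id
namespace: dev              # hypothetical namespace
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
  # Assumed Log task type; older Kestra versions may use a different package.
  - id: log_result
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages, stored at {{ outputs.consume.uri }}"
```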