Consume
Consume
type: "io.kestra.plugin.kafka.Consume"
Consume messages from one or more Kafka topics.
Examples
id: kafka_consume
namespace: company.team
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Connect to a Kafka cluster with SSL.
id: kafka_consume
namespace: company.team
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
properties:
security.protocol: SSL
bootstrap.servers: localhost:19092
ssl.key.password: my-ssl-password
ssl.keystore.type: PKCS12
ssl.keystore.location: my-base64-encoded-keystore
ssl.keystore.password: my-ssl-password
ssl.truststore.location: my-base64-encoded-truststore
ssl.truststore.password: my-ssl-password
topic:
- kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
Properties
keyDeserializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
The deserializer used for the key.
Possible values are:
STRING
,INTEGER
,FLOAT
,DOUBLE
,LONG
,SHORT
,BYTE_ARRAY
,BYTE_BUFFER
,BYTES
,UUID
,VOID
,AVRO
,JSON
.
pollDuration
- Type:
- string
- string
- Dynamic: ✔️
- Required: ✔️
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Kafka connection properties.
The
bootstrap.servers
property is a minimal required configuration to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. If you want to pass a truststore or a keystore, you must provide a base64 encoded string forssl.keystore.location
andssl.truststore.location
.
valueDeserializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
The deserializer used for the value.
Possible values are:
STRING
,INTEGER
,FLOAT
,DOUBLE
,LONG
,SHORT
,BYTE_ARRAY
,BYTE_BUFFER
,BYTES
,UUID
,VOID
,AVRO
,JSON
.
groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
Kafka consumer group ID.
Using a consumer group, we will fetch only records that haven't been consumed yet.
maxDuration
- Type:
- string
- string
- Dynamic: ✔️
- Required: ❌
maxRecords
- Type:
- integer
- string
- Dynamic: ✔️
- Required: ❌
partitions
- Type: array
- SubType: integer
- Dynamic: ✔️
- Required: ❌
Topic partitions to consume messages from.
Manually assign a list of partitions to the consumer.
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
- Default:
{}
Serializer configuration
Configuration that will be passed to serializer or deserializer. The
avro.use.logical.type.converters
is always passed when you have any values set totrue
.
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Timestamp of a message to start consuming messages from.
By default, we consume all messages from the topics with no consumer group or depending on the configuration of the
auto.offset.reset
property. However, you can provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.
topic
- Type: object
- Dynamic: ❓
- Required: ❌
Kafka topic(s) to consume messages from.
It can be a string or a list of strings to consume from one or multiple topics.
topicPattern
- Type: string
- Dynamic: ✔️
- Required: ❌
Kafka topic pattern to consume messages from.
Consumer will subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
Outputs
messagesCount
- Type: integer
- Required: ❌
Number of messages consumed from a Kafka topic.
uri
- Type: string
- Required: ❌
- Format:
uri
URI of a file in Kestra's internal storage containing the messages.
Was this page helpful?