consume
Consume messages from one or more Kafka topics.
type: "io.kestra.plugin.kafka.Consume"

Examples

Consume data from a Kafka topic.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
Connect to a Kafka cluster with SSL.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
Consume data from a Kafka topic and write it to a JSON file.
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: "{{ outputs.consume.uri }}"
Properties
properties (object, required)
Kafka connection properties.
The bootstrap.servers property is the minimal configuration required to connect to a Kafka topic.
This property can reference any valid Consumer Configs or Producer Configs as key-value pairs.
To pass a truststore or a keystore, you must provide a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
groupId (string)
Kafka consumer group ID.
When a consumer group is used, only records that haven't been consumed yet are fetched.
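A minimal sketch of consuming with a consumer group so that re-runs only fetch unread records; the topic name and group ID are illustrative:

```yaml
id: kafka_consume_group
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra          # illustrative topic name
    groupId: kestra-consumer    # illustrative consumer group ID
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```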
headerFilters (object)
Filter messages by Kafka headers.
Only consume messages whose headers match these conditions.
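A sketch, assuming headerFilters takes header-name/value pairs (the header name and value below are hypothetical):

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    headerFilters:
      tenant: acme    # hypothetical header name and value to match
    keyDeserializer: STRING
    valueDeserializer: STRING
```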
keyDeserializer (string)
Default: STRING. Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
The deserializer used for the key.
maxDuration (string, duration)
The maximum duration to wait for new records before stopping the consumption process.
maxRecords (integer)
The maximum number of records to fetch before stopping the consumption process.
This is a soft limit evaluated every second.
onSerdeError (Non-dynamic)
On serde error.
Set the desired behavior when valueDeserializer is JSON and a serde error occurs:
- SKIPPED: all invalid messages are skipped
- STORE: invalid messages are ignored and stored as a file
- DLQ: invalid messages are ignored and sent to the DLQ specified in topic
Type: io.kestra.plugin.kafka.KafkaConsumerInterface-OnSerdeError
topic: the topic used when the DLQ behavior has been chosen.
Behavior in case of serde error. Default: SKIPPED. Possible values: SKIPPED, DLQ, STORE.
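A sketch of routing serde failures to a dead letter queue; the sub-property names shown here are assumptions, and the DLQ topic name is illustrative:

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
    valueDeserializer: JSON
    onSerdeError:
      type: DLQ                # assumed sub-property name for the behavior
      topic: topic_test_dlq    # illustrative DLQ topic name
```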
partitions (array)
Topic partitions to consume messages from.
Manually assign a list of partitions to the consumer.
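A sketch, assuming partitions is a list of partition numbers for the configured topic; the topic name and partition numbers are illustrative:

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    partitions:
      - 0    # illustrative partition numbers
      - 1
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```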
pollDuration (string, duration)
Default: PT5S. How often to poll for records.
If no records are available, this is the maximum duration to wait for new records.
serdeProperties (object)
Default: {}. Serializer configuration.
Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed when any values are set to true.
since (string)
Timestamp of a message to start consuming messages from.
By default, all messages are consumed from the topics, either with no consumer group or depending on the configuration of the auto.offset.reset property. You can instead provide an arbitrary start time.
This property is ignored if a consumer group is used.
It must be a valid ISO 8601 date.
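A sketch of consuming only messages produced after a given instant; the timestamp and topic name are illustrative:

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    since: "2024-01-01T00:00:00Z"    # illustrative ISO 8601 start time
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```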
topic (object, Non-dynamic)
Kafka topic(s) to consume messages from.
It can be a string or a list of strings to consume from one or multiple topics.
topicPattern (string)
Kafka topic pattern to consume messages from.
The consumer will subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
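A sketch subscribing to every topic matching a regular expression; the pattern and group ID are illustrative (Kafka pattern subscription typically requires a consumer group):

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topicPattern: "logs-.*"    # illustrative regex, e.g. matches logs-app and logs-db
    groupId: kestra-logs       # illustrative consumer group ID
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```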
valueDeserializer (string)
Default: STRING. The deserializer used for the value.
Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
Outputs
messagesCount (integer)
Number of messages consumed from a Kafka topic.
uri (string, uri)
URI of a file in Kestra's internal storage containing the messages.
Metrics
records (counter)
Number of records consumed from the Kafka topic.