Consume
Read Kafka records into internal storage
Consume
Read Kafka records into internal storage
yaml
type: io.kestra.plugin.kafka.Consume
Examples
yaml
# Example flow: consume records from the `test_kestra` topic, reading keys as
# strings and values as Avro.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    # Kafka client settings go under `properties`.
    properties:
      bootstrap.servers: localhost:9092
    # Serializer/deserializer settings; here the schema registry URL that
    # accompanies the AVRO value deserializer.
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
yaml
# Example flow: consume from a broker using the SSL security protocol.
# All password and store values below are placeholders.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    # Kafka client settings, including every `ssl.*` option, go under
    # `properties`.
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      # NOTE(review): the placeholder names suggest base64-encoded store
      # content rather than a file path — confirm against the plugin docs.
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    # `topic` is given as a list here (single entry).
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
yaml
# Example flow: consume JSON records from `topic_test`, then pass the file
# produced by the consume task to an Ion-to-JSON conversion task.
id: consume_kafka_messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    # Kafka client settings go under `properties`.
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    # Task-level options (not Kafka client properties).
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    # Reads the `uri` output of the `consume` task above.
    from: "{{ outputs.consume.uri }}"
yaml
# Example flow: consume JSON records from the `orders` topic with header
# filters configured.
id: consume_with_headers
namespace: company.team

tasks:
  - id: consume_filtered
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    # Kafka client settings go under `properties`.
    properties:
      bootstrap.servers: localhost:9092
      auto.offset.reset: earliest
    keyDeserializer: STRING
    valueDeserializer: JSON
    # Map of header name -> value.
    # NOTE(review): presumably only records whose headers match every entry
    # are kept — confirm against the plugin docs.
    headerFilters:
      event-type: order_created
      region: us-east
yaml
# Example flow: consume JSON records from the `orders` topic using a SHARE
# group type instead of the default CONSUMER group type.
id: consume_kafka_share_group
namespace: company.team

tasks:
  - id: consume_queue
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    # Kafka client settings go under `properties`.
    properties:
      bootstrap.servers: localhost:9092
    # Group settings are task-level options, not Kafka client properties.
    groupId: orders-share-group
    groupType: SHARE
    # One of ACCEPT, RELEASE, REJECT, RENEW; ACCEPT is the default.
    acknowledgeType: ACCEPT
    keyDeserializer: STRING
    valueDeserializer: JSON
Properties
properties *Required object
SubType string
acknowledgeType string
Default
ACCEPT
Possible Values
ACCEPT
RELEASE
REJECT
RENEW
groupId string
groupType string
Default
CONSUMER
Possible Values
CONSUMER
SHARE
headerFilters object
SubType string
keyDeserializer string
Default
STRING
Possible Values
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
maxDuration string
maxRecords integer | string
onSerdeError Non-dynamic
Definitions
io.kestra.plugin.kafka.KafkaConsumerInterface-OnSerdeError
topic string
type string
Default
SKIPPED
Possible Values
SKIPPED
DLQ
STORE
partitions array
SubType integer
pollDuration string
Default
PT5S
serdeProperties object
SubType string
Default
{}
since string
topic Non-dynamic object
topicPattern string
valueDeserializer string
Default
STRING
Possible Values
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
Outputs
messagesCount integer
uri string
Format
uri
Metrics
records counter
Unit
records