Source
yaml
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: "{{ outputs.consume.uri }}"
About this blueprint
Queue Outputs
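This flow consumes messages from the Kafka topic topic_test, deserializing keys as strings and values as JSON, and then converts the consumed records into a newline-delimited JSON file.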
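
The consumer only has something to read if the topic already holds messages. As a rough sketch for local testing, a companion flow could publish one test record to the same topic. It assumes the io.kestra.plugin.kafka.Produce task and the broker address used above; the flow id, the key, and the value payload are hypothetical and not part of this blueprint.

```yaml
# Hypothetical companion flow: publishes a single test record to topic_test.
id: produce-kafka-message
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    topic: topic_test
    properties:
      # Assumed to be the same broker the consumer connects to.
      bootstrap.servers: localhost:9093
    keySerializer: STRING
    valueSerializer: JSON
    from:
      # Illustrative key and payload only.
      key: "{{ execution.id }}"
      value:
        message: hello
```

Running this flow before the consumer should leave at least one record for the consume task to pick up, since auto.offset.reset is set to earliest.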