Consume

Certified

Read Kafka records into internal storage

yaml
type: io.kestra.plugin.kafka.Consume

Examples
yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
    schemaRegistryVendor:
      type: io.kestra.plugin.kafka.registry.ConfluentSchemaRegistry
      schemaRegistryUrl: http://localhost:8085

yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
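
The SSL flow above writes passwords and keystore material directly into the flow. As a hedged variant, the same values could be pulled from secrets with the `secret()` function that appears elsewhere on this page. The secret names below are hypothetical, and the sketch assumes that values under `properties` are rendered as expressions.

```yaml
id: kafka_consume_ssl_secrets
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.keystore.type: PKCS12
      # Hypothetical secret names; replace with the secrets defined in your instance.
      ssl.key.password: "{{ secret('KAFKA_SSL_KEY_PASSWORD') }}"
      ssl.keystore.location: "{{ secret('KAFKA_KEYSTORE_BASE64') }}"
      ssl.keystore.password: "{{ secret('KAFKA_KEYSTORE_PASSWORD') }}"
      ssl.truststore.location: "{{ secret('KAFKA_TRUSTSTORE_BASE64') }}"
      ssl.truststore.password: "{{ secret('KAFKA_TRUSTSTORE_PASSWORD') }}"
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
```

Keeping credentials out of the flow definition means the flow can be versioned and shared without exposing them.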

yaml
id: consume_kafka_messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: "{{ outputs.consume.uri }}"

yaml
id: consume_with_headers
namespace: company.team

tasks:
  - id: consume_filtered
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    properties:
      bootstrap.servers: localhost:9092
      auto.offset.reset: earliest
    keyDeserializer: STRING
    valueDeserializer: JSON
    headerFilters:
      event-type: order_created
      region: us-east

yaml
id: consume_kafka_share_group
namespace: company.team

tasks:
  - id: consume_queue
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    properties:
      bootstrap.servers: localhost:9092
    groupId: orders-share-group
    groupType: SHARE
    acknowledgeType: ACCEPT
    keyDeserializer: STRING
    valueDeserializer: JSON

Properties
acknowledgeType
Default: ACCEPT
Possible Values: ACCEPT, RELEASE, REJECT, RENEW
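
As an illustration of a non-default acknowledgement, the sketch below reuses the share-group flow from the examples and sets `acknowledgeType: RELEASE`. It assumes these values correspond to Kafka's share-consumer acknowledgement types, where releasing a record makes it available for another delivery attempt; the exact behavior in this task is not confirmed here. The flow id is hypothetical.

```yaml
id: consume_kafka_share_group_release
namespace: company.team

tasks:
  - id: consume_queue
    type: io.kestra.plugin.kafka.Consume
    topic: orders
    properties:
      bootstrap.servers: localhost:9092
    groupId: orders-share-group
    groupType: SHARE
    # Assumption: RELEASE hands fetched records back to the share group
    # for redelivery rather than marking them as processed.
    acknowledgeType: RELEASE
    keyDeserializer: STRING
    valueDeserializer: JSON
```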
groupType
Default: CONSUMER
Possible Values: CONSUMER, SHARE
keyDeserializer
Default: STRING
Possible Values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON, PROTOBUF
Definitions
topic (string)
type (string)
Default: SKIPPED
Possible Values: SKIPPED, DLQ, STORE
SubType: integer
Default: PT5S
Definitions
Example
yaml
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: AVRO
    schemaRegistryVendor:
      type: io.kestra.plugin.kafka.registry.AwsGlueSchemaRegistry
      region: us-east-1
      # endpoint, accessKey, secretKey are optional when using IAM roles
      accessKey: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKey: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
region (string, required)
type (object, required)
accessKey (string)
endpoint (string)
secretKey (string)
Example
yaml
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9092
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: AVRO
    schemaRegistryVendor:
      type: io.kestra.plugin.kafka.registry.ConfluentSchemaRegistry
      schemaRegistryUrl: http://localhost:8081
schemaRegistryUrl (string, required)
type (object, required)
Default: {}
valueDeserializer
Default: STRING
Possible Values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON, PROTOBUF
Format: uri
Unit: records
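
The URI-formatted output is what the earlier flows reference as `{{ outputs.consume.uri }}`: it points to the internal-storage file holding the consumed records. A minimal sketch of reading that value in a follow-up task is shown below. The flow id and the `log_uri` task are illustrative, and the `Log` core task is used only to print the rendered expression.

```yaml
id: consume_and_log_uri
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  # Illustrative follow-up task: prints where the consumed records were stored.
  - id: log_uri
    type: io.kestra.plugin.core.log.Log
    message: "Consumed records stored at {{ outputs.consume.uri }}"
```

Any task that accepts an internal-storage URI, such as the `IonToJson` task in the examples, can take the same expression as its `from` input.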