Consume
Consume
yaml
type: "io.kestra.plugin.kafka.Consume"
Examples
yaml
id: kafka_consume
namespace: company.team
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Connect to a Kafka cluster with SSL.
yaml
id: kafka_consume
namespace: company.team
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
properties:
security.protocol: SSL
bootstrap.servers: localhost:19092
ssl.key.password: my-ssl-password
ssl.keystore.type: PKCS12
ssl.keystore.location: my-base64-encoded-keystore
ssl.keystore.password: my-ssl-password
ssl.truststore.location: my-base64-encoded-truststore
ssl.truststore.password: my-ssl-password
topic:
- kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
Properties
keyDeserializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
pollDuration
- Type:
- string
- string
- Dynamic: ✔️
- Required: ✔️
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
valueDeserializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
maxDuration
- Type:
- string
- string
- Dynamic: ✔️
- Required: ❌
maxRecords
- Type:
- integer
- string
- Dynamic: ✔️
- Required: ❌
partitions
- Type: array
- SubType: integer
- Dynamic: ✔️
- Required: ❌
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
- Default:
{}
since
- Type: string
- Dynamic: ✔️
- Required: ❌
topic
- Type: object
- Dynamic: ❓
- Required: ❌
topicPattern
- Type: string
- Dynamic: ✔️
- Required: ❌
Outputs
messagesCount
- Type: integer
- Required: ❌
uri
- Type: string
- Required: ❌
- Format:
uri
Was this page helpful?