Trigger
type: "io.kestra.plugin.kafka.Trigger"
Wait for messages on Kafka topics
Examples
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
Properties
keyDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
Duration waiting for record to be polled
If no records are available, the max wait to wait for a new records.
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Connection properties
bootstrap.servers
is a minimal required configuration.
Can be any valid Consumer Configs or Producer Configs
If you want to pass a truststore or a keystore, you must provide a base64 encoded string for : - ssl.keystore.location
- ssl.truststore.location
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Kafka topic(s) where to consume message
Can be a string or a List of string to consume from multiple topic
valueDeserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
groupId
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer group
Using consumer group, we will fetch only records not already consumed
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
PT1S
- Format:
duration
Interval between polling
The interval between 2 different test of schedule, this can avoid to overload the remote system with too many call. For most of trigger that depend on external system, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval value
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The max duration waiting for new rows
It's not an hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The max number of rows to fetch before stopping
It's not an hard limit and is evaluated every second
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Serializer configuration
Configuration that will be passed to serializer or deserializer, you typically may need to use ``
avro.use.logical.type.converters
is always passed with true
value.
since
- Type: string
- Dynamic: ✔️
- Required: ❌
Timestamp of message to start with
By default, we consume all messages from the topics with no consumer group or depending on configuration auto.offset.reset
with consumer group, but you can provide a arbitrary start time.
This property is ignore if a consumer group is used.
Must be a valid iso 8601 date.
Outputs
messagesCount
- Type: integer
Number of message produced
uri
- Type: string
URI of a kestra internal storage file