Trigger

```yaml
type: "io.kestra.plugin.kafka.Trigger"
```

Wait for messages on Kafka topics

Examples

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
```

Properties

keyDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000
  • Format: duration

Duration to wait for records to be polled

If no records are available, the maximum time to wait for new records.

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Connection properties

bootstrap.servers is the minimum required configuration. Any valid Kafka Consumer Configs or Producer Configs property can also be set.

To pass a truststore or a keystore, provide a base64-encoded string for:
  • ssl.keystore.location
  • ssl.truststore.location
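As a rough sketch of the truststore case described above, the trigger below connects over SSL. The placeholder value and the password are hypothetical. `security.protocol` and `ssl.truststore.password` are standard Kafka client settings, assumed here to be what a typical SSL setup needs.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9093
  security.protocol: SSL
  # Placeholder: replace with the base64-encoded content of the truststore file
  ssl.truststore.location: "<base64-encoded truststore>"
  # Hypothetical password, for illustration only
  ssl.truststore.password: "changeit"
keyDeserializer: STRING
valueDeserializer: STRING
```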

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Kafka topic(s) to consume messages from

Can be a string, or a list of strings to consume from multiple topics.
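For illustration, a trigger that listens on several topics might look like the sketch below. The topic names are made up for the example.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
# A list of topics instead of a single string (hypothetical topic names)
topic:
  - orders
  - payments
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```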

valueDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

groupId

  • Type: string
  • Dynamic: ✔️
  • Required:

The consumer group

When a consumer group is used, only records that have not already been consumed are fetched.
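A minimal sketch of a trigger using a consumer group, so that each evaluation only picks up records not yet consumed by that group. The group name is a hypothetical value.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
# Hypothetical consumer group name
groupId: kestra_consumer_group
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```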

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: PT1S
  • Format: duration

Interval between polls

The interval between two consecutive evaluations of the trigger. This avoids overloading the remote system with too many calls. For most triggers that depend on an external system, the interval should be at least PT30S. See ISO 8601 durations for the available interval values.
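As an example of the recommendation above, the following sketch raises the interval from the default PT1S to PT30S, written as an ISO 8601 duration.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
# Evaluate the trigger every 30 seconds instead of every second
interval: PT30S
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```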

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new records

It's not a hard limit and is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of records to fetch before stopping

It's not a hard limit and is evaluated every second.
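The two soft limits can be combined. The sketch below, with arbitrarily chosen values, stops a poll once roughly 100 records have been fetched or roughly 30 seconds have elapsed. Since both limits are evaluated every second, the actual count or duration may slightly exceed these values.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
# Soft limits, checked once per second (illustrative values)
maxRecords: 100
maxDuration: PT30S
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```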

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Serializer configuration

Configuration passed to the serializer or deserializer. You typically may need to use ``. avro.use.logical.type.converters is always passed with the value true.

since

  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of the message to start from

By default, all messages from the topics are consumed when no consumer group is set; with a consumer group, the starting point depends on the auto.offset.reset configuration. You can instead provide an arbitrary start time. This property is ignored if a consumer group is used. Must be a valid ISO 8601 date.
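A short sketch of starting consumption from a given point in time. The timestamp is an arbitrary example. No groupId is set, because this property is ignored when a consumer group is used.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
# Arbitrary ISO 8601 start time; only applies without a consumer group
since: "2023-01-01T00:00:00Z"
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```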

Outputs

messagesCount

  • Type: integer

Number of messages consumed

uri

  • Type: string

URI of a Kestra internal storage file
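To show how these outputs might be used, here is a hedged sketch of a complete flow. It assumes that trigger outputs are exposed to tasks through the `trigger` expression variable, and that a log task is available under the type `io.kestra.core.tasks.log.Log`. Both may differ depending on the Kestra version, and the flow id and namespace are hypothetical.

```yaml
id: kafka_listen        # hypothetical flow id
namespace: dev          # hypothetical namespace
tasks:
  - id: log_result
    # Assumed log task type; check the core plugin docs for your version
    type: io.kestra.core.tasks.log.Log
    message: "Consumed {{ trigger.messagesCount }} messages, stored at {{ trigger.uri }}"
triggers:
  - id: trigger
    type: io.kestra.plugin.kafka.Trigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING
    valueDeserializer: STRING
```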