Trigger

yaml
type: "io.kestra.plugin.kafka.Trigger"

React to and consume messages from one or more Kafka topics. You don't need an extra task to consume the messages from the event trigger: the trigger consumes them automatically, and you can retrieve their content in your flow using the {{ trigger.uri }} variable.

Examples

yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
serdeProperties:
  schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
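
For context, a trigger like the one above sits under the triggers key of a flow. The following is a minimal sketch rather than an official example. The flow id, the namespace and the Log task type (io.kestra.core.tasks.log.Log, which may be named differently depending on your Kestra version) are assumptions. It also assumes that the values listed under Outputs are exposed to the flow as {{ trigger.messagesCount }} and {{ trigger.uri }}.

```yaml
id: kafka_trigger_example        # hypothetical flow id
namespace: dev                   # hypothetical namespace

tasks:
  - id: log_batch
    type: io.kestra.core.tasks.log.Log   # assumed task type; check your Kestra version
    message: "Consumed {{ trigger.messagesCount }} messages, stored at {{ trigger.uri }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.kafka.Trigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
```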

Properties

keyDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000 (seconds, i.e. PT2S)
  • Format: duration

How often to poll for a record

If no records are available, this is the maximum time to wait for new records.

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Connection properties

The bootstrap.servers property is the minimum configuration required to connect to a Kafka topic. This property accepts any valid Kafka Consumer Configs or Producer Configs as key-value pairs.

If you want to pass a truststore or a keystore, you must provide its content as a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
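
As an illustration of the truststore and keystore note, here is a sketch of an SSL connection. The listener port, the passwords and the base64 strings are placeholders, and the exact set of ssl.* keys you need depends on your broker setup. The key names themselves are standard Kafka client configs.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9093              # assumed TLS listener
  security.protocol: SSL
  # base64-encoded content of the store files, not file paths (placeholders)
  ssl.truststore.location: "<base64-encoded truststore>"
  ssl.truststore.password: "<truststore password>"
  ssl.keystore.location: "<base64-encoded keystore>"
  ssl.keystore.password: "<keystore password>"
keyDeserializer: STRING
valueDeserializer: STRING
```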

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Kafka topic(s) to consume messages from

It can be a string or a list of strings to consume from one or multiple topics.
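
For instance, consuming from two topics could look like the sketch below. The second topic name is hypothetical.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic:
  - test_kestra
  - test_kestra_other      # hypothetical second topic
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```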

valueDeserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions that restrict when the flow is triggered.

groupId

  • Type: string
  • Dynamic: ✔️
  • Required:

The consumer group

Using a consumer group, we will fetch only records that haven't been consumed yet.
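
A sketch of a trigger that uses a consumer group, so that each evaluation only picks up records the group has not consumed yet. The group name is a hypothetical value.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
groupId: kestra_consumer_group   # hypothetical consumer group name
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```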

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: 60.000000000 (seconds, i.e. PT60S)
  • Format: duration

Interval between polling

The interval between two successive evaluations of the trigger. This avoids overloading the remote system with too many calls. For most triggers that depend on an external system, the interval should be at least PT30S. See ISO 8601 durations for more information on the available interval values.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new records before stopping the consumption process.

It's a soft limit evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of records to fetch before stopping the consumption process.

It's a soft limit evaluated every second.
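
The two soft limits can be combined to bound how much each evaluation consumes. In this sketch the values 100 and PT10S are arbitrary illustrations, not recommendations.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
maxRecords: 100        # stop after roughly 100 records (soft limit)
maxDuration: PT10S     # or after roughly 10 seconds (soft limit)
```

Because both limits are evaluated every second, a batch may slightly exceed either value.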

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Serializer configuration

Configuration that will be passed to serializer or deserializer. The avro.use.logical.type.converters is always passed when you have any values set to true.

since

  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of a message to start consuming messages from.

By default, all messages are consumed from the topics when no consumer group is set, or according to the configuration of the auto.offset.reset property. However, you can provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.
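
A sketch of starting consumption from a given point in time. The timestamp is an arbitrary example, and groupId is deliberately left out because since is ignored when a consumer group is used.

```yaml
id: "trigger"
type: "io.kestra.plugin.kafka.Trigger"
topic: test_kestra
since: "2023-01-01T00:00:00Z"   # example ISO 8601 timestamp
properties:
  bootstrap.servers: localhost:9092
keyDeserializer: STRING
valueDeserializer: STRING
```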

Outputs

messagesCount

  • Type: integer

Number of messages consumed from a Kafka topic

uri

  • Type: string

URI of a Kestra internal storage file containing the messages

Definitions