Trigger Trigger

yaml
type: "io.kestra.plugin.kafka.Trigger"

Examples

yaml
id: kafka_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.kafka.Trigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
      keyDeserializer: STRING
      valueDeserializer: AVRO
    interval: PT30S
    maxRecords: 5
    groupId: kafkaConsumerGroupId

Properties

groupId

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

keyDeserializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

pollDuration

  • Type:
    • string
    • string
  • Dynamic: ✔️
  • Required: ✔️

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

valueDeserializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: 60.000000000
  • Format: duration

Interval between polling.

The interval between 2 different polls of schedule, this can avoid to overload the remote system with too many calls. For most of the triggers that depend on external systems, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval values.

maxDuration

  • Type:
    • string
    • string
  • Dynamic: ✔️
  • Required:

maxRecords

  • Type:
    • integer
    • string
  • Dynamic: ✔️
  • Required:

partitions

  • Type: array
  • SubType: integer
  • Dynamic: ✔️
  • Required:

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:
  • Default: {}

since

  • Type: string
  • Dynamic: ✔️
  • Required:

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

topic

  • Type: object
  • Dynamic:
  • Required:

topicPattern

  • Type: string
  • Dynamic: ✔️
  • Required:

Outputs

messagesCount

  • Type: integer
  • Required:

uri

  • Type: string
  • Required:
  • Format: uri

Was this page helpful?