RealtimeTrigger RealtimeTrigger

yaml
type: "io.kestra.plugin.amqp.RealtimeTrigger"

Consume a message in real-time from an AMQP queue and create one execution per message.

If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.amqp.Trigger instead.

Examples

Consume a message from a AMQP queue in real-time.

yaml
id: amqp
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.data }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.amqp.RealtimeTrigger
    url: amqp://guest:guest@localhost:5672/my_vhost
    queue: amqpTrigger.queue

Properties

consumerTag

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: Kestra

A client-generated consumer tag to establish context.

queue

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The queue to pull messages from.

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • JSON

Serializer / Deserializer used for the message.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

host

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker host.

password

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker password.

port

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker port.

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

username

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker username.

virtualHost

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker virtual host.

Outputs

appId

  • Type: string
  • Required:

contentEncoding

  • Type: string
  • Required:

contentType

  • Type: string
  • Required:

correlationId

  • Type: string
  • Required:

data

  • Type: object
  • Required:

deliveryMode

  • Type: integer
  • Required:

expiration

  • Type: string
  • Required:
  • Format: duration

headers

  • Type: object
  • Required:

messageId

  • Type: string
  • Required:

priority

  • Type: integer
  • Required:

replyTo

  • Type: string
  • Required:

timestamp

  • Type: string
  • Required:
  • Format: date-time

userId

  • Type: string
  • Required:

Was this page helpful?