yaml
type: "io.kestra.plugin.amqp.Trigger"

Consume messages periodically from a AMQP queue and create one execution per batch.

Note that you don't need an extra task to consume the message from the event trigger. The trigger will automatically consume messages and you can retrieve their content in your flow using the {{ trigger.uri }} variable. If you would like to consume each message from a AMQP queue in real-time and create one execution per message, you can use the io.kestra.plugin.amqp.RealtimeTrigger instead.

Examples

yaml
id: amqp_trigger
namespace: company.team

tasks:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    url: amqp://guest:guest@localhost:5672/my_vhost
    maxRecords: 2
    queue: amqpTrigger.queue

Properties

consumerTag

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: Kestra

A client-generated consumer tag to establish context.

queue

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The queue to pull messages from.

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • JSON

Serializer / Deserializer used for the message.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

host

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker host.

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: 60
  • Format: duration

Interval between polling.

The interval between 2 different polls of schedule, this can avoid to overload the remote system with too many calls. For most of the triggers that depend on external systems, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval values.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new rows.

It's not an hard limit and is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of rows to fetch before stopping.

It's not an hard limit and is evaluated every second.

password

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker password.

port

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker port.

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

username

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker username.

virtualHost

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker virtual host.

Outputs

count

  • Type: integer
  • Required:

uri

  • Type: string
  • Required:
  • Format: uri