Consume Consume

yaml
type: "io.kestra.plugin.amqp.Consume"

Consume messages from an AMQP queue.

Requires maxDuration or maxRecords.

Examples

yaml
id: amqp_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.amqp.Consume
    url: amqp://guest:guest@localhost:5672/my_vhost
    queue: kestramqp.queue
    maxRecords: 1000

Properties

consumerTag

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: Kestra

A client-generated consumer tag to establish context.

queue

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The queue to pull messages from.

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • JSON

Serializer / Deserializer used for the message.

host

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker host.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new rows.

It's not an hard limit and is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of rows to fetch before stopping.

It's not an hard limit and is evaluated every second.

password

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker password.

port

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker port.

username

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker username.

virtualHost

  • Type: string
  • Dynamic: ✔️
  • Required:

The broker virtual host.

Outputs

count

  • Type: integer
  • Required:

Number of rows consumed.

uri

  • Type: string
  • Required:
  • Format: uri

File URI containing consumed messages.

Was this page helpful?