RealtimeTrigger RealtimeTrigger

yaml
type: "io.kestra.plugin.mqtt.RealtimeTrigger"

Consume a message in real-time from MQTT topics and create one execution per message.

If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.mqtt.Trigger instead.

Examples

Consume a message from MQTT topics in real-time.

yaml
id: mqtt_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.payload }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.mqtt.RealtimeTrigger
    server: tcp://localhost:1883
    clientId: kestraProducer
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    serdeType: JSON

Properties

qos

  • Type: integer
  • Dynamic:
  • Required: ✔️
  • Default: 1

Sets the quality of service for this message.

  • Quality of Service 0: indicates that a message should be delivered at most once (zero or one times). The message will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should only be used for messages which are not valuable - note that if the server cannot process the message (for example, there is an authorization problem). Also known as "fire and forget".
  • Quality of Service 1: indicates that a message should be delivered at least once (one or more times). The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. The message will be acknowledged across the network.
  • Quality of Service 2: indicates that a message should be delivered once. The message will be persisted to disk, and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. If persistence is not configured, QoS 1 and 2 messages will still be delivered in the event of a network or server problem as the client will hold state in memory. If the MQTT client is shutdown or fails and persistence is not configured then delivery of QoS 1 and 2 messages can not be maintained as client-side state will be lost.

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: JSON
  • Possible Values:
    • STRING
    • JSON
    • BYTES

Serializer / Deserializer used for the payload

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Topic where to consume message

Can be a string or a List of string to consume from multiple topic

version

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: V5
  • Possible Values:
    • V3
    • V5

authMethod

  • Type: string
  • Dynamic:
  • Required:

clientId

  • Type: string
  • Dynamic:
  • Required:

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

connectionTimeout

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

crt

  • Type: string
  • Dynamic:
  • Required:

httpsHostnameVerificationEnabled

  • Type: boolean
  • Dynamic:
  • Required:

password

  • Type: string
  • Dynamic:
  • Required:

server

  • Type: string
  • Dynamic:
  • Required:

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

username

  • Type: string
  • Dynamic:
  • Required:

Outputs

id

  • Type: integer
  • Required:

payload

  • Type: object
  • Required:

properties

  • Type: array
  • SubType: string
  • Required:

qos

  • Type: integer
  • Required:

retain

  • Type: boolean
  • Required:

topic

  • Type: string
  • Required:

Was this page helpful?