Subscribe
type: "io.kestra.plugin.mqtt.Subscribe"
Subscribe to messages on an MQTT topic
Examples
# Subscribe over a plain TCP connection
id: mqtt_subscribe
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraProducer
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    serdeType: JSON
    maxRecords: 10

# Subscribe over SSL/TLS with a server certificate
id: mqtt_subscribe
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: ssl://localhost:8883
    clientId: kestraProducer
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    crt: /home/path/to/ca.crt
    serdeType: JSON
    maxRecords: 10
Properties
clientId
- Type: string
- Dynamic: ✔️
- Required: ✔️
A client identifier that is unique on the server being connected to
The clientId must be specified and be less than 65535 characters. It must be unique across all clients connecting to the same server. The server uses the clientId to store data related to the client and to identify the client when it reconnects, so the client must use the same identifier between connections if durable subscriptions or reliable delivery of messages are required.
qos
- Type: integer
- Dynamic: ❌
- Required: ✔️
- Default:
1
Sets the quality of service for this message.
- Quality of Service 0: indicates that a message should be delivered at most once (zero or one times). The message will not be persisted to disk and will not be acknowledged across the network. This QoS is the fastest, but should only be used for messages that are not valuable: if the server cannot process the message (for example, because of an authorization problem), it is not redelivered. Also known as "fire and forget".
- Quality of Service 1: indicates that a message should be delivered at least once (one or more times). The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. The message will be acknowledged across the network.
- Quality of Service 2: indicates that a message should be delivered exactly once. The message will be persisted to disk and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure.
If persistence is not configured, QoS 1 and 2 messages will still be delivered in the event of a network or server problem, as the client will hold state in memory. If the MQTT client is shut down or fails and persistence is not configured, delivery of QoS 1 and 2 messages cannot be maintained, as client-side state will be lost.
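As a rough illustration of the levels described above, a subscription that asks for exactly-once delivery could set qos to 2. This is only a sketch that reuses the broker address and topic from the examples; the flow id and clientId are hypothetical names.

```yaml
id: mqtt_subscribe_qos2        # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraConsumer   # hypothetical; keep it stable between runs
    topic: kestra/sensors/cpu
    qos: 2                     # 0 = at most once, 1 = at least once (default), 2 = exactly once
    maxRecords: 10
```

Higher levels add acknowledgement round trips, so qos 0 may be enough for sensor readings that are cheap to lose.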
serdeType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
JSON
- Possible Values:
STRING
JSON
BYTES
Serializer / Deserializer used for the payload
server
- Type: string
- Dynamic: ✔️
- Required: ✔️
The address of the server to connect to, specified as a URI
The server URI is typically used with the clientId parameter to form a key. The key is used to store and reference messages while they are being delivered. The address of the server to connect to is specified as a URI. Two types of connection are supported: tcp:// for a TCP connection and ssl:// for a TCP connection secured by SSL/TLS. For example:
tcp://localhost:1883
ssl://localhost:8883
If the port is not specified, it will default to 1883 for tcp:// URIs, and 8883 for ssl:// URIs.
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Topic to consume messages from
Can be a string or a list of strings to consume from multiple topics
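The examples at the top of the page use the list form. A minimal sketch of the single-string form might look like the following; the flow id and clientId are hypothetical.

```yaml
id: mqtt_subscribe_single_topic   # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraConsumer      # hypothetical client identifier
    topic: kestra/sensors/cpu     # a plain string instead of a list
    serdeType: JSON
    maxRecords: 10
```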
version
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
V5
- Possible Values:
V3
V5
The MQTT version to use.
authMethod
- Type: string
- Dynamic: ✔️
- Required: ❌
The Authentication Method.
Only available if version = V5.
If set, this value contains the name of the authentication method to be used for extended authentication. If null, extended authentication is not performed.
connectionTimeout
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The connection timeout.
This value defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing, meaning the client will wait until the network connection is made successfully or fails.
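A shorter timeout can be sketched as below. This assumes the duration format accepts an ISO 8601 duration string such as PT10S, which is how durations are commonly written in Kestra flows; the flow id and clientId are hypothetical.

```yaml
id: mqtt_subscribe_timeout     # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraConsumer   # hypothetical client identifier
    topic: kestra/sensors/cpu
    connectionTimeout: PT10S   # assumed ISO 8601 duration: give up after 10 seconds
    maxRecords: 10
```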
crt
- Type: string
- Dynamic: ✔️
- Required: ❌
Server certificate file path.
httpsHostnameVerificationEnabled
- Type: boolean
- Dynamic: ❌
- Required: ❌
Disable SSL verification.
This value will allow all CA certificates.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new rows
It is not a hard limit and is evaluated every second
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of rows to fetch before stopping
It is not a hard limit and is evaluated every second
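maxDuration and maxRecords can be set together to bound a run. The sketch below assumes an ISO 8601 duration string for maxDuration and presumes the task stops once either limit is reached; because both limits are checked about once per second, a run can overshoot them slightly. The flow id and clientId are hypothetical.

```yaml
id: mqtt_subscribe_bounded     # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraConsumer   # hypothetical client identifier
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    maxDuration: PT1M          # assumed ISO 8601 duration: wait about one minute
    maxRecords: 100            # soft limit, evaluated every second
```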
password
- Type: string
- Dynamic: ✔️
- Required: ❌
The password to use for the connection.
username
- Type: string
- Dynamic: ✔️
- Required: ❌
The user name to use for the connection.
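Since username and password are dynamic properties, they can be rendered from expressions rather than written in plain text. The following sketch assumes Kestra's secret() function is available in your installation; the secret names MQTT_USERNAME and MQTT_PASSWORD, the flow id, and the clientId are hypothetical.

```yaml
id: mqtt_subscribe_auth        # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: ssl://localhost:8883
    clientId: kestraConsumer   # hypothetical client identifier
    topic: kestra/sensors/cpu
    crt: /home/path/to/ca.crt
    username: "{{ secret('MQTT_USERNAME') }}"   # hypothetical secret name
    password: "{{ secret('MQTT_PASSWORD') }}"   # hypothetical secret name
    maxRecords: 10
```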
Outputs
messagesCount
- Type: integer
- Required: ❌
Number of messages produced
uri
- Type: string
- Required: ❌
- Format:
uri
URI of a Kestra internal storage file
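A later task can read these outputs through expressions. The sketch below assumes the core Log task is available under the type io.kestra.plugin.core.log.Log, which may differ between Kestra versions; the flow id, clientId, and the report task id are hypothetical.

```yaml
id: mqtt_subscribe_outputs     # hypothetical flow id
namespace: company.team
tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraConsumer   # hypothetical client identifier
    topic: kestra/sensors/cpu
    maxRecords: 10
  - id: report                 # hypothetical downstream task
    type: io.kestra.plugin.core.log.Log   # assumed task type; check your Kestra version
    message: "Received {{ outputs.subscribe.messagesCount }} messages, stored at {{ outputs.subscribe.uri }}"
```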