RealtimeTrigger
type: "io.kestra.plugin.azure.eventhubs.RealtimeTrigger"
Consume messages in real time from Azure Event Hubs and create one execution per message.
To consume multiple messages received within a given time frame and process them as a batch, use io.kestra.plugin.azure.eventhubs.Trigger instead.
Examples
Trigger a flow in real time for each event received from Azure Event Hubs.
id: azure_eventhubs_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: Hello there! I received {{ trigger.body }} from Azure EventHubs!

triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
Properties
eventHubName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The event hub to read from.
namespace
- Type: string
- Dynamic: ✔️
- Required: ✔️
Namespace name of the event hub to connect to.
bodyDeserializer
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: STRING
- Possible Values: STRING, BINARY, ION, JSON
The deserializer used to deserialize the event body.
bodyDeserializerProperties
- Type: object
- Dynamic: ❌
- Required: ❌
- Default: {}
The config properties to be passed to the Deserializer.
Configs in key/value pairs.
checkpointStoreProperties
- Type: object
- SubType: string
- Dynamic: ❌
- Required: ❌
- Default: {}
The config properties to be used for configuring the BlobCheckpointStore.
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
clientMaxRetries
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default: 5
The maximum number of retry attempts before considering a client operation to have failed.
clientRetryDelay
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default: 500
The maximum permissible delay between retry attempts in milliseconds.
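As an illustration of the two retry settings above, the trigger fragment below overrides both defaults. It is a minimal sketch: the values 3 and 1000 are arbitrary, and the remaining properties are copied from the example at the top of this page.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Illustrative value: fail a client operation after 3 retries (default is 5).
    clientMaxRetries: 3
    # Illustrative value: allow up to 1000 ms between retries (default is 500).
    clientRetryDelay: 1000
```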
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions that restrict when the flow is triggered.
connectionString
- Type: string
- Dynamic: ✔️
- Required: ❌
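Connection string used to connect to the Event Hubs namespace. The connection string of the Storage Account used for checkpoints is set separately under checkpointStoreProperties.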
consumerGroup
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: $Default
The consumer group.
customEndpointAddress
- Type: string
- Dynamic: ✔️
- Required: ❌
Custom endpoint address when connecting to the Event Hubs service.
enqueueTime
- Type: string
- Dynamic: ❌
- Required: ❌
The ISO datetime to use as the starting point when partitionStartingPosition is set to INSTANT.
partitionStartingPosition
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: EARLIEST
- Possible Values: EARLIEST, LATEST, INSTANT
The position in the partition from which to start consuming events.
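To show how partitionStartingPosition and enqueueTime fit together, the fragment below starts consumption from a fixed point in time. This is a sketch under assumptions: the timestamp is arbitrary, and the exact ISO datetime format the plugin accepts is assumed here to be a UTC instant such as the one shown.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Start from a point in time rather than the EARLIEST default.
    partitionStartingPosition: INSTANT
    # Assumed format: ISO 8601 UTC instant; the value itself is illustrative.
    enqueueTime: "2024-01-01T00:00:00Z"
```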
sasToken
- Type: string
- Dynamic: ✔️
- Required: ❌
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
sharedKeyAccountAccessKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key access key for authenticating requests.
sharedKeyAccountName
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key account name for authenticating requests.
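Since connectionString is optional, the authentication properties above suggest that other credentials can be supplied instead. The fragment below is only a sketch of that idea using sasToken; it assumes a SAS token alone is sufficient for this trigger, and the secret name EVENTHUBS_SAS_TOKEN is hypothetical.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    # Hypothetical secret name; the value should hold only the query parameters,
    # with or without a leading '?', not a full URL.
    sasToken: "{{ secret('EVENTHUBS_SAS_TOKEN') }}"
```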
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
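As a sketch of how stopAfter might be used, the fragment below disables the trigger once an execution it created ends in a failed state. The state name FAILED is assumed to be one of the execution states Kestra accepts here; the other properties are copied from the example at the top of this page.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Assumed state name: stop (disable) the trigger after a failed execution.
    stopAfter:
      - FAILED
```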
Outputs
body
- Type: object
- Required: ❌
contentType
- Type: string
- Required: ❌
correlationId
- Type: string
- Required: ❌
enqueuedTimestamp
- Type: integer
- Required: ❌
messageId
- Type: string
- Required: ❌
offset
- Type: integer
- Required: ❌
partitionKey
- Type: string
- Required: ❌
properties
- Type: object
- Required: ❌
sequenceNumber
- Type: integer
- Required: ❌
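The example at the top of this page reads the message body with {{ trigger.body }}. Assuming the other outputs listed above are exposed on the trigger variable in the same way, a task could log message metadata alongside the body, as in this sketch (the task id log_metadata is hypothetical):

```yaml
tasks:
  - id: log_metadata
    type: io.kestra.plugin.core.log.Log
    # Assumes every output above is reachable as trigger.<outputName>.
    message: "Received {{ trigger.body }} (partition key: {{ trigger.partitionKey }}, sequence number: {{ trigger.sequenceNumber }}, enqueued at: {{ trigger.enqueuedTimestamp }})"
```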