Trigger
type: "io.kestra.plugin.azure.eventhubs.Trigger"
Consume messages periodically from Azure Event Hubs and create one execution per batch.
To consume each message from Azure Event Hubs in real time and create one execution per message, use io.kestra.plugin.azure.eventhubs.RealtimeTrigger instead.
Examples
Trigger flow based on events received from Azure Event Hubs in batch.
id: azure_eventhubs_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: Hello there! I received {{ trigger.eventsCount }} events from Azure Event Hubs!

triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    interval: PT30S
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
Properties
eventHubName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The event hub to read from.
namespace
- Type: string
- Dynamic: ✔️
- Required: ✔️
Namespace name of the event hub to connect to.
bodyDeserializer
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
STRING
- Possible Values:
STRING
BINARY
ION
JSON
The Deserializer to be used for deserializing the event value.
bodyDeserializerProperties
- Type: object
- Dynamic: ❌
- Required: ❌
- Default:
{}
The config properties to be passed to the Deserializer.
Configs in key/value pairs.
checkpointStoreProperties
- Type: object
- SubType: string
- Dynamic: ❌
- Required: ❌
- Default:
{}
The config properties to be used for configuring the BlobCheckpointStore.
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
clientMaxRetries
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
5
The maximum number of retry attempts before considering a client operation to have failed.
clientRetryDelay
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
500
The maximum permissible delay between retry attempts in milliseconds.
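As a rough illustration of the two retry settings above, the fragment below lowers the number of attempts and raises the delay cap. The values and the trigger id are arbitrary choices for this sketch, not recommendations; the other properties are borrowed from the example at the top of this page.

```yaml
triggers:
  - id: read_with_retries          # hypothetical id, chosen for this sketch
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    clientMaxRetries: 3            # give up after 3 retry attempts (default: 5)
    clientRetryDelay: 1000         # at most 1000 ms between attempts (default: 500)
```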
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions that limit when the flow is triggered.
connectionString
- Type: string
- Dynamic: ✔️
- Required: ❌
Connection string of the Storage Account.
consumerGroup
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
$Default
The consumer group.
customEndpointAddress
- Type: string
- Dynamic: ✔️
- Required: ❌
Custom endpoint address when connecting to the Event Hubs service.
enqueueTime
- Type: string
- Dynamic: ❌
- Required: ❌
The ISO datetime to be used when partitionStartingPosition is configured to INSTANT.
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
PT60S
- Format:
duration
Interval between polling.
The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for more information on the available interval values.
maxBatchSizePerPartition
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
50
The maximum number of events to consume per event hub partition per poll.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
PT10S
- Format:
duration
The maximum duration to wait to receive events from all partitions.
maxWaitTimePerPartition
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
PT5S
- Format:
duration
The maximum duration to wait to receive a batch of events up to the maxBatchSizePerPartition.
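The three batch-related properties (maxBatchSizePerPartition, maxDuration, and maxWaitTimePerPartition) are usually tuned together. The fragment below is a minimal sketch of one such combination; the numbers and the trigger id are illustrative assumptions, and how the limits interact should be read from the property descriptions rather than from this example.

```yaml
triggers:
  - id: read_larger_batches        # hypothetical id, chosen for this sketch
    type: io.kestra.plugin.azure.eventhubs.Trigger
    interval: PT1M                 # poll once per minute
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    maxBatchSizePerPartition: 100  # up to 100 events per partition per poll (default: 50)
    maxWaitTimePerPartition: PT2S  # wait at most 2 s for a partition's batch to fill
    maxDuration: PT20S             # wait at most 20 s for events from all partitions
```

Durations are written as ISO 8601 values, as with interval in the example at the top of this page.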
partitionStartingPosition
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
EARLIEST
- Possible Values:
EARLIEST
LATEST
INSTANT
The starting position.
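To show how partitionStartingPosition and enqueueTime fit together, here is a sketch that starts reading from a fixed point in time. The timestamp is an arbitrary placeholder, and the exact datetime format accepted is an assumption: the property description only says "ISO datetime".

```yaml
triggers:
  - id: read_from_instant          # hypothetical id, chosen for this sketch
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    partitionStartingPosition: INSTANT
    enqueueTime: "2024-01-01T00:00:00Z"  # placeholder value; format assumed to be ISO 8601 with a UTC offset
```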
sasToken
- Type: string
- Dynamic: ✔️
- Required: ❌
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
sharedKeyAccountAccessKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key access key for authenticating requests.
sharedKeyAccountName
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key account name for authenticating requests.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
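As a short sketch of stopAfter, the fragment below disables the trigger once an execution it started ends in the FAILED state. The trigger id is hypothetical, and FAILED is used here as one example of an execution state.

```yaml
triggers:
  - id: read_until_failure         # hypothetical id, chosen for this sketch
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    stopAfter:
      - FAILED                     # stop (disable) the trigger after a failed execution
```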
Outputs
eventsCount
- Type: integer
- Required: ❌
Number of events consumed from Azure Event Hubs.
uri
- Type: string
- Required: ❌
- Format:
uri
URI of a Kestra internal storage file containing the messages.
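Both outputs can be referenced from the flow's tasks through the trigger variable. The task below is a minimal sketch that logs them; trigger.eventsCount appears in the example at the top of this page, and trigger.uri is assumed to follow the same pattern. The task id is hypothetical.

```yaml
tasks:
  - id: summary                    # hypothetical id, chosen for this sketch
    type: io.kestra.plugin.core.log.Log
    message: "Received {{ trigger.eventsCount }} events, stored at {{ trigger.uri }}"
```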