RealtimeTrigger RealtimeTrigger

yaml
type: "io.kestra.plugin.azure.eventhubs.RealtimeTrigger"

Consume a message in real-time from a Azure Event Hubs and create one execution per message.

If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.azure.eventhubs.Trigger instead.

Examples

Trigger flow based on events received from Azure Event Hubs in real-time.

yaml
id: azure_eventhubs_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: Hello there! I received {{ trigger.body }} from Azure EventHubs!

triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"

Properties

eventHubName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The event hub to read from.

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Namespace name of the event hub to connect to.

bodyDeserializer

  • Type: string
  • Dynamic:
  • Required:
  • Default: STRING
  • Possible Values:
    • STRING
    • BINARY
    • ION
    • JSON

The Deserializer to be used for serializing the event value.

bodyDeserializerProperties

  • Type: object
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be passed to the Deserializer.

Configs in key/value pairs.

checkpointStoreProperties

  • Type: object
  • SubType: string
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be used for configuring the BlobCheckpointStore.

Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.

clientMaxRetries

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 5

The maximum number of retry attempts before considering a client operation to have failed.

clientRetryDelay

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 500

The maximum permissible delay between retry attempts in milliseconds.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

connectionString

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection string of the Storage Account.

consumerGroup

  • Type: string
  • Dynamic:
  • Required:
  • Default: $Default

The consumer group.

customEndpointAddress

  • Type: string
  • Dynamic: ✔️
  • Required:

Custom endpoint address when connecting to the Event Hubs service.

enqueueTime

  • Type: string
  • Dynamic:
  • Required:

The ISO Datetime to be used when PartitionStartingPosition is configured to INSTANT.

Configs in key/value pairs.

partitionStartingPosition

  • Type: string
  • Dynamic:
  • Required:
  • Default: EARLIEST
  • Possible Values:
    • EARLIEST
    • LATEST
    • INSTANT

The starting position.

sasToken

  • Type: string
  • Dynamic: ✔️
  • Required:

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

sharedKeyAccountAccessKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key access key for authenticating requests.

sharedKeyAccountName

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key account name for authenticating requests.

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

Outputs

body

  • Type: object
  • Required:

contentType

  • Type: string
  • Required:

correlationId

  • Type: string
  • Required:

enqueuedTimestamp

  • Type: integer
  • Required:

messageId

  • Type: string
  • Required:

offset

  • Type: integer
  • Required:

partitionKey

  • Type: string
  • Required:

properties

  • Type: object
  • Required:

sequenceNumber

  • Type: integer
  • Required:

Was this page helpful?