Consume Consume

yaml
type: "io.kestra.plugin.azure.eventhubs.Consume"

Consume events from Azure Event Hubs.

Examples

Consume data events from Azure EventHubs.

yaml
id: azure_eventhubs_consume_data_events
namespace: company.team

tasks:
  - id: consume_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"

Properties

eventHubName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The event hub to read from.

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Namespace name of the event hub to connect to.

bodyDeserializer

  • Type: string
  • Dynamic:
  • Required:
  • Default: STRING
  • Possible Values:
    • STRING
    • BINARY
    • ION
    • JSON

The Deserializer to be used for serializing the event value.

bodyDeserializerProperties

  • Type: object
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be passed to the Deserializer.

Configs in key/value pairs.

checkpointStoreProperties

  • Type: object
  • SubType: string
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be used for configuring the BlobCheckpointStore.

Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.

clientMaxRetries

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 5

The maximum number of retry attempts before considering a client operation to have failed.

clientRetryDelay

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 500

The maximum permissible delay between retry attempts in milliseconds.

connectionString

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection string of the Storage Account.

consumerGroup

  • Type: string
  • Dynamic:
  • Required:
  • Default: $Default

The consumer group.

customEndpointAddress

  • Type: string
  • Dynamic: ✔️
  • Required:

Custom endpoint address when connecting to the Event Hubs service.

enqueueTime

  • Type: string
  • Dynamic:
  • Required:

The ISO Datetime to be used when PartitionStartingPosition is configured to INSTANT.

Configs in key/value pairs.

maxBatchSizePerPartition

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 50

The maximum number of events to consume per event hub partition per poll.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Default: 10.000000000
  • Format: duration

The max time duration to wait to receive events from all partitions.

maxWaitTimePerPartition

  • Type: string
  • Dynamic:
  • Required:
  • Default: 5.000000000
  • Format: duration

The max time duration to wait to receive a batch of events up to the maxBatchSizePerPartition.

partitionStartingPosition

  • Type: string
  • Dynamic:
  • Required:
  • Default: EARLIEST
  • Possible Values:
    • EARLIEST
    • LATEST
    • INSTANT

The starting position.

sasToken

  • Type: string
  • Dynamic: ✔️
  • Required:

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

sharedKeyAccountAccessKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key access key for authenticating requests.

sharedKeyAccountName

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key account name for authenticating requests.

Outputs

eventsCount

  • Type: integer
  • Required:

Number of events consumed from Azure Event Hubs.

uri

  • Type: string
  • Required:
  • Format: uri

URI of a kestra internal storage file containing the messages.

Was this page helpful?