Consume
type: "io.kestra.plugin.azure.eventhubs.Consume"
Consume events from Azure Event Hubs.
Examples
Consume data events from Azure Event Hubs.
id: azure_eventhubs_consume_data_events
namespace: company.team

tasks:
  - id: consume_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
Properties
eventHubName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The event hub to read from.
namespace
- Type: string
- Dynamic: ✔️
- Required: ✔️
Namespace name of the event hub to connect to.
bodyDeserializer
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: STRING
- Possible Values: STRING, BINARY, ION, JSON
The deserializer to be used for deserializing the event value.
bodyDeserializerProperties
- Type: object
- Dynamic: ❌
- Required: ❌
- Default: {}
The config properties to be passed to the Deserializer.
Configs in key/value pairs.
checkpointStoreProperties
- Type: object
- SubType: string
- Dynamic: ❌
- Required: ❌
- Default: {}
The config properties to be used for configuring the BlobCheckpointStore.
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
clientMaxRetries
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default: 5
The maximum number of retry attempts before considering a client operation to have failed.
clientRetryDelay
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default: 500
The maximum permissible delay between retry attempts in milliseconds.
connectionString
- Type: string
- Dynamic: ✔️
- Required: ❌
Connection string of the Storage Account.
consumerGroup
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: $Default
The consumer group.
customEndpointAddress
- Type: string
- Dynamic: ✔️
- Required: ❌
Custom endpoint address when connecting to the Event Hubs service.
enqueueTime
- Type: string
- Dynamic: ❌
- Required: ❌
The ISO Datetime to be used when PartitionStartingPosition is configured to INSTANT.
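As a sketch of how enqueueTime pairs with partitionStartingPosition, the task fragment below starts reading from a fixed point in time. The task id, the timestamp, and its exact accepted format are assumptions (an ISO 8601 UTC datetime is shown); the connection values are borrowed from the example at the top of the page.

```yaml
tasks:
  - id: consume_from_instant  # hypothetical task id
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Start from a given enqueue time instead of EARLIEST or LATEST
    partitionStartingPosition: INSTANT
    # Assumed format: ISO 8601 datetime in UTC
    enqueueTime: "2024-01-01T00:00:00Z"
```

With EARLIEST or LATEST, enqueueTime should not be needed, since the description ties it to INSTANT only.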
maxBatchSizePerPartition
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default: 50
The maximum number of events to consume per event hub partition per poll.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: 10.000000000
- Format: duration
The max time duration to wait to receive events from all partitions.
maxWaitTimePerPartition
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: 5.000000000
- Format: duration
The max time duration to wait to receive a batch of events up to the maxBatchSizePerPartition.
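The three polling limits (maxBatchSizePerPartition, maxDuration, maxWaitTimePerPartition) are easier to read side by side. The fragment below is a minimal sketch, assuming that duration properties accept ISO 8601 duration strings such as PT30S, as is common in Kestra flows; the task id and the chosen values are illustrative only.

```yaml
tasks:
  - id: consume_bounded  # hypothetical task id
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Read at most 100 events per partition per poll (default: 50)
    maxBatchSizePerPartition: 100
    # Wait up to 2 seconds for a partition batch to fill (assumed ISO 8601 syntax)
    maxWaitTimePerPartition: PT2S
    # Stop waiting for events from all partitions after 30 seconds
    maxDuration: PT30S
```

A larger maxDuration gives slow partitions more time to deliver events, at the cost of a longer-running task.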
partitionStartingPosition
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: EARLIEST
- Possible Values: EARLIEST, LATEST, INSTANT
The position in each partition from which to start consuming events.
sasToken
- Type: string
- Dynamic: ✔️
- Required: ❌
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
sharedKeyAccountAccessKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key access key for authenticating requests.
sharedKeyAccountName
- Type: string
- Dynamic: ✔️
- Required: ❌
Shared Key account name for authenticating requests.
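Since connectionString is not marked as required, the SAS token or the shared key pair can presumably be supplied in its place. The fragment below is a hedged sketch of the SAS variant; the task id and the secret names EVENTHUBS_SAS_TOKEN, EVENTHUBS_KEY_NAME, and EVENTHUBS_KEY are hypothetical, and whether these options fully replace connectionString is an assumption not confirmed by this page.

```yaml
tasks:
  - id: consume_with_sas  # hypothetical task id
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    # Query parameters only, with or without a leading '?', not a full URL
    sasToken: "{{ secret('EVENTHUBS_SAS_TOKEN') }}"
    # Assumed alternative: authenticate with a shared key pair instead
    # sharedKeyAccountName: "{{ secret('EVENTHUBS_KEY_NAME') }}"
    # sharedKeyAccountAccessKey: "{{ secret('EVENTHUBS_KEY') }}"
```

All three properties are dynamic, so they can be rendered from secrets rather than written into the flow.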
Outputs
eventsCount
- Type: integer
- Required: ❌
Number of events consumed from Azure Event Hubs.
uri
- Type: string
- Required: ❌
- Format: uri
URI of a Kestra internal storage file containing the messages.
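Both outputs can be referenced from later tasks through Kestra expressions. The sketch below logs them after the consume task from the example above; the Log task type shown is an assumption and may be named differently depending on the Kestra version in use.

```yaml
tasks:
  - id: consume_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Consume
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"

  - id: log_result  # hypothetical follow-up task
    # Assumed core Log task type; check the name for your Kestra version
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume_from_eventhub.eventsCount }} events, stored at {{ outputs.consume_from_eventhub.uri }}"
```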