Produce

type: "io.kestra.plugin.azure.eventhubs.Produce"

Publish events to Azure Event Hubs.

Examples

Publish a file as events to Azure Event Hubs.

id: azure_eventhubs_send_events
namespace: company.team

inputs:
  - id: file
    type: FILE
    description: a CSV file with columns id, username, tweet, and timestamp

tasks:
  - id: read_csv_file
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: transform_row_to_json
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.read_csv_file.uri }}"
    script: |
      var result = {
        "body": {
          "username": row.username,
          "tweet": row.tweet
        }
      };
      row = result

  - id: send_to_eventhub
    type: io.kestra.plugin.azure.eventhubs.Produce
    from: "{{ outputs.transform_row_to_json.uri }}"
    eventHubName: my_eventhub
    namespace: my_event_hub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    maxBatchSizeInBytes: 4096
    maxEventsPerBatch: 100
    bodySerializer: "JSON"
    bodyContentType: application/json
    eventProperties:
      source: kestra

Properties

eventHubName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The event hub to send events to.

from

  • Type:
    • string
    • array
    • object
  • Dynamic: ✔️
  • Required: ✔️

The content of the events to be sent to Event Hubs.

Can be an internal storage URI, a map (i.e. a list of key-value pairs) or a list of maps. The following keys are supported: from, contentType, properties.
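As a hedged illustration of the inline form, the sketch below passes `from` as a list of maps instead of an internal storage URI. The `body` key mirrors the shape produced by the `transform_row_to_json` step in the example on this page and is an assumption here, as are the flow id, task id, and sample values. Verify the accepted keys against your plugin version.

```yaml
id: azure_eventhubs_send_inline
namespace: company.team

tasks:
  - id: send_inline_events
    type: io.kestra.plugin.azure.eventhubs.Produce
    eventHubName: my_eventhub
    namespace: my_event_hub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodySerializer: JSON
    # Assumption: each map describes one event, with the payload under `body`.
    from:
      - body:
          username: alice
          tweet: "first event"
      - body:
          username: bob
          tweet: "second event"
```

A single map in place of the list would describe one event.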

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Namespace name of the event hub to connect to.

bodyContentType

  • Type: string
  • Dynamic:
  • Required:

The MIME type describing the event data

The MIME type describing the data contained in event body allowing consumers to make informed decisions for inspecting and processing the event.

bodySerializer

  • Type: string
  • Dynamic:
  • Required:
  • Default: STRING
  • Possible Values:
    • STRING
    • BINARY
    • ION
    • JSON

The serializer used to serialize the event body.

bodySerializerProperties

  • Type: object
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be passed to the Serializer.

Configs in key/value pairs.

clientMaxRetries

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 5

The maximum number of retry attempts before considering a client operation to have failed.

clientRetryDelay

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 500

The maximum permissible delay between retry attempts in milliseconds.
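A minimal sketch of how the two retry settings might be set together, assuming a flow with a `file` input that already holds the events to publish. The flow id, task id, and chosen values are illustrative, not recommendations.

```yaml
id: azure_eventhubs_send_with_retries
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: send_with_retries
    type: io.kestra.plugin.azure.eventhubs.Produce
    from: "{{ inputs.file }}"
    eventHubName: my_eventhub
    namespace: my_event_hub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Give up after 3 attempts instead of the default 5.
    clientMaxRetries: 3
    # Allow up to 1000 ms between attempts instead of the default 500 ms.
    clientRetryDelay: 1000
```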

connectionString

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection string used to connect to the Event Hubs namespace.

customEndpointAddress

  • Type: string
  • Dynamic: ✔️
  • Required:

Custom endpoint address when connecting to the Event Hubs service.

eventProperties

  • Type: object
  • SubType: string
  • Dynamic:
  • Required:
  • Default: {}

The event properties

The event properties which may be used for passing metadata associated with the event body during Event Hubs operations.

maxBatchSizeInBytes

  • Type: integer
  • Dynamic:
  • Required:

The maximum size for batches of events, in bytes.

maxEventsPerBatch

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 1000

The maximum number of events per batch.

partitionKey

  • Type: string
  • Dynamic:
  • Required:

The hashing key to be provided for the batches of events.

Events with the same partitionKey are hashed and sent to the same partition. The provided partitionKey will be used for all the events sent by the Produce task.
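The following sketch shows one way to pin every event from a run to the same partition. The key value `tweets-eu`, the flow id, and the task id are hypothetical. Because the key applies to all events sent by the task, splitting events across several keys would presumably require several Produce tasks.

```yaml
id: azure_eventhubs_send_partitioned
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: send_partitioned
    type: io.kestra.plugin.azure.eventhubs.Produce
    from: "{{ inputs.file }}"
    eventHubName: my_eventhub
    namespace: my_event_hub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Hypothetical key: all events of this task hash to the same partition.
    partitionKey: tweets-eu
```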

sasToken

  • Type: string
  • Dynamic: ✔️
  • Required:

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

sharedKeyAccountAccessKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key access key for authenticating requests.

sharedKeyAccountName

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key account name for authenticating requests.

Outputs

eventsCount

  • Type: integer
  • Required:

Total number of events processed by the task.

sendBatchesCount

  • Type: integer
  • Required:

Total number of batches sent to Azure Event Hubs.
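As a sketch of how these outputs might be consumed, the flow below logs both counters after publishing. It assumes the core `io.kestra.plugin.core.log.Log` task is available in your Kestra version; the flow id and task ids are illustrative.

```yaml
id: azure_eventhubs_report_counts
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: send_to_eventhub
    type: io.kestra.plugin.azure.eventhubs.Produce
    from: "{{ inputs.file }}"
    eventHubName: my_eventhub
    namespace: my_event_hub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"

  - id: log_counts
    type: io.kestra.plugin.core.log.Log
    # Outputs are referenced by the id of the task that produced them.
    message: "Sent {{ outputs.send_to_eventhub.eventsCount }} events in {{ outputs.send_to_eventhub.sendBatchesCount }} batches"
```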
