Use Azure Eventhubs Realtime Trigger to push events into StorageTable

About this blueprint

Azure Queue Realtime

This flow will:

  1. Be triggered every time an event lands in the Azure Eventhub
  2. Push the event data into the StorageTable table

For this, create an Azure Eventhub named orders under the kestra namespace. We will produce JSON messages into the Eventhub, generated from orders.csv. A sample message looks like this:

{"order_id": "1", "customer_name": "Kelly Olsen", "customer_email": "[email protected]", "product_id": "20", "price": "166.89", "quantity": "1", "total": "166.89"}
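
One way to produce such messages is sketched below in Python. It is an illustration only, not part of the blueprint: it assumes that orders.csv has a header row whose column names match the keys of the sample message, and that the `azure-eventhub` package is installed. The function names `rows_to_messages` and `send_messages` are hypothetical helpers introduced for this sketch.

```python
import csv
import json


def rows_to_messages(csv_path):
    """Read the CSV file and return one JSON string per row."""
    with open(csv_path, newline="", encoding="utf-8") as f:
        # DictReader keeps every value as a string, like the sample message
        return [json.dumps(row) for row in csv.DictReader(f)]


def send_messages(messages, connection_string, eventhub_name="orders"):
    """Send the JSON messages to the Eventhub in one or more batches."""
    # Imported here so the CSV helper above works without the SDK installed
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(
        connection_string, eventhub_name=eventhub_name
    )
    with producer:
        batch = producer.create_batch()
        for message in messages:
            try:
                batch.add(EventData(message))
            except ValueError:
                # The batch is full: send it and start a new one
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(EventData(message))
        if len(batch) > 0:
            producer.send_batch(batch)
```

Calling `send_messages(rows_to_messages("orders.csv"), connection_string)` with the Eventhub connection string would then publish one event per CSV row, and each event should start one execution of the flow.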

Create an orders table in the StorageTable.

The connection strings for Eventhubs, Blob and StorageTable are read from the secrets EVENTHUBS_CONNECTION, BLOB_CONNECTION and STORAGETABLE_CONNECTION respectively. The Blob connection is used by the trigger's checkpoint store.
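
How these secrets are supplied depends on the Kestra setup. The sketch below assumes the open-source convention of passing secrets as base64-encoded environment variables prefixed with `SECRET_`; other secret backends work differently. The helper name `kestra_secret_env` is hypothetical and exists only for this illustration.

```python
import base64


def kestra_secret_env(name, value):
    """Return an environment-variable line of the form SECRET_<NAME>=<base64 value>."""
    # Assumption: the Kestra instance reads secrets from SECRET_-prefixed,
    # base64-encoded environment variables
    encoded = base64.b64encode(value.encode("utf-8")).decode("ascii")
    return f"SECRET_{name}={encoded}"
```

For example, `kestra_secret_env("EVENTHUBS_CONNECTION", connection_string)` yields a line that could be placed in the environment of the Kestra server, so that `secret('EVENTHUBS_CONNECTION')` resolves inside the flow.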

When you produce a message onto the Eventhub, the flow is triggered, and a corresponding new record appears in the StorageTable table.

yaml
id: eventhubs_realtime_trigger
namespace: company.team

tasks:
  - id: insert_into_storageTable
    type: io.kestra.plugin.azure.storage.table.Bulk
    endpoint: "https://yourstorageaccount.table.core.windows.net"
    connectionString: "{{ secret('STORAGETABLE_CONNECTION') }}"
    table: "orders"
    from:
      - partitionKey: "order_id"
        rowKey: "{{ trigger.body | jq('.order_id') | first }}"
        properties:
          "customer_name": "{{ trigger.body | jq('.customer_name') | first }}"
          "customer_email": "{{ trigger.body | jq('.customer_email') | first }}"
          "product_id": "{{ trigger.body | jq('.product_id') | first }}"
          "price": "{{ trigger.body | jq('.price') | first }}"
          "quantity": "{{ trigger.body | jq('.quantity') | first }}"
          "total": "{{ trigger.body | jq('.total') | first }}"
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: orders
    namespace: kestra
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
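
To make the `from` mapping in the flow easier to follow, the Python sketch below mirrors what its jq expressions extract from one message. It only illustrates the mapping and is not how the Bulk task is implemented; `message_to_entity` is a hypothetical name.

```python
import json


def message_to_entity(body):
    """Build the entity described by the flow's `from` mapping for one message."""
    order = json.loads(body) if isinstance(body, str) else body
    property_names = [
        "customer_name", "customer_email", "product_id",
        "price", "quantity", "total",
    ]
    return {
        # The flow uses the literal string "order_id" here, so every row
        # lands in the same partition
        "partitionKey": "order_id",
        # The row key is the order's own id
        "rowKey": order["order_id"],
        "properties": {name: order[name] for name in property_names},
    }
```

Because the partition key is a constant, rows are told apart by `rowKey` alone, which is the `order_id` of each message.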
