Source
yaml
id: eventhubs-realtime-trigger
namespace: company.team
tasks:
- id: insert_into_storagetable
type: io.kestra.plugin.azure.storage.table.Bulk
endpoint: https://yourstorageaccount.blob.core.windows.net
connectionString: "{{ secret('STORAGETABLE_CONNECTION') }}"
table: orders
from:
- partitionKey: order_id
rowKey: "{{ trigger.body | jq('.order_id') | first }}"
properties:
customer_name: "{{ trigger.body | jq('.customer_name') | first }}"
customer_email: "{{ trigger.body | jq('.customer_email') | first }}"
product_id: "{{ trigger.body | jq('.product_id') | first }}"
price: "{{ trigger.body | jq('.price') | first }}"
quantity: "{{ trigger.body | jq('.quantity') | first }}"
total: "{{ trigger.body | jq('.total') | first }}"
triggers:
- id: realtime_trigger
type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
eventHubName: orders
namespace: kestra
connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
bodyDeserializer: JSON
consumerGroup: $Default
checkpointStoreProperties:
containerName: kestra
connectionString: "{{ secret('BLOB_CONNECTION') }}"
About this blueprint
Trigger Queue Realtime Trigger Azure
This flow will:
- Get triggered every time the event lands in the Azure Eventhub
- The flow will push the data into StorageTable table
For this, create an Azure Eventhub named
orders
underkestra
namespace. We will be producing JSON messages into the Eventhub generated from the orders.csv. One sample produced message can be:{"order_id": "1", "customer_name": "Kelly Olsen", "customer_email": "[email protected]", "product_id": "20", "price": "166.89", "quantity": "1", "total": "166.89"}
Createorders
table in the StorageTable. We get the connectionString for Eventhubs, Blob and StorageTable from the secretsEVENTHUBS_CONNECTION
,BLOB_CONNECTION
andSTORAGETABLE_CONNECTION
respectively. When you produce the message onto Eventhub, the flow will get triggered, and you can see that a corresponding new record gets into the StorageTable table.