Source
yaml
id: sqs-realtime-trigger
namespace: company.team

tasks:
  - id: insert_into_dynamoDB
    type: io.kestra.plugin.aws.dynamodb.PutItem
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: eu-central-1
    tableName: orders
    item:
      order_id: "{{ trigger.data | jq('.order_id') | first }}"
      customer_name: "{{ trigger.data | jq('.customer_name') | first }}"
      customer_email: "{{ trigger.data | jq('.customer_email') | first }}"
      product_id: "{{ trigger.data | jq('.product_id') | first }}"
      price: "{{ trigger.data | jq('.price') | first }}"
      quantity: "{{ trigger.data | jq('.quantity') | first }}"
      total: "{{ trigger.data | jq('.total') | first }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: eu-central-1
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/orders
    serdeType: JSON
About this blueprint
AWS Queue Realtime Trigger
This flow will:
- Get triggered every time a message lands in the AWS SQS queue
- Push the message data into a DynamoDB table
For this, create an SQS queue named orders. We will produce JSON messages, generated from orders.csv, into the queue. A sample message looks like this:
{"order_id": "1", "customer_name": "Kelly Olsen", "customer_email": "[email protected]", "product_id": "20", "price": "166.89", "quantity": "1", "total": "166.89"}
Create an orders table in DynamoDB. The AWS access key and secret key are read from the secrets AWS_ACCESS_KEY_ID and AWS_SECRET_KEY_ID. When you publish a message to the SQS queue, the flow is triggered, and a corresponding new record appears in the DynamoDB table.
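To try the trigger end to end, a test message can be published from a second flow. The sketch below is not part of the blueprint: it assumes the Kestra AWS plugin's io.kestra.plugin.aws.sqs.Publish task with a from.data property, reuses the same secrets and queue URL as the flow above, and uses a hypothetical flow id and a placeholder email address. Check the property names against the plugin documentation for your Kestra version before relying on it.

```yaml
# Hypothetical helper flow (not part of the blueprint): publishes one sample
# order to the same SQS queue so the realtime trigger above fires.
id: sqs-publish-sample-order   # assumed name, chosen for illustration
namespace: company.team

tasks:
  - id: publish_order
    # Assumes the Publish task from the Kestra AWS plugin is installed.
    type: io.kestra.plugin.aws.sqs.Publish
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: eu-central-1
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/orders
    from:
      # JSON message body as a single-quoted YAML string; the email is a placeholder.
      data: '{"order_id": "1", "customer_name": "Kelly Olsen", "customer_email": "kelly.olsen@example.com", "product_id": "20", "price": "166.89", "quantity": "1", "total": "166.89"}'
```

After running this flow once, the realtime trigger should start one execution of sqs-realtime-trigger, and the orders table should contain an item with order_id 1.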