Source
```yaml
id: kafka-realtime-trigger
namespace: company.team

tasks:
  - id: insert_into_mongodb
    type: io.kestra.plugin.mongodb.InsertOne
    connection:
      uri: mongodb://mongoadmin:secret@localhost:27017/?authSource=admin
    database: kestra
    collection: products
    document: |
      {
        "product_id": "{{ trigger.value | jq('.product_id') | first }}",
        "product_name": "{{ trigger.value | jq('.product_name') | first }}",
        "category": "{{ trigger.value | jq('.product_category') | first }}",
        "brand": "{{ trigger.value | jq('.brand') | first }}"
      }

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      valueDeserializer: JSON
    groupId: kestraConsumer
```
About this blueprint
This flow will:
- Get triggered every time an event lands in the Kafka topic
- Push the event data into a MongoDB collection using the InsertOne task
To set up Apache Kafka locally, follow the instructions in the official documentation. Once Apache Kafka is installed, you can create the `products` topic and start producing data into it using the following commands:
```
# Create the Kafka topic
$ bin/kafka-topics.sh --create --topic products --bootstrap-server localhost:9092

# Produce data into the Kafka topic
$ bin/kafka-console-producer.sh --topic products --bootstrap-server localhost:9092
> {"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}
```
To set up a MongoDB server locally, you can use the following Docker command:

```
docker run -d --name my-mongo \
  -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
  -e MONGO_INITDB_ROOT_PASSWORD=secret \
  -p 27017:27017 mongo
```
You can use MongoDB Compass as a UI client to work with MongoDB.
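To check the inserted documents from code instead, a short sketch with the pymongo driver (an assumption; mongosh or Compass work equally well) might look like this:

```python
from pymongo import MongoClient  # assumes `pip install pymongo`

# Connection string matches the InsertOne task's connection.uri above
client = MongoClient("mongodb://mongoadmin:secret@localhost:27017/?authSource=admin")

# The flow writes into the `kestra` database, `products` collection
for doc in client["kestra"]["products"].find():
    print(doc)
```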
This blueprint uses the product JSON records generated from products.csv. A sample event that can be produced into the Kafka topic `products` could be:

```
{"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}
```