Blueprints

Use Kafka Realtime Trigger to push events into MongoDB

About this blueprint

Queue Realtime

This flow will:

  1. Get triggered every time the event lands in Kafka
  2. The flow will push the data onto a collection in MongoDB using the InsertOne task

To setup Apache Kafka locally, follow the instructions mentioned in the official documentation. Once Apache Kafka is installed, you can create the products topic, and start producing data into the topic using the following commands:

# Create topic
$ bin/kafka-topics.sh --create --topic products --bootstrap-server localhost:9092

# Produce data into Kafka topic
$ bin/kafka-console-producer.sh --topic products --bootstrap-server localhost:9092
> {"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}

To setup MongoDB server locally, you can use the following docker command:

docker run -d --name my-mongo \                                          
-e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
-e MONGO_INITDB_ROOT_PASSWORD=secret \
-p 27017:27017 mongo

You can use MongoDB Compass as the UI client to work with MongoDB.

We are using the product JSON records generated from products.csv in this blueprint. A sample event that can be produced into Kafka topic products can be:

{"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}
yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: insert_into_mongodb
    type: io.kestra.plugin.mongodb.InsertOne
    connection:
      uri: "mongodb://mongoadmin:secret@localhost:27017/?authSource=admin"
    database: "kestra"
    collection: "products"
    document: |
      {
        "product_id": "{{ trigger.value | jq('.product_id') | first }}",
        "product_name": "{{ trigger.value | jq('.product_name') | first }}",
        "category": "{{ trigger.value | jq('.product_category') | first }}",
        "brand": "{{ trigger.value | jq('.brand') | first }}"
      }
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      valueDeserializer: JSON
    groupId: kestraConsumer

Insert One

Realtime Trigger

More Related Blueprints

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra