Use Kafka Realtime Trigger to push events into MongoDB

Source

```yaml
id: kafka-realtime-trigger
namespace: company.team

tasks:
  - id: insert_into_mongodb
    type: io.kestra.plugin.mongodb.InsertOne
    connection:
      uri: mongodb://mongoadmin:secret@localhost:27017/?authSource=admin
    database: kestra
    collection: products
    document: |
      {
        "product_id": "{{ trigger.value | jq('.product_id') | first }}",
        "product_name": "{{ trigger.value | jq('.product_name') | first }}",
        "category": "{{ trigger.value | jq('.product_category') | first }}",
        "brand": "{{ trigger.value | jq('.brand') | first }}"
      }

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      valueDeserializer: JSON
    groupId: kestraConsumer
```

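To make the field mapping in the document template easier to follow, here is a minimal Python sketch of what the insert_into_mongodb task appears to build from one event. The helper name build_document is hypothetical and not part of Kestra. The sketch assumes that, because each template expression sits inside double quotes, every value (including the numeric product_id) is rendered as a string.

```python
import json

# Sample event from this blueprint, as it would arrive in trigger.value.
event = json.loads(
    '{"product_id": 1, "product_name": "streamline turn-key systems", '
    '"product_category": "Electronics", "brand": "gomez"}'
)


def build_document(value: dict) -> dict:
    """Hypothetical helper mirroring the flow's document template."""
    # Assumption: each expression is wrapped in double quotes in the
    # template, so every field is modelled here as text via str().
    return {
        "product_id": str(value["product_id"]),
        "product_name": str(value["product_name"]),
        # The event field product_category is stored under the key category.
        "category": str(value["product_category"]),
        "brand": str(value["brand"]),
    }


document = build_document(event)
print(json.dumps(document, indent=2))
```

Note the one renamed field: product_category in the Kafka event becomes category in the MongoDB document.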
About this blueprint

This flow will:

  1. Get triggered every time an event lands in the Kafka topic.
  2. Push the event data into a MongoDB collection using the InsertOne task.

To set up Apache Kafka locally, follow the instructions in the official documentation. Once Apache Kafka is installed, you can create the products topic and start producing data into it using commands such as the following:

```
# Produce data into Kafka topic
$ bin/kafka-console-producer.sh --topic products --bootstrap-server localhost:9092
> {"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}
```

To set up a MongoDB server locally, you can use the following docker command:

```
docker run -d --name my-mongo \
    -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
    -e MONGO_INITDB_ROOT_PASSWORD=secret \
    -p 27017:27017 mongo
```

You can use MongoDB Compass as a UI client to work with MongoDB. This blueprint uses product JSON records generated from products.csv. A sample event that can be produced into the products Kafka topic is: {"product_id": 1, "product_name": "streamline turn-key systems", "product_category": "Electronics", "brand": "gomez"}
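The text mentions that the JSON records are generated from products.csv but does not show how. The following is a rough sketch of one way to do it with the Python standard library. The inline CSV is a hypothetical stand-in for products.csv, its column names are assumed from the sample event, and csv_to_events is an illustrative helper rather than anything shipped with Kestra or Kafka.

```python
import csv
import io
import json

# Hypothetical stand-in for products.csv; the column names are assumed
# from the fields of the sample event.
SAMPLE_CSV = """product_id,product_name,product_category,brand
1,streamline turn-key systems,Electronics,gomez
"""


def csv_to_events(csv_text: str) -> list[str]:
    """Return one JSON string per CSV row."""
    events = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # The sample event carries a numeric product_id, so convert it.
        row["product_id"] = int(row["product_id"])
        events.append(json.dumps(row))
    return events


for line in csv_to_events(SAMPLE_CSV):
    print(line)
```

Each printed line is a single JSON record that can be pasted at the console producer's prompt.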
