Aggregate (Certified)

Run an aggregation pipeline on a MongoDB collection.

```yaml
type: "io.kestra.plugin.mongodb.Aggregate"
```

Examples

Simple aggregation pipeline to group and sum data

```yaml
id: mongodb_aggregate
namespace: company.team

tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
      - $group:
          _id: "$category"
          total:
            $sum: "$amount"
          count:
            $sum: 1
      - $sort:
          total: -1
```

Complex aggregation with lookup and data transformation

```yaml
id: mongodb_complex_aggregate
namespace: company.team

tasks:
  - id: aggregate_with_lookup
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      uri: "mongodb://root:example@localhost:27017/?authSource=admin"
    database: "my_database"
    collection: "users"
    pipeline:
      - $lookup:
          from: "orders"
          localField: "_id"
          foreignField: "userId"
          as: "userOrders"
      - $addFields:
          totalOrders:
            $size: "$userOrders"
          totalSpent:
            $sum: "$userOrders.amount"
      - $project:
          name: 1
          email: 1
          totalOrders: 1
          totalSpent: 1
      - $match:
          totalOrders:
            $gt: 0
    allowDiskUse: true
    maxTimeMs: 30000
```

Properties

collection

MongoDB collection.

connection

MongoDB connection properties.

Definitions
uri (required, string)

Connection string to the MongoDB server.

URL format, for example: mongodb://mongodb0.example.com:27017
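To keep credentials out of the flow definition, the connection string can be read from a Kestra secret with the secret() function. A minimal sketch, assuming a secret named MONGODB_URI (a hypothetical name) has been configured in your Kestra instance:

```yaml
tasks:
  - id: aggregate
    type: io.kestra.plugin.mongodb.Aggregate
    connection:
      # MONGODB_URI is a hypothetical secret name; it should hold a full
      # connection string such as mongodb://user:pass@host:27017/?authSource=admin
      uri: "{{ secret('MONGODB_URI') }}"
    database: "my_database"
    collection: "sales"
    pipeline:
      - $match:
          status: "active"
```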

database

MongoDB database.

pipeline (sub-type: object)

MongoDB aggregation pipeline.

A list of pipeline stages, given as a BSON array or as a list of maps.
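As a list of maps, each item is one stage and the stages run in order. For example, the fragment below unwinds an array field, counts documents per value, and keeps the five most frequent values; the tags field is an illustrative assumption:

```yaml
pipeline:
  # Emit one document per element of the (assumed) "tags" array
  - $unwind: "$tags"
  # Count documents per tag value
  - $group:
      _id: "$tags"
      count:
        $sum: 1
  # Most frequent tags first, keep the top five
  - $sort:
      count: -1
  - $limit: 5
```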

allowDiskUse (default: true)

Whether to allow disk usage for stages.

Enables writing to temporary files when a pipeline stage exceeds the 100-megabyte memory limit.
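Because the default is true, the property mostly matters when you want the opposite behaviour. A minimal sketch with an illustrative field name (createdAt): with allowDiskUse set to false, a stage that exceeds the memory limit makes the server return an error instead of spilling to disk.

```yaml
pipeline:
  - $sort:
      createdAt: -1
# Fail rather than write temporary files if a stage
# exceeds the 100 MB per-stage memory limit
allowDiskUse: false
```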

Cursor batch size (default: 1000)

Batch size for the cursor.

Sets the number of documents to return per batch.
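A smaller batch size trades more network round trips for less memory held per batch. A sketch, assuming the property is named batchSize (the property name is not shown in this reference):

```yaml
# batchSize is an assumed property name; check the plugin schema
# Fetch 200 documents per cursor round trip instead of the default 1000
batchSize: 200
```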

maxTimeMs (default: 60000)

Maximum execution time in milliseconds.

Sets the maximum time the server may spend executing this operation (the default of 60000 ms is 60 seconds).
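If the server-side execution exceeds this limit, MongoDB aborts the operation, so the task would be expected to fail rather than wait. For instance, to cap the aggregation at 5 seconds instead of the default 60:

```yaml
# Abort the aggregation on the server after 5 seconds (5000 ms)
maxTimeMs: 5000
```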

Fetch type (default: FETCH)

Possible values: STORE, FETCH, FETCH_ONE, NONE

How to handle the aggregation results; STORE writes them into an Ion-serialized data file.
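For large result sets, storing the results in a file avoids placing every document in the task output. A sketch, assuming the property is named fetchType (the property name is not shown in this reference):

```yaml
# fetchType is an assumed property name; STORE writes the results
# to an Ion-serialized file instead of returning them as a list
fetchType: STORE
```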

Outputs

List containing the aggregation results. Only populated when the results are not stored in a file.

The number of documents returned by the aggregation.

URI of the file containing the aggregation results (format: uri). Only populated when the results are stored in a file (STORE).
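Downstream tasks can reference these outputs through Kestra's templating. A sketch of a follow-up task, assuming the aggregation task has the id aggregate and that the outputs are named size and uri (both output names are assumptions, not shown in this reference):

```yaml
- id: log_result
  type: io.kestra.plugin.core.log.Log
  # "size" and "uri" are assumed output names of the aggregate task
  message: "{{ outputs.aggregate.size }} documents stored at {{ outputs.aggregate.uri }}"
```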

Metrics

Number of documents returned by the aggregation pipeline (unit: count).