
Aggregation Pipeline Statements

February 26, 2025
柏拉文
The harder you work, the luckier you get

Preface


Aggregation pipeline command documentation

1. db.aggregate()


1.1 Overview

1.2 Syntax

1.3 Usage

2. db.collection.aggregate()


2.1 Overview

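db.collection.aggregate() gives access to the aggregation pipeline and computes aggregate values for the data in a collection or view. It returns a cursor over the documents produced by the last stage of the pipeline. If the explain option is specified, it instead returns a document that describes how the aggregation is processed. If the pipeline includes the $out or $merge stage, it returns an empty cursor.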

2.2 Syntax

db.collection.aggregate( <pipeline>, <options> )
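  • pipeline: a sequence of data aggregation operations, or stages. The method can still accept the stages as separate arguments instead of as elements of an array. However, if you do not pass pipeline as an array, you cannot specify the options parameter. Commonly used stages:

    • $project: reshapes each input document. Use it to control the format of the output documents, or to drop irrelevant fields so that later stages have less data to process.

    • $match: filters the input documents. Place $match as early in the pipeline as possible. Later stages then process fewer documents, which improves aggregation performance.

    • $limit: passes only the first N documents in the pipeline on to the next stage.

    • $skip: skips the first N documents in the pipeline and passes on the rest.

    • $unwind: deconstructs an array field in the input documents and outputs one document per array element.

    • $sort: sorts the input documents. The form is { $sort: { <field1>: <sort order>, <field2>: <sort order> ... } }, where 1 is ascending and -1 is descending. When sorting on multiple fields, the keys are applied from left to right: documents are first sorted by <field1>, and documents with the same <field1> value are then sorted by <field2>.

    • $lookup: performs a left outer join with another collection in the same database. The matching documents are added to each input document as an array field.

    • $group: groups the input documents by the specified _id expression. It can also compute accumulated values (sums, averages, and so on) for each group.

    • $out: writes the documents produced by the pipeline to a collection. It must be the last stage of the pipeline.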

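  • options: optional. Additional options that aggregate() passes to the aggregate command. They are available only when you specify pipeline as an array.

    • allowDiskUse: <boolean>. Each aggregation pipeline stage may use at most 100 MB of RAM. When the data volume is large, a stage can exceed this limit and throw an error. Enabling allowDiskUse prevents this, because stages can then write data to temporary files when memory runs short. The temporary files are written to the _tmp directory under dbPath, whose default is /data/db. Starting in MongoDB 6.0, the allowDiskUseByDefault server parameter defaults to true, so stages may spill to disk unless allowDiskUse: false is passed.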
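The pipeline parameter is an ordered list of stages, and each stage receives the documents produced by the previous one. The plain JavaScript sketch below models only that data flow. `runPipeline`, `match` and `project` are hypothetical helpers written for illustration. They are not part of mongosh or any MongoDB driver, and the real server may reorder or merge stages when it optimizes a pipeline.

```javascript
// Hypothetical model of an aggregation pipeline: each stage maps an array of documents to a new array.
function runPipeline(docs, stages) {
  // The output of each stage becomes the input of the next one.
  return stages.reduce((current, stage) => stage(current), docs);
}

// Hypothetical stage factories loosely mirroring $match and $project.
const match = (predicate) => (docs) => docs.filter(predicate);
const project = (shape) => (docs) => docs.map(shape);

const users = [
  { name: { firstName: "alice", lastName: "wong" }, balance: 50 },
  { name: { firstName: "bob", lastName: "yang" }, balance: 20 },
];

// Roughly analogous to:
// db.user1.aggregate([ { $match: { balance: { $gt: 30 } } },
//                      { $project: { _id: 0, clientName: "$name.firstName", balance: 1 } } ])
const result = runPipeline(users, [
  match((doc) => doc.balance > 30),
  project((doc) => ({ clientName: doc.name.firstName, balance: doc.balance })),
]);

console.log(result); // [ { clientName: 'alice', balance: 50 } ]
```

Swapping the two stages would fail here, because the projected documents no longer have a balance field to match on. The order of stages is part of the meaning of a pipeline.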

2.3 Usage

$project: reshape the input documents

// Insert a few documents
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])

// Reshape with $project: hide _id, keep balance, and add the computed fields clientName and nameArray
db.user1.aggregate([ { $project: { _id: 0, balance: 1, clientName: "$name.firstName", nameArray: ["$name.firstName", "$name.lastName"] } } ]);
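In plain JavaScript terms, the projection above behaves roughly like the `map` below. A string starting with `$` is read as a field path, and an array literal builds a new array from the resolved values. `resolvePath` is a hypothetical helper and the numeric `_id` values are placeholders for ObjectIds. The sketch ignores most $project rules, such as how missing fields are handled.

```javascript
// Hypothetical helper: resolve a "$a.b" field path against a document.
function resolvePath(doc, path) {
  return path
    .slice(1) // drop the leading "$"
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), doc);
}

// Placeholder _id values; in MongoDB these would be ObjectIds.
const users = [
  { _id: 1, name: { firstName: "alice", lastName: "wong" }, balance: 50 },
  { _id: 2, name: { firstName: "bob", lastName: "yang" }, balance: 20 },
];

// Plain-JS counterpart of
// { $project: { _id: 0, balance: 1, clientName: "$name.firstName",
//               nameArray: ["$name.firstName", "$name.lastName"] } }
const projected = users.map((doc) => ({
  balance: doc.balance, // balance: 1 keeps the field; _id: 0 drops _id
  clientName: resolvePath(doc, "$name.firstName"),
  nameArray: [resolvePath(doc, "$name.firstName"), resolvePath(doc, "$name.lastName")],
}));

console.log(projected);
```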

$match: filter the input documents

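// Insert a few documents (skip this if they were already inserted above; running it again creates duplicates)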
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])

// Filter documents with $match
db.user1.aggregate([ { $match: { "name.firstName": { $eq: "alice" }} } ]);
db.user1.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": { $regex: /^y/ }}] } }]);

// Pass the documents that survive $match on to a $project stage that removes _id
db.user1.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": { $regex: /^y/ }}] } }, { $project: { _id: 0}}]);
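Both sample documents pass the $or filter: alice through the balance range (50 lies between 40 and 80), and bob through the regular expression on lastName ("yang" starts with "y"). The plain JavaScript sketch below mirrors the two filters, plus the exclusion projection, on local arrays. It is an illustration of the filter logic only, not how MongoDB evaluates queries, and the numeric `_id` values are placeholders.

```javascript
const users = [
  { _id: 1, name: { firstName: "alice", lastName: "wong" }, balance: 50 },
  { _id: 2, name: { firstName: "bob", lastName: "yang" }, balance: 20 },
];

// { $match: { "name.firstName": { $eq: "alice" } } }
const onlyAlice = users.filter((doc) => doc.name.firstName === "alice");

// { $match: { $or: [ { balance: { $gt: 40, $lt: 80 } }, { "name.lastName": { $regex: /^y/ } } ] } }
// followed by { $project: { _id: 0 } }
const matched = users
  .filter((doc) => (doc.balance > 40 && doc.balance < 80) || /^y/.test(doc.name.lastName))
  .map(({ _id, ...rest }) => rest); // exclusion projection: drop _id, keep every other field

console.log(onlyAlice.length, matched.length); // 1 2
```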

$limit: pass on only the first N documents in the pipeline

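// Insert a few documents (skip this if they were already inserted above; running it again creates duplicates)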
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])

// $limit: keep only the first N documents (here N = 1)
db.user1.aggregate([ { $limit: 1} ]);

$skip: skip the first N documents in the pipeline

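// Insert a few documents (skip this if they were already inserted above; running it again creates duplicates)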
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])

// $skip: drop the first N documents (here N = 1) and pass on the rest
db.user1.aggregate([ { $skip: 1} ]);
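$skip and $limit are often combined for paging. Because stages run in order, [{ $skip: s }, { $limit: n }] and [{ $limit: n }, { $skip: s }] return different documents. The plain JavaScript sketch below models this with array slices. `paginate` is a hypothetical helper, and the `_id` strings are made-up placeholders. In a real pipeline you would normally put a $sort before paging, because without one the order of the documents is not guaranteed.

```javascript
// Hypothetical helper: the documents returned by [ { $skip: (page - 1) * size }, { $limit: size } ].
function paginate(docs, page, size) {
  const skipped = docs.slice((page - 1) * size); // $skip
  return skipped.slice(0, size); // $limit
}

const docs = ["a", "b", "c", "d", "e"].map((id) => ({ _id: id }));

const page2 = paginate(docs, 2, 2); // [ { _id: 'c' }, { _id: 'd' } ]

// Reversing the order: [ { $limit: 2 }, { $skip: 1 } ] keeps only the second document.
const limitThenSkip = docs.slice(0, 2).slice(1); // [ { _id: 'b' } ]

console.log(page2, limitThenSkip);
```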

$unwind: deconstruct an array field in the input documents

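// Add a currency array to alice and bob, then insert three documents whose currency is missing, an empty array, or null
// (updateOne replaces the legacy update(), which is deprecated in mongosh)
db.user1.updateOne({ "name.firstName": "alice"}, { $set: { currency: ["CNY", "USD"]}});
db.user1.updateOne({ "name.firstName": "bob"}, { $set: { currency: ["GBP"]}});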
db.user1.insertMany([ { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);

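// $unwind outputs one new document per element of the array field. Each new document is identical to the original except that the array field holds a single element. includeArrayIndex stores the element's position in ccyIndex.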
db.user1.aggregate([ { $unwind: { path: "$currency", includeArrayIndex: 'ccyIndex', preserveNullAndEmptyArrays: false } } ]);

// Result: charlie (currency missing), david (empty array) and eddie (null) are dropped because preserveNullAndEmptyArrays is false

[
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: 'CNY',
    ccyIndex: Long('0')
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: 'USD',
    ccyIndex: Long('1')
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
    name: { firstName: 'bob', lastName: 'yang' },
    balance: 20,
    currency: 'GBP',
    ccyIndex: Long('0')
  }
]
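The $unwind rules can be summarized as a small plain JavaScript model. `unwind` is a hypothetical helper that handles only a top-level field, uses plain numbers where mongosh shows Long values, and encodes my reading of the documented behaviour. A non-empty array yields one document per element. A non-array, non-null value is treated like a one-element array. With preserveNullAndEmptyArrays: true, missing, null or empty-array documents are kept, with the empty array field dropped and the index set to null. The names are simplified to strings for brevity.

```javascript
// Simplified model of $unwind on a top-level field (hypothetical helper, not MongoDB code).
function unwind(docs, field, { includeArrayIndex, preserveNullAndEmptyArrays = false } = {}) {
  const out = [];
  for (const doc of docs) {
    const value = doc[field];
    if (Array.isArray(value) && value.length > 0) {
      // One output document per element; only the unwound field (and the index) differ.
      value.forEach((element, index) => {
        const copy = { ...doc, [field]: element };
        if (includeArrayIndex) copy[includeArrayIndex] = index;
        out.push(copy);
      });
    } else if (value !== undefined && value !== null && !Array.isArray(value)) {
      // A non-array, non-null value is treated like a one-element array.
      const copy = { ...doc };
      if (includeArrayIndex) copy[includeArrayIndex] = null;
      out.push(copy);
    } else if (preserveNullAndEmptyArrays) {
      // Missing, null or empty array: kept only when preserveNullAndEmptyArrays is true.
      const copy = { ...doc };
      if (Array.isArray(value)) delete copy[field]; // an empty array field is dropped
      if (includeArrayIndex) copy[includeArrayIndex] = null;
      out.push(copy);
    }
  }
  return out;
}

const users = [
  { name: "alice", currency: ["CNY", "USD"] },
  { name: "bob", currency: ["GBP"] },
  { name: "charlie" },
  { name: "david", currency: [] },
  { name: "eddie", currency: null },
];

const strict = unwind(users, "currency", { includeArrayIndex: "ccyIndex" });
const preserved = unwind(users, "currency", {
  includeArrayIndex: "ccyIndex",
  preserveNullAndEmptyArrays: true,
});

console.log(strict.length, preserved.length); // 3 6
```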

$sort: sort the input documents

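// Sort by balance ascending; documents with the same balance are then sorted by name.lastName descending
db.user1.aggregate([ { $sort: { balance: 1, "name.lastName": -1 } } ]);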
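A multi-key sort applies its keys from left to right: the second key only breaks ties left by the first. The plain JavaScript comparator below sketches that rule. `sortDocs` and `getPath` are hypothetical helpers, and the model assumes each key holds values of one comparable type, whereas MongoDB also defines an ordering across different BSON types. For the five user1 documents, bob and eddie tie on balance 20, and "yang" sorts after "kim", so with -1 bob comes first.

```javascript
// Hypothetical helper: read a dotted field path such as "name.lastName".
const getPath = (doc, path) =>
  path.split(".").reduce((v, k) => (v == null ? undefined : v[k]), doc);

// Simplified model of $sort: keys are compared left to right; 1 = ascending, -1 = descending.
function sortDocs(docs, spec) {
  const keys = Object.entries(spec); // object key order gives the left-to-right priority
  return [...docs].sort((a, b) => {
    for (const [path, direction] of keys) {
      const x = getPath(a, path);
      const y = getPath(b, path);
      if (x < y) return -direction;
      if (x > y) return direction;
    }
    return 0; // equal on every key
  });
}

const users = [
  { name: { firstName: "alice", lastName: "wong" }, balance: 50 },
  { name: { firstName: "bob", lastName: "yang" }, balance: 20 },
  { name: { firstName: "charlie", lastName: "gordon" }, balance: 100 },
  { name: { firstName: "david", lastName: "wu" }, balance: 200 },
  { name: { firstName: "eddie", lastName: "kim" }, balance: 20 },
];

const sorted = sortDocs(users, { balance: 1, "name.lastName": -1 });
console.log(sorted.map((d) => d.name.firstName)); // [ 'bob', 'eddie', 'alice', 'charlie', 'david' ]
```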

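$lookup with a single equality condition: the localField value of each input document is compared for equality with the foreignField value of the documents in the from collection. If localField is an array, a document matches when any element equals foreignField.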

// Full user1 data set, equivalent to the state after the $unwind example; run it only against an empty collection, otherwise the documents are duplicated
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);

db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);

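// Join user1 with forex on currency = ccy. This is a left outer join, so documents with no match (charlie, david, eddie) still appear with an empty forexData array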
db.user1.aggregate([ { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } } ]);

// Output:
[
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: [ 'CNY', 'USD' ],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
    name: { firstName: 'bob', lastName: 'yang' },
    balance: 20,
    currency: [ 'GBP' ],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd3'),
        ccy: 'GBP',
        rate: 8.72,
        date: ISODate('2018-08-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
    name: { firstName: 'charlie', lastName: 'gordon' },
    balance: 100,
    forexData: []
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
    name: { firstName: 'david', lastName: 'wu' },
    balance: 200,
    currency: [],
    forexData: []
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
    name: { firstName: 'eddie', lastName: 'kim' },
    balance: 20,
    currency: null,
    forexData: []
  }
]
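The equality match can be modelled in plain JavaScript as a left outer join. `lookup` is a hypothetical helper, not MongoDB's implementation. It encodes the documented rules that an array localField matches on any element, and that a missing localField or foreignField is treated as null. The forex documents are trimmed to ccy and rate, and names are simplified to strings. Because every forex document has a ccy, charlie (missing), david (empty array) and eddie (null) all get an empty forexData, as in the output above.

```javascript
// Simplified model of $lookup with localField/foreignField (hypothetical helper).
function lookup(localDocs, foreignDocs, { localField, foreignField, as }) {
  return localDocs.map((doc) => {
    const localValue = doc[localField];
    // An array localField matches any of its elements; a missing value is treated as null.
    const keys = Array.isArray(localValue) ? localValue : [localValue ?? null];
    // A missing foreignField is treated as null as well.
    const matches = foreignDocs.filter((f) => keys.includes(f[foreignField] ?? null));
    return { ...doc, [as]: matches }; // every input document is kept (left outer join)
  });
}

const forex = [
  { ccy: "USD", rate: 6.91 },
  { ccy: "GBP", rate: 8.72 },
  { ccy: "CNY", rate: 1.0 },
];

const users = [
  { name: "alice", currency: ["CNY", "USD"] },
  { name: "bob", currency: ["GBP"] },
  { name: "charlie" },
  { name: "david", currency: [] },
  { name: "eddie", currency: null },
];

const joined = lookup(users, forex, { localField: "currency", foreignField: "ccy", as: "forexData" });
console.log(joined.map((d) => [d.name, d.forexData.map((f) => f.ccy)]));
```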

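// Unwind currency before the $lookup, so each output document carries a single currency and only its matching rate. Documents without any currency element are dropped by $unwind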
db.user1.aggregate([ { $unwind: { path: "$currency" }}, { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } }]);

// Output:
[
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: 'CNY',
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: 'USD',
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
    name: { firstName: 'bob', lastName: 'yang' },
    balance: 20,
    currency: 'GBP',
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd3'),
        ccy: 'GBP',
        rate: 8.72,
        date: ISODate('2018-08-21T00:00:00.000Z')
      }
    ]
  }
]

Uncorrelated sub-pipeline join ($lookup with pipeline)

// Same user1 and forex data as in the previous example; skip these inserts if the collections are already populated
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);

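// Uncorrelated $lookup: the sub-pipeline does not reference the input document, so every user1 document receives the same forex documents (those dated 2018-12-21)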
db.user1.aggregate([ { $lookup: { from: "forex", pipeline: [ { $match: { date: new Date("2018-12-21") } } ] , as: "forexData"} } ]);

// Result:

[
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: [ 'CNY', 'USD' ],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
    name: { firstName: 'bob', lastName: 'yang' },
    balance: 20,
    currency: [ 'GBP' ],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
    name: { firstName: 'charlie', lastName: 'gordon' },
    balance: 100,
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
    name: { firstName: 'david', lastName: 'wu' },
    balance: 200,
    currency: [],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
    name: { firstName: 'eddie', lastName: 'kim' },
    balance: 20,
    currency: null,
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  }
]
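An uncorrelated sub-pipeline behaves like a filter over the foreign collection that ignores the current input document. Conceptually it yields one result set, which is attached to every document. The plain JavaScript sketch below models that idea with local arrays. It makes no claim about how or whether the server caches the sub-pipeline result, and the user documents are reduced to their first names.

```javascript
const forex = [
  { ccy: "USD", rate: 6.91, date: new Date("2018-12-21") },
  { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21") },
  { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21") },
];
const users = ["alice", "bob", "charlie", "david", "eddie"].map((firstName) => ({ firstName }));

// pipeline: [ { $match: { date: new Date("2018-12-21") } } ] never looks at the user document,
// so the same filtered list is attached to every input document.
const target = new Date("2018-12-21").getTime();
const subResult = forex.filter((f) => f.date.getTime() === target);
const joined = users.map((u) => ({ ...u, forexData: subResult }));

console.log(joined.map((d) => [d.firstName, d.forexData.map((f) => f.ccy)]));
```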

Correlated sub-pipeline join ($lookup with let and pipeline)

// Same user1 and forex data as in the previous example; skip these inserts if the collections are already populated
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);

// Correlated $lookup: let binds the current document's balance to $$bal, which the sub-pipeline uses inside $expr.
// forex documents dated 2018-12-21 are returned only for users whose balance is greater than 100
db.user1.aggregate([ { $lookup: { from: "forex", let: { bal: "$balance" }, pipeline: [ { $match: { $expr: { $and: [ { $eq: [ "$date", new Date("2018-12-21" )] }, { $gt: [ "$$bal", 100 ]} ] } } } ], as: "forexData" } } ]);

// Output: only david (balance 200) gets matches; charlie's balance of 100 is not greater than 100

[
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbca'),
    name: { firstName: 'alice', lastName: 'wong' },
    balance: 50,
    currency: [ 'CNY', 'USD' ],
    forexData: []
  },
  {
    _id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
    name: { firstName: 'bob', lastName: 'yang' },
    balance: 20,
    currency: [ 'GBP' ],
    forexData: []
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
    name: { firstName: 'charlie', lastName: 'gordon' },
    balance: 100,
    forexData: []
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
    name: { firstName: 'david', lastName: 'wu' },
    balance: 200,
    currency: [],
    forexData: [
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
        ccy: 'USD',
        rate: 6.91,
        date: ISODate('2018-12-21T00:00:00.000Z')
      },
      {
        _id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
        ccy: 'CNY',
        rate: 1,
        date: ISODate('2018-12-21T00:00:00.000Z')
      }
    ]
  },
  {
    _id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
    name: { firstName: 'eddie', lastName: 'kim' },
    balance: 20,
    currency: null,
    forexData: []
  }
]
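The correlated version differs only in that the sub-pipeline reads a per-document variable bound by let. In the plain JavaScript sketch below, that variable is simply a local constant inside the `map` callback. This models the semantics only, with the user documents trimmed to firstName and balance, and is not how MongoDB executes the join.

```javascript
const forex = [
  { ccy: "USD", rate: 6.91, date: new Date("2018-12-21") },
  { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21") },
  { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21") },
];
const users = [
  { firstName: "alice", balance: 50 },
  { firstName: "bob", balance: 20 },
  { firstName: "charlie", balance: 100 },
  { firstName: "david", balance: 200 },
  { firstName: "eddie", balance: 20 },
];

const target = new Date("2018-12-21").getTime();
const joined = users.map((u) => {
  const bal = u.balance; // let: { bal: "$balance" } binds a variable for this document only
  // $expr: { $and: [ { $eq: ["$date", <2018-12-21>] }, { $gt: ["$$bal", 100] } ] }
  const forexData = forex.filter((f) => f.date.getTime() === target && bal > 100);
  return { ...u, forexData };
});

console.log(joined.map((d) => [d.firstName, d.forexData.length]));
```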

$group: group the input documents

db.transactions.insertMany([ { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" }, { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" }, { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD"} ]);

// Group by currency: documents with the same currency form one group. Without any accumulator fields, $group therefore returns the distinct values of the grouping field
db.transactions.aggregate([ { $group: { _id: "$currency" } } ]);

// Output (the order of the groups is not guaranteed):
[ { _id: 'USD' }, { _id: 'CNY' } ]

// Group by currency and compute for each group: totalQty, the sum of qty; totalNotional, the sum of price * qty; avgPrice, the average price; count, the number of documents in the group; maxNotional and minNotional, the largest and smallest price * qty in the group
db.transactions.aggregate([ { $group: { _id: "$currency", totalQty: { $sum: "$qty" }, totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } }, avgPrice: { $avg: "$price" }, count: { $sum: 1 }, maxNotional: { $max: { $multiply: [ "$price", "$qty" ] } }, minNotional: { $min: {$multiply: [ "$price", "$qty" ] } } } } ]);
// Output:
[
  {
    _id: 'USD',
    totalQty: 3,
    totalNotional: 1678.9,
    avgPrice: 764.1,
    count: 2,
    maxNotional: 1377.5,
    minNotional: 301.4
  },
  {
    _id: 'CNY',
    totalQty: 100,
    totalNotional: 56740,
    avgPrice: 567.4,
    count: 1,
    maxNotional: 56740,
    minNotional: 56740
  }
]
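The accumulators can be cross-checked with a plain JavaScript group-by. The sketch below collects documents into a `Map` keyed by currency and then reduces each group. It assumes every qty and price is numeric, whereas MongoDB's $sum and $avg skip non-numeric values. It also returns groups in insertion order, while the server does not guarantee any group order. The results should agree with the output above up to floating-point rounding.

```javascript
const transactions = [
  { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" },
  { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" },
  { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD" },
];

// Collect the documents of each group ( _id: "$currency" ).
const groups = new Map();
for (const t of transactions) {
  if (!groups.has(t.currency)) groups.set(t.currency, []);
  groups.get(t.currency).push(t);
}

// Apply the accumulators to each group.
const summary = [...groups].map(([currency, items]) => {
  const notionals = items.map((t) => t.price * t.qty); // { $multiply: ["$price", "$qty"] }
  return {
    _id: currency,
    totalQty: items.reduce((sum, t) => sum + t.qty, 0), // $sum: "$qty"
    totalNotional: notionals.reduce((sum, n) => sum + n, 0), // $sum of price * qty
    avgPrice: items.reduce((sum, t) => sum + t.price, 0) / items.length, // $avg: "$price"
    count: items.length, // $sum: 1
    maxNotional: Math.max(...notionals), // $max
    minNotional: Math.min(...notionals), // $min
  };
});

console.log(summary);
```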

// _id: null puts the whole collection into a single group, and the accumulators are computed over all documents
db.transactions.aggregate([ { $group: { _id: null, totalQty: { $sum: "$qty" }, totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } }, avgPrice: { $avg: "$price" }, count: { $sum: 1 }, maxNotional: { $max: { $multiply: [ "$price", "$qty" ] } }, minNotional: { $min: {$multiply: [ "$price", "$qty" ] } } } } ]);
// Output:

[
  {
    _id: null,
    totalQty: 103,
    totalNotional: 58418.9,
    avgPrice: 698.5333333333333,
    count: 3,
    maxNotional: 56740,
    minNotional: 301.4
  }
]

// Group by currency and collect every symbol value of the group into a symbols array with $push
db.transactions.aggregate([ { $group: { _id: "$currency", symbols: { $push: "$symbol" } } } ]);
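$push appends the value from every document in the group, duplicates included. If you only want distinct values, $addToSet is the accumulator for that. The plain JavaScript sketch below models the $push grouping on a local array. Group order follows `Map` insertion order here, whereas MongoDB does not guarantee the order of groups.

```javascript
const transactions = [
  { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" },
  { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" },
  { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD" },
];

// { $group: { _id: "$currency", symbols: { $push: "$symbol" } } }
const groups = new Map();
for (const t of transactions) {
  if (!groups.has(t.currency)) groups.set(t.currency, []);
  groups.get(t.currency).push(t.symbol); // $push appends every value, duplicates included
}
const result = [...groups].map(([currency, symbols]) => ({ _id: currency, symbols }));

console.log(result);
```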

$out: write the documents in the pipeline to a collection

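// Same transactions data as in the $group example; skip this insert if it was already run, otherwise the documents are duplicated
db.transactions.insertMany([ { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" }, { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" }, { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD"} ]);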

// First run: the output collection does not exist yet, so $out creates it
db.transactions.aggregate([ { $group: { _id: "$currency", symbols: { $push: "$symbol" } } }, { $out: "output" } ]);

// Second run: the output collection already exists, so $out replaces its contents with the new results
db.transactions.aggregate([ { $group:{ _id: "$symbol", totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } } } },{ $out: "output" } ]);

2.4 Cursors