聚合管道语句
前言
一、db.aggregate()
1.1 认识
1.2 语法
1.3 用法
二、db.collection.aggregate()
2.1 认识
db.collection.aggregate()
提供对聚合管道的访问权限, 并计算集合或视图中数据的聚合值。返回在聚合分析管道的最后阶段生成的文档游标。如果管道包含 explain
选项,查询将返回一个详细说明聚合操作处理的文档。如果管道包含 $out
或 $merge
操作符,查询将返回一个空游标。
2.2 语法
db.collection.aggregate( <pipeline>, <options> )
-
pipeline
: 一系列数据聚合操作或阶段。该方法仍然可以接受管道阶段作为单独的参数,而不是作为数组中的元素; 但是,如果您未将pipeline
指定为数组,则无法指定options
参数。-
$project
: 对输入文档再次投影。可以用来灵活的控制输出文档的格式, 也可以用来剔除不相关的字段,以优化聚合管道操作的性能。 -
$match
: 对输入文档进行筛选。应该尽量在聚合管道的开始阶段应用$match
,这样可以减少后续阶段中需要处理的文档数量, 以优化聚合操作的性能。 -
$limit
: 筛选出管道内前N
篇文档。 -
$skip
: 跳过管道内前N
篇文档 -
$unwind
: 展开输入文档中的数组字段。 -
$sort
: 对输入文档进行排序, 如果对多个字段进行排序,则按从左到右的顺序进行排序。例如,在上面的表单中,文档首先按<field1>
排序。然后,具有相同<field1>
值的文档将按<field2>
进一步排序。 -
$lookup
: 对输入文档进行查询操作 -
$group
: 对输入文档进行分组 -
$out
: 对管道中的文档输出
-
-
options
: 可选。aggregate()
aggregate
传递给 命令的其他选项。仅当您将pipeline
指定为大量时才可用。allowDiskUse
:<boolean>
, 每个聚合管道阶段使用的内存不能超过100MB
, 如何数据量较大, 为了防止聚合管道阶段超出内存上限并且抛出错误, 可以启用allowDiskUse
选项。allowDiskUse
启用之后, 聚合阶段可以在内存容量不足时, 将操作数据写入临时文件中。临时文件会被写入dbPath
下的_tmp
文件夹,dbPath
的默认值为/data/db
2.3 用法
$project
对输入文档再次投影
// 新增几个文档
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])
// $project 重新投影
db.user1.aggregate([ { $project: { _id: 0, balance: 1, clientName: "$name.firstName", nameArray: ["$name.firstName", "$name.lastName"] } } ]);
$match
对输入文档进行筛选
// 新增几个文档
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])
// $match 对文档进行筛选
db.user1.aggregate([ { $match: { "name.firstName": { $eq: "alice" }} } ]);
db.user1.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": { $regex: /^y/ }}] } }]);
// 将 $match 管道的筛选结果传递给 $project 管道
db.user1.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": { $regex: /^y/ }}] } }, { $project: { _id: 0}}]);
$limit
筛选出管道内前 N
篇文档
// 新增几个文档
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])
// $limit 筛选出管道内前 N 篇文档
db.user1.aggregate([ { $limit: 1} ]);
$skip
跳过管道内前 N
篇文档:
// 新增几个文档
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50}, { name: { firstName: "bob", lastName: "yang" }, balance: 20}])
// $skip 跳过管道内前 N 篇文档
db.user1.aggregate([ { $skip: 1} ]);
$unwind
展开输入文档中的数组字段
db.user1.update({ "name.firstName": "alice"}, { $set: { currency: ["CNY", "USD"]}});
db.user1.update({ "name.firstName": "bob"}, { $set: { currency: ["GBP"]}});
db.user1.insertMany([ { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
// $unwind 展开输入文档中的数组字段, 将数组字段中的每个元素新建文档,新文档跟原文档一模一样, 唯一的区别是数组字段值变为每一个元素。
db.user1.aggregate([ { $unwind: { path: "$currency", includeArrayIndex: 'ccyIndex', preserveNullAndEmptyArrays: false } } ]);
// 结果如下:
[
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: 'CNY',
ccyIndex: Long('0')
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: 'USD',
ccyIndex: Long('1')
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
name: { firstName: 'bob', lastName: 'yang' },
balance: 20,
currency: 'GBP',
ccyIndex: Long('0')
}
]
$sort
对输入文档进行排序
db.user1.aggregate([ { $sort: { balance: 1, "name.lastName": -1 } } ]);
单个条件联接的等值匹配: localField
字段值会与 foreignField
字段值进行等值匹配
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);
//
db.user1.aggregate([ { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } } ]);
// 输出结果为:
[
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: [ 'CNY', 'USD' ],
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
name: { firstName: 'bob', lastName: 'yang' },
balance: 20,
currency: [ 'GBP' ],
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd3'),
ccy: 'GBP',
rate: 8.72,
date: ISODate('2018-08-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
name: { firstName: 'charlie', lastName: 'gordon' },
balance: 100,
forexData: []
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
name: { firstName: 'david', lastName: 'wu' },
balance: 200,
currency: [],
forexData: []
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
name: { firstName: 'eddie', lastName: 'kim' },
balance: 20,
currency: null,
forexData: []
}
]
//
db.user1.aggregate([ { $unwind: { path: "$currency" }}, { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } }]);
[
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: 'CNY',
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: 'USD',
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
name: { firstName: 'bob', lastName: 'yang' },
balance: 20,
currency: 'GBP',
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd3'),
ccy: 'GBP',
rate: 8.72,
date: ISODate('2018-08-21T00:00:00.000Z')
}
]
}
]
非关联管道子查询联接
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);
// $lookup 子查询不相关联接
db.user1.aggregate([ { $lookup: { from: "forex", pipeline: [ { $match: { date: new Date("2018-12-21") } } ] , as: "forexData"} } ]);
// 结果如下:
[
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: [ 'CNY', 'USD' ],
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
name: { firstName: 'bob', lastName: 'yang' },
balance: 20,
currency: [ 'GBP' ],
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
name: { firstName: 'charlie', lastName: 'gordon' },
balance: 100,
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
name: { firstName: 'david', lastName: 'wu' },
balance: 200,
currency: [],
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
name: { firstName: 'eddie', lastName: 'kim' },
balance: 20,
currency: null,
forexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
}
]
关联管道子查询联接
db.user1.insertMany([ { name: { firstName: "alice", lastName: "wong" }, balance: 50, currency: ["CNY", "USD"]}, { name: { firstName: "bob", lastName: "yang" }, balance: 20, currency: ["GBP"]}, { name: { firstName: "charlie", lastName: "gordon" }, balance: 100},{ name: { firstName: "david", lastName: "wu" }, balance: 200, currency: []}, { name: { firstName: "eddie", lastName: "kim" }, balance: 20, currency: null} ]);
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2018-12-21" ) }, { ccy: "GBP", rate: 8.72, date: new Date("2018-08-21" ) }, { ccy: "CNY", rate: 1.0, date: new Date("2018-12-21" ) } ]);
// $lookup 子查询相关联接
db.user1.aggregate([ { $lookup: { from: "forex", let: { bal: "$balance" }, pipeline: [ { $match: { $expr: { $and: [ { $eq: [ "$date", new Date("2018-12-21" )] }, { $gt: [ "$$bal", 100 ]} ] } } } ], as: "froexData" } } ]);
// 输出结果
[
{
_id: ObjectId('67bf93c73d5b2b20ce11cbca'),
name: { firstName: 'alice', lastName: 'wong' },
balance: 50,
currency: [ 'CNY', 'USD' ],
froexData: []
},
{
_id: ObjectId('67bf93c73d5b2b20ce11cbcb'),
name: { firstName: 'bob', lastName: 'yang' },
balance: 20,
currency: [ 'GBP' ],
froexData: []
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbcf'),
name: { firstName: 'charlie', lastName: 'gordon' },
balance: 100,
froexData: []
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd0'),
name: { firstName: 'david', lastName: 'wu' },
balance: 200,
currency: [],
froexData: [
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd2'),
ccy: 'USD',
rate: 6.91,
date: ISODate('2018-12-21T00:00:00.000Z')
},
{
_id: ObjectId('67bfa3853d5b2b20ce11cbd4'),
ccy: 'CNY',
rate: 1,
date: ISODate('2018-12-21T00:00:00.000Z')
}
]
},
{
_id: ObjectId('67bf9fdd3d5b2b20ce11cbd1'),
name: { firstName: 'eddie', lastName: 'kim' },
balance: 20,
currency: null,
froexData: []
}
]
$group
对输入文档进行分组
db.transactions.insertMany([ { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" }, { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" }, { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD"} ]);
// 根据 currency 进行分组, currency 相同的为一组。因此, 不使用聚合操作符的情况下, $group 可以返回管道文档中某一字段的所有值(不重复)
db.transactions.aggregate([ { $group: { _id: "$currency" } } ]);
// 输出结果:
[ { _id: 'USD' }, { _id: 'CNY' } ]
// 根据 currency 进行分组, currency 相同的为一组。增加 totalQty 字段, 表示 qty 的总和; 增加 totalNotional 字段, 表示 价格 * 数量 的总和; 增加 avgPrice 字段, 表示 price 的平均值; 增加 count 字段, 表示这个分组有多少篇文档; 增加 maxNotional 字段, 求出该分组中 价格 * 数量 中的最大值; minNotional 同理。
db.transactions.aggregate([ { $group: { _id: "$currency", totalQty: { $sum: "$qty" }, totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } }, avgPrice: { $avg: "$price" }, count: { $sum: 1 }, maxNotional: { $max: { $multiply: [ "$price", "$qty" ] } }, minNotional: { $min: {$multiply: [ "$price", "$qty" ] } } } } ]);
// 输出结果:
[
{
_id: 'USD',
totalQty: 3,
totalNotional: 1678.9,
avgPrice: 764.1,
count: 2,
maxNotional: 1377.5,
minNotional: 301.4
},
{
_id: 'CNY',
totalQty: 100,
totalNotional: 56740,
avgPrice: 567.4,
count: 1,
maxNotional: 56740,
minNotional: 56740
}
]
// 将整个集合分为一组, 再此基础上进行聚合操作
db.transactions.aggregate([ { $group: { _id: null, totalQty: { $sum: "$qty" }, totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } }, avgPrice: { $avg: "$price" }, count: { $sum: 1 }, maxNotional: { $max: { $multiply: [ "$price", "$qty" ] } }, minNotional: { $min: {$multiply: [ "$price", "$qty" ] } } } } ]);
// 输出结果为:
[
{
_id: null,
totalQty: 103,
totalNotional: 58418.9,
avgPrice: 698.5333333333333,
count: 3,
maxNotional: 56740,
minNotional: 301.4
}
]
// 基于 currency 分组, 并且将该组中的 $symbol 字段全部放到 symbols 数组中
db.transactions.aggregate([ { $group: { _id: "$currency", symbols: { $push: "$symbol" } } } ]);
$out
将聚合管道中的文档写入一个新的集合
db.transactions.insertMany([ { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" }, { symbol: "AMZN", qty: 1, price: 1377.5, currency: "USD" }, { symbol: "AAPL", qty: 2, price: 150.7, currency: "USD"} ]);
// 第一次, output 集合不存在
db.transactions.aggregate([ { $group: { _id: "$currency", symbols: { $push: "$symbol" } } }, { $out: "output" } ]);
// 第二次, output 集合存在, 直接覆盖之前的集合
db.transactions.aggregate([ { $group:{ _id: "$symbol", totalNotional: { $sum: { $multiply: [ "$price", "$qty" ] } } } },{ $out: "output" } ]);