You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3144 lines
126 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

'use strict';
var checkCollectionName = require('./utils').checkCollectionName,
ObjectID = require('mongodb-core').BSON.ObjectID,
Long = require('mongodb-core').BSON.Long,
Code = require('mongodb-core').BSON.Code,
f = require('util').format,
AggregationCursor = require('./aggregation_cursor'),
MongoError = require('mongodb-core').MongoError,
shallowClone = require('./utils').shallowClone,
isObject = require('./utils').isObject,
toError = require('./utils').toError,
normalizeHintField = require('./utils').normalizeHintField,
handleCallback = require('./utils').handleCallback,
decorateCommand = require('./utils').decorateCommand,
formattedOrderClause = require('./utils').formattedOrderClause,
ReadPreference = require('mongodb-core').ReadPreference,
CommandCursor = require('./command_cursor'),
Define = require('./metadata'),
Cursor = require('./cursor'),
unordered = require('./bulk/unordered'),
ordered = require('./bulk/ordered'),
ChangeStream = require('./change_stream'),
executeOperation = require('./utils').executeOperation;
/**
* @fileOverview The **Collection** class is an internal class that embodies a MongoDB collection
* allowing for insert/update/remove/find and other command operations on that MongoDB collection.
*
* **Collection cannot be instantiated directly**
* @example
* const MongoClient = require('mongodb').MongoClient;
* const test = require('assert');
* // Connection url
* const url = 'mongodb://localhost:27017';
* // Database Name
* const dbName = 'test';
* // Connect using MongoClient
* MongoClient.connect(url, function(err, client) {
* // Create a collection we want to drop later
* const col = client.db(dbName).collection('createIndexExample1');
* // Show that duplicate records got dropped
* col.find({}).toArray(function(err, items) {
* test.equal(null, err);
* test.equal(4, items.length);
* client.close();
* });
* });
*/
// Names of the collection level options that `find` copies into the options of every cursor it creates
var mergeKeys = ['readPreference', 'ignoreUndefined'];
/**
 * Create a new Collection instance (INTERNAL TYPE, do not instantiate directly)
 * @class
 * @param {object} db The parent database; its settings are the fallback for options not given in `options`.
 * @param {object} topology The topology object, stored on the instance and used to run operations.
 * @param {string} dbName The database name, used as the prefix of the namespace.
 * @param {string} name The collection name (validated with `checkCollectionName`).
 * @param {object} [pkFactory=ObjectID] Primary key factory; defaults to `ObjectID` when null/undefined.
 * @param {object} [options] Collection level settings. May be null/undefined, in which case db level settings apply.
 * @property {string} collectionName Get the collection name.
 * @property {string} namespace Get the full collection namespace.
 * @property {object} writeConcern The current write concern values.
 * @property {object} readConcern The current read concern values.
 * @property {object} hint Get current index hint for collection.
 * @return {Collection} a Collection instance.
 */
var Collection = function(db, topology, dbName, name, pkFactory, options) {
  checkCollectionName(name);

  // Treat a missing options object as "no collection level overrides". The serialization
  // settings already tolerated null options, but promiseLibrary and readConcern dereferenced
  // `options` directly and threw a TypeError in that case.
  var settings = options == null ? {} : options;

  // Collection level value when one was supplied, otherwise the db level default.
  // The db level value is only looked up when it is actually needed.
  var inherit = function(key) {
    return settings[key] == null ? db.s.options[key] : settings[key];
  };

  // slaveOk falls back to the property on the db itself rather than to db.s.options
  var slaveOk = settings.slaveOk == null ? db.slaveOk : settings.slaveOk;

  // Collection level read preference wins over the db level one; null when neither is set
  var readPreference = settings.readPreference || db.options.readPreference || null;

  // Internal state
  this.s = {
    // Primary key factory (custom one if provided)
    pkFactory: pkFactory == null ? ObjectID : pkFactory,
    // Db
    db: db,
    // Topology
    topology: topology,
    // dbName
    dbName: dbName,
    // Options (never null, so later `this.s.options.x` lookups are safe)
    options: settings,
    // Namespace, "<dbName>.<name>"
    namespace: f('%s.%s', dbName, name),
    // Read preference
    readPreference: readPreference,
    // SlaveOK
    slaveOk: slaveOk,
    // Serialize functions
    serializeFunctions: inherit('serializeFunctions'),
    // Raw
    raw: inherit('raw'),
    // promoteLongs
    promoteLongs: inherit('promoteLongs'),
    // promoteValues
    promoteValues: inherit('promoteValues'),
    // promoteBuffers
    promoteBuffers: inherit('promoteBuffers'),
    // internalHint
    internalHint: null,
    // collectionHint (assigned later through the `hint` setter)
    collectionHint: null,
    // Name
    name: name,
    // Promise library, defaulting to the global Promise
    promiseLibrary: settings.promiseLibrary || Promise,
    // Read Concern
    readConcern: settings.readConcern
  };
};
// Metadata definition object for the Collection class, also exposed as `Collection.define`;
// the `define.classMethod(...)` calls throughout this file register individual methods on it
var define = (Collection.define = new Define('Collection', Collection, false));
// Enumerable getter (no setter) exposing the database name held in the internal state
Object.defineProperty(Collection.prototype, 'dbName', {
  enumerable: true,
  get() {
    return this.s.dbName;
  }
});
// Enumerable getter (no setter) exposing the collection name held in the internal state
Object.defineProperty(Collection.prototype, 'collectionName', {
  enumerable: true,
  get() {
    return this.s.name;
  }
});
// Enumerable getter (no setter) exposing the full namespace held in the internal state
Object.defineProperty(Collection.prototype, 'namespace', {
  enumerable: true,
  get() {
    return this.s.namespace;
  }
});
// Enumerable getter for the read concern; reports `{ level: 'local' }` when none was configured
Object.defineProperty(Collection.prototype, 'readConcern', {
  enumerable: true,
  get() {
    const configured = this.s.readConcern;
    return configured ? configured : { level: 'local' };
  }
});
// Enumerable getter that assembles a fresh write concern object from the collection options.
// Only the keys that are actually set (non-null) are included.
Object.defineProperty(Collection.prototype, 'writeConcern', {
  enumerable: true,
  get() {
    const concern = {};
    for (const key of ['w', 'j', 'fsync', 'wtimeout']) {
      if (this.s.options[key] != null) concern[key] = this.s.options[key];
    }
    return concern;
  }
});
/**
 * Collection level index hint. Values assigned to it are passed through
 * `normalizeHintField` before being stored; reads return the stored value as is.
 * @ignore
 */
Object.defineProperty(Collection.prototype, 'hint', {
  enumerable: true,
  get() {
    return this.s.collectionHint;
  },
  set(value) {
    this.s.collectionHint = normalizeHintField(value);
  }
});
/**
* Creates a cursor for a query that can be used to iterate over results from MongoDB
* @method
* @param {object} [query={}] The cursor query object.
* @param {object} [options=null] Optional settings.
* @param {number} [options.limit=0] Sets the limit of documents returned in the query.
* @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
* @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
* @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
* @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
* @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
* @param {boolean} [options.explain=false] Explain the query instead of returning the data.
* @param {boolean} [options.snapshot=false] Snapshot query.
* @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
* @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
* @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
* @param {boolean} [options.returnKey=false] Only return the index key.
* @param {number} [options.maxScan=null] Limit the number of items to scan.
* @param {number} [options.min=null] Set index bounds.
* @param {number} [options.max=null] Set index bounds.
* @param {boolean} [options.showDiskLoc=false] Show disk location of results.
* @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for the query (see 3.4 documentation for available fields).
* @param {ClientSession} [options.session] optional session to use for this operation
* @throws {MongoError}
* @return {Cursor}
*/
Collection.prototype.find = function(query, options, callback) {
let selector = query;
// Argument shuffling: a function found in the `options` slot, or (when no options were
// given) in the `query` slot, is treated as the callback
if (typeof callback !== 'function') {
if (typeof options === 'function') {
callback = options;
options = undefined;
} else if (options == null) {
callback = typeof selector === 'function' ? selector : undefined;
// Anything that is not an object (including the function just moved into callback) is discarded
selector = typeof selector === 'object' ? selector : undefined;
}
}
// Ensure selector is not null
selector = selector == null ? {} : selector;
// Validate correctness of the selector: a raw Buffer selector must carry its own total
// length in its first four bytes (read here as a little-endian 32 bit integer)
var object = selector;
if (Buffer.isBuffer(object)) {
var object_size = object[0] | (object[1] << 8) | (object[2] << 16) | (object[3] << 24);
if (object_size !== object.length) {
var error = new Error(
'query selector raw message size does not match message header size [' +
object.length +
'] != [' +
object_size +
']'
);
error.name = 'MongoError';
throw error;
}
}
// Check special case where we are using an objectId: a bare ObjectID becomes a lookup by _id
if (selector != null && selector._bsontype === 'ObjectID') {
selector = { _id: selector };
}
if (!options) options = {};
// `projection` takes precedence over the deprecated `fields` alias
let projection = options.projection || options.fields;
// An array of field names is converted to an inclusion object ({ name: 1, ... });
// an empty array becomes { _id: 1 }
if (projection && !Buffer.isBuffer(projection) && Array.isArray(projection)) {
projection = projection.length
? projection.reduce((result, field) => {
result[field] = 1;
return result;
}, {})
: { _id: 1 };
}
var newOptions = {};
// Copy only the collection options listed in `mergeKeys`
for (var key in this.s.options) {
if (mergeKeys.indexOf(key) !== -1) {
newOptions[key] = this.s.options[key];
}
}
// Make a shallow copy of options (per-call options override the collection level ones)
for (var optKey in options) {
newOptions[optKey] = options[optKey];
}
// Unpack options; a falsy skip/limit is normalized to 0
newOptions.skip = options.skip ? options.skip : 0;
newOptions.limit = options.limit ? options.limit : 0;
// Only a boolean `raw` overrides the collection level setting
newOptions.raw = typeof options.raw === 'boolean' ? options.raw : this.s.raw;
// A per-call hint is normalized; otherwise the collection level hint is used
newOptions.hint = options.hint != null ? normalizeHintField(options.hint) : this.s.collectionHint;
newOptions.timeout = typeof options.timeout === 'undefined' ? undefined : options.timeout;
// Use an explicitly passed slaveOk, otherwise fall back to the db level setting
newOptions.slaveOk = options.slaveOk != null ? options.slaveOk : this.s.db.slaveOk;
// Add read preference if needed
newOptions = getReadPreference(this, newOptions, this.s.db);
// Set slave ok to true if read preference different from primary
// NOTE(review): the inner `||` is true for every non-null value (the string 'primary' has no
// `.mode`, and an object is never `=== 'primary'`), so slaveOk is forced to true whenever a
// read preference is present. `&&` looks like the intent - confirm before changing, since
// existing callers may depend on the current behaviour.
if (
newOptions.readPreference != null &&
(newOptions.readPreference !== 'primary' || newOptions.readPreference.mode !== 'primary')
) {
newOptions.slaveOk = true;
}
// Ensure the query is an object
if (selector != null && typeof selector !== 'object') {
throw MongoError.create({ message: 'query selector must be an object', driver: true });
}
// Build the find command
var findCommand = {
find: this.s.namespace,
limit: newOptions.limit,
skip: newOptions.skip,
query: selector
};
// Ensure we use the right await data option (accept the lower-case `awaitdata` spelling)
if (typeof newOptions.awaitdata === 'boolean') {
newOptions.awaitData = newOptions.awaitdata;
}
// Translate to new command option noCursorTimeout
// NOTE(review): `timeout` is copied unchanged; the JSDoc describes it as "Specify if the
// cursor can timeout", so this mapping may be inverted - confirm against the server semantics.
if (typeof newOptions.timeout === 'boolean') newOptions.noCursorTimeout = newOptions.timeout;
// Merge in options to command: every non-null option except the session is sent along
for (var name in newOptions) {
if (newOptions[name] != null && name !== 'session') {
findCommand[name] = newOptions[name];
}
}
// The projection is sent under the `fields` key of the command
if (projection) findCommand.fields = projection;
// Add db object to the new options (done after the merge above, so it is not part of the command)
newOptions.db = this.s.db;
// Add the promise library
newOptions.promiseLibrary = this.s.promiseLibrary;
// Set raw if available at collection level
if (newOptions.raw == null && typeof this.s.raw === 'boolean') newOptions.raw = this.s.raw;
// Set promoteLongs if available at collection level
if (newOptions.promoteLongs == null && typeof this.s.promoteLongs === 'boolean')
newOptions.promoteLongs = this.s.promoteLongs;
// Same fallback for promoteValues and promoteBuffers
if (newOptions.promoteValues == null && typeof this.s.promoteValues === 'boolean')
newOptions.promoteValues = this.s.promoteValues;
if (newOptions.promoteBuffers == null && typeof this.s.promoteBuffers === 'boolean')
newOptions.promoteBuffers = this.s.promoteBuffers;
// Sort options
if (findCommand.sort) {
findCommand.sort = formattedOrderClause(findCommand.sort);
}
// Set the readConcern
decorateWithReadConcern(findCommand, this, options);
// Decorate find command with collation options
decorateWithCollation(findCommand, this, options);
// Create the cursor; with a callback it is delivered through handleCallback,
// otherwise it is returned directly
if (typeof callback === 'function')
return handleCallback(
callback,
null,
this.s.topology.cursor(this.s.namespace, findCommand, newOptions)
);
return this.s.topology.cursor(this.s.namespace, findCommand, newOptions);
};
// Register find in the class metadata: flagged as neither callback nor promise based, returning a Cursor
define.classMethod('find', { callback: false, promise: false, returns: [Cursor] });
/**
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
* can be overridden by setting the **forceServerObjectId** flag.
*
* @method
* @param {object} doc Document to insert.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~insertOneWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.insertOne = function(doc, options, callback) {
  // Support the insertOne(doc, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Inherit ignoreUndefined from the collection; clone first so the caller's object is not modified
  if (this.s.options.ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  const operationArgs = [this, doc, options, callback];
  return executeOperation(this.s.topology, insertOne, operationArgs);
};
/**
 * Insert a single document and shape the result for the CRUD API.
 * @ignore
 * @param {Collection} self The collection to insert into.
 * @param {object} doc The document; an array is rejected through the callback.
 * @param {object} options Insert options, forwarded to `insertDocuments`.
 * @param {function} callback Receives `(err)` or `(null, result)`.
 */
var insertOne = function(self, doc, options, callback) {
  if (Array.isArray(doc)) {
    const invalidDoc = MongoError.create({
      message: 'doc parameter must be an object',
      driver: true
    });
    return callback(invalidDoc);
  }

  insertDocuments(self, [doc], options, (err, r) => {
    if (callback == null) return;
    if (err) return callback(err);
    // Workaround for pre 2.6 servers: no result object, so report a bare ok
    if (r == null) return callback(null, { result: { ok: 1 } });
    // Add values to top level to ensure crud spec compatibility
    r.insertedCount = r.result.n;
    r.insertedId = doc._id;
    callback(null, r);
  });
};
/**
 * Convert a bulk write result into the result shape returned by insertMany.
 * @ignore
 * @param {object[]} docs The documents that were inserted; returned as `ops`.
 * @param {object} r Bulk result exposing `insertedCount`, `insertedIds` and `getLastOp()`.
 * @return {object} `{ result: { ok, n[, opTime] }, ops, insertedCount, insertedIds }`
 */
var mapInserManyResults = function(docs, r) {
  const commandResult = { ok: 1, n: r.insertedCount };
  const mapped = {
    result: commandResult,
    ops: docs,
    insertedCount: r.insertedCount,
    insertedIds: r.insertedIds
  };

  // opTime is only attached when the bulk result reports a last operation
  const lastOp = r.getLastOp();
  if (lastOp) commandResult.opTime = lastOp;

  return mapped;
};
// Register insertOne in the class metadata, flagged as both callback and promise based
define.classMethod('insertOne', { callback: true, promise: true });
/**
* Inserts an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
* can be overridden by setting the **forceServerObjectId** flag.
*
* @method
* @param {object[]} docs Documents to insert.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {boolean} [options.ordered=true] If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~insertWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.insertMany = function(docs, options, callback) {
  // Support the insertMany(docs, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Work on a private copy of the options; default to an ordered insert when none were given
  options = options ? shallowClone(options) : { ordered: true };

  // Reject anything that is not an array, through the callback or a rejected promise
  if (!Array.isArray(docs)) {
    const invalidDocs = MongoError.create({
      message: 'docs parameter must be an array of documents',
      driver: true
    });
    if (typeof callback === 'function') return callback(invalidDocs);
    return new this.s.promiseLibrary((resolve, reject) => {
      reject(invalidDocs);
    });
  }

  // Fall back to the collection level serializeFunctions setting
  options['serializeFunctions'] = options['serializeFunctions'] || this.s.serializeFunctions;

  // Only a boolean per-call value overrides the db level forceServerObjectId
  const forceServerObjectId =
    typeof options.forceServerObjectId === 'boolean'
      ? options.forceServerObjectId
      : this.s.db.options.forceServerObjectId;

  // Unless the server is to assign them, add an _id to every document lacking one
  // (this mutates the caller's documents, as documented)
  if (forceServerObjectId !== true) {
    for (const doc of docs) {
      if (doc._id == null) doc._id = this.s.pkFactory.createPk();
    }
  }

  // The insert is executed as a bulk write with a single insertMany operation
  const operations = [{ insertMany: docs }];

  return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback], {
    resultMutator: result => mapInserManyResults(docs, result)
  });
};
// Register insertMany in the class metadata, flagged as both callback and promise based
define.classMethod('insertMany', { callback: true, promise: true });
/**
* @typedef {Object} Collection~BulkWriteOpResult
* @property {number} insertedCount Number of documents inserted.
* @property {number} matchedCount Number of documents matched for update.
* @property {number} modifiedCount Number of documents modified.
* @property {number} deletedCount Number of documents deleted.
* @property {number} upsertedCount Number of documents upserted.
* @property {object} insertedIds Inserted document generated Id's, hash key is the index of the originating operation
* @property {object} upsertedIds Upserted document generated Id's, hash key is the index of the originating operation
* @property {object} result The command result object.
*/
/**
* The callback format for bulk writes
* @callback Collection~bulkWriteOpCallback
* @param {BulkWriteError} error An error instance representing the error during the execution.
* @param {Collection~BulkWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* Perform a bulkWrite operation without a fluent API
*
* Legal operation types are
*
* { insertOne: { document: { a: 1 } } }
*
* { updateOne: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
*
* { updateMany: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
*
* { deleteOne: { filter: {c:1} } }
*
* { deleteMany: { filter: {c:1} } }
*
* { replaceOne: { filter: {c:3}, replacement: {c:4}, upsert:true}}
*
* If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
* can be overridden by setting the **forceServerObjectId** flag.
*
* @method
* @param {object[]} operations Bulk operations to perform.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
* @param {boolean} [options.ordered=true] Execute write operation in ordered or unordered fashion.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~bulkWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.bulkWrite = function(operations, options, callback) {
  // Support the bulkWrite(operations, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Default to ordered execution when no options were given
  if (!options) options = { ordered: true };

  if (Array.isArray(operations)) {
    return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback]);
  }

  // Thrown synchronously rather than passed to the callback
  throw MongoError.create({ message: 'operations must be an array of documents', driver: true });
};
/**
 * Run a list of write operations through an ordered or unordered bulk operation and
 * add the CRUD style counters and id maps to the bulk result.
 * @ignore
 * @param {Collection} self The collection to write to.
 * @param {object[]} operations The operations, each handed to `bulk.raw`.
 * @param {object} options Bulk options; `ordered` selects the bulk type.
 * @param {function} callback Receives `(err, null)` or `(null, result)`.
 */
var bulkWrite = function(self, operations, options, callback) {
  // Inherit ignoreUndefined from the collection; clone first so the caller's object is not modified
  if (self.s.options.ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = self.s.options.ignoreUndefined;
  }

  // Ordered unless `ordered` is set to something other than true
  const runOrdered = options.ordered === true || options.ordered == null;
  const bulk = runOrdered
    ? self.initializeOrderedBulkOp(options)
    : self.initializeUnorderedBulkOp(options);

  // Feed every operation to the bulk, remembering whether any of them specifies a collation.
  // Errors raised while adding an operation are reported through the callback.
  let usesCollation = false;
  try {
    for (const operation of operations) {
      // The first key of the operation is its type (insertOne, updateOne, ...)
      const opType = Object.keys(operation)[0];
      if (operation[opType].collation) usesCollation = true;
      bulk.raw(operation);
    }
  } catch (err) {
    return callback(err, null);
  }

  // Final options for write concern
  const finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);
  const writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
  const capabilities = self.s.topology.capabilities();

  // Did the user pass in a collation, check if our write server supports it
  if (usesCollation && capabilities && !capabilities.commandsTakeCollation) {
    return callback(new MongoError('server/primary/mongos does not support collation'));
  }

  bulk.execute(writeCon, finalOptions, (err, r) => {
    // An error without any result is a connection level error
    if (!r && err) return callback(err, null);

    // Expose the bulk counters under their CRUD names
    r.insertedCount = r.nInserted;
    r.matchedCount = r.nMatched;
    r.modifiedCount = r.nModified || 0;
    r.deletedCount = r.nRemoved;
    r.upsertedCount = r.getUpsertedIds().length;
    r.upsertedIds = {};
    r.insertedIds = {};
    r.n = r.insertedCount;

    // Index both id lists by the position of the originating operation
    for (const entry of r.getInsertedIds()) {
      r.insertedIds[entry.index] = entry._id;
    }
    for (const entry of r.getUpsertedIds()) {
      r.upsertedIds[entry.index] = entry._id;
    }

    callback(null, r);
  });
};
/**
 * Insert one or more documents through the topology and normalise the outcome.
 * @ignore
 * @param {Collection} self The collection to insert into.
 * @param {(object|object[])} docs A document or an array of documents.
 * @param {object} [options] Insert options; may be replaced by the callback.
 * @param {function} [callback] Receives `(err)` or `(null, result)`; when absent the outcome is dropped.
 */
var insertDocuments = function(self, docs, options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Always operate on an array of documents
  if (!Array.isArray(docs)) docs = [docs];

  // Get the write concern options
  const finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // keepGoing is an alias for an unordered insert
  if (finalOptions.keepGoing === true) finalOptions.ordered = false;
  finalOptions['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;

  // Only a boolean per-call value overrides the db level forceServerObjectId
  const forceServerObjectId =
    typeof options.forceServerObjectId === 'boolean'
      ? options.forceServerObjectId
      : self.s.db.options.forceServerObjectId;

  // Add an _id where it is undefined (an explicit null _id is left alone)
  if (forceServerObjectId !== true) {
    for (const doc of docs) {
      if (doc._id === void 0) doc._id = self.s.pkFactory.createPk();
    }
  }

  self.s.topology.insert(self.s.namespace, docs, finalOptions, (err, result) => {
    if (callback == null) return;
    if (err) return handleCallback(callback, err);
    if (result == null) return handleCallback(callback, null, null);

    // A command level error code or the first write error is turned into an error
    const reply = result.result;
    if (reply.code) return handleCallback(callback, toError(reply));
    if (reply.writeErrors) return handleCallback(callback, toError(reply.writeErrors[0]));

    // Attach the inserted documents and hand the result back
    result.ops = docs;
    handleCallback(callback, null, result);
  });
};
// Register bulkWrite in the class metadata, flagged as both callback and promise based
define.classMethod('bulkWrite', { callback: true, promise: true });
/**
* @typedef {Object} Collection~WriteOpResult
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
* @property {object} connection The connection object used for the operation.
* @property {object} result The command result object.
*/
/**
* The callback format for write operations
* @callback Collection~writeOpCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~WriteOpResult} result The result object if the command was executed successfully.
*/
/**
* @typedef {Object} Collection~insertWriteOpResult
* @property {Number} insertedCount The total amount of documents inserted.
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
* @property {Object.<Number, ObjectId>} insertedIds Map of the index of the inserted document to the id of the inserted document.
* @property {object} connection The connection object used for the operation.
* @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
* @property {Number} result.ok Is 1 if the command executed correctly.
* @property {Number} result.n The total count of documents inserted.
*/
/**
* @typedef {Object} Collection~insertOneWriteOpResult
* @property {Number} insertedCount The total amount of documents inserted.
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
* @property {ObjectId} insertedId The driver generated ObjectId for the insert operation.
* @property {object} connection The connection object used for the operation.
* @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
* @property {Number} result.ok Is 1 if the command executed correctly.
* @property {Number} result.n The total count of documents inserted.
*/
/**
* The callback format for inserts
* @callback Collection~insertWriteOpCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~insertWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* The callback format for inserts
* @callback Collection~insertOneWriteOpCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~insertOneWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* Inserts a single document or an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
* can be overridden by setting the **forceServerObjectId** flag.
*
* @method
* @param {(object|object[])} docs Documents to insert.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~insertWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
* @deprecated Use insertOne, insertMany or bulkWrite
*/
Collection.prototype.insert = function(docs, options, callback) {
  // Support the insert(docs, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // With no options at all the legacy insert defaults to an unordered insert
  options = options || { ordered: false };

  // Accept a single document as well as an array
  docs = Array.isArray(docs) ? docs : [docs];

  // keepGoing is the legacy spelling of an unordered insert. Apply it to a copy:
  // writing `ordered` into the caller's own options object would leak into any
  // later call that reuses that object.
  if (options.keepGoing === true) {
    options = shallowClone(options);
    options.ordered = false;
  }

  return this.insertMany(docs, options, callback);
};
// Register insert in the class metadata, flagged as both callback and promise based
define.classMethod('insert', { callback: true, promise: true });
/**
* @typedef {Object} Collection~updateWriteOpResult
* @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
* @property {Number} result.ok Is 1 if the command executed correctly.
* @property {Number} result.n The total count of documents scanned.
* @property {Number} result.nModified The total count of documents modified.
* @property {Object} connection The connection object used for the operation.
* @property {Number} matchedCount The number of documents that matched the filter.
* @property {Number} modifiedCount The number of documents that were modified.
* @property {Number} upsertedCount The number of documents upserted.
* @property {Object} upsertedId The upserted id.
* @property {ObjectId} upsertedId._id The upserted _id returned from the server.
*/
/**
* The callback format for updates
* @callback Collection~updateWriteOpCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~updateWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* Update a single document on MongoDB
* @method
* @param {object} filter The Filter used to select the document to update
* @param {object} update The update operations to be applied to the document
* @param {object} [options=null] Optional settings.
* @param {boolean} [options.upsert=false] Update operation is an upsert.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.updateOne = function(filter, update, options, callback) {
  // Support the updateOne(filter, update, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Reject update documents without atomic operators before anything is sent,
  // through the callback when there is one and through a rejected promise otherwise
  var err = checkForAtomicOperators(update);
  if (err) {
    if (typeof callback === 'function') return callback(err);
    return this.s.promiseLibrary.reject(err);
  }

  // One private copy is enough: it protects the caller's object both from the
  // ignoreUndefined assignment below and from the `multi` flag the updateOne helper sets
  options = shallowClone(options || {});

  // Inherit ignoreUndefined from the collection
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, updateOne, [this, filter, update, options, callback]);
};
/**
 * Check that an update document begins with an atomic (`$`-prefixed) operator.
 * Only the first key is inspected.
 *
 * @param {object} update The update document handed to updateOne/updateMany.
 * @return {*} The value produced by `toError` when the document is rejected, otherwise undefined.
 */
var checkForAtomicOperators = function(update) {
  // Object.keys throws a TypeError for null/undefined. Return an error value
  // instead so callers can route it to their callback or rejected promise.
  if (update == null || typeof update !== 'object') {
    return toError('document must be a valid JavaScript object');
  }

  var keys = Object.keys(update);

  // same errors as the server would give for update doc lacking atomic operators
  if (keys.length === 0) {
    return toError('The update operation document must contain at least one atomic operator.');
  }

  if (keys[0][0] !== '$') {
    return toError('the update operation document must contain atomic operators.');
  }
};
/**
 * Run a single-document update through updateDocuments and decorate the reply
 * with modifiedCount, upsertedId, upsertedCount and matchedCount.
 *
 * @param {Collection} self Collection the update runs against.
 * @param {object} filter Selector forwarded to updateDocuments.
 * @param {object} update Update document forwarded to updateDocuments.
 * @param {object} options Options object; `multi` is forced to false on it.
 * @param {function} [callback] Receives (err) or (null, result).
 */
var updateOne = function(self, filter, update, options, callback) {
  // Only one document may be touched
  options.multi = false;

  var onUpdated = function(err, r) {
    // Nobody is listening: nothing to report
    if (callback == null) return;
    if (err && callback) return callback(err);
    // No reply body: report a bare acknowledgement
    if (r == null) return callback(null, { result: { ok: 1 } });

    const raw = r.result;
    const upserted = Array.isArray(raw.upserted) ? raw.upserted : [];
    const didUpsert = upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    if (raw.nModified != null) {
      r.modifiedCount = raw.nModified;
    } else {
      r.modifiedCount = raw.n;
    }
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = upserted.length;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;

    if (callback) callback(null, r);
  };

  updateDocuments(self, filter, update, options, onUpdated);
};
// Declare 'updateOne' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('updateOne', { callback: true, promise: true });
/**
* Replace a document on MongoDB
* @method
* @param {object} filter The Filter used to select the document to update
* @param {object} doc The Document that replaces the matching document
* @param {object} [options=null] Optional settings.
* @param {boolean} [options.upsert=false] Update operation is an upsert.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.replaceOne = function(filter, doc, options, callback) {
  // Support the (filter, doc, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // One private copy is enough: the replaceOne helper writes `multi` onto the
  // options it receives, and that must not leak into the caller's object.
  options = shallowClone(options);

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, replaceOne, [this, filter, doc, options, callback]);
};
/**
 * Run a single-document replacement through updateDocuments and decorate the
 * reply with modifiedCount, upsertedId, upsertedCount, matchedCount and ops.
 *
 * @param {Collection} self Collection the replacement runs against.
 * @param {object} filter Selector forwarded to updateDocuments.
 * @param {object} doc Replacement document; also echoed back as `ops[0]`.
 * @param {object} options Options object; `multi` is forced to false on it.
 * @param {function} [callback] Receives (err) or (null, result).
 */
var replaceOne = function(self, filter, doc, options, callback) {
  // Only one document may be replaced
  options.multi = false;

  var onReplaced = function(err, r) {
    // Nobody is listening: nothing to report
    if (callback == null) return;
    if (err && callback) return callback(err);
    // No reply body: report a bare acknowledgement
    if (r == null) return callback(null, { result: { ok: 1 } });

    const raw = r.result;
    const upserted = Array.isArray(raw.upserted) ? raw.upserted : [];
    const didUpsert = upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    if (raw.nModified != null) {
      r.modifiedCount = raw.nModified;
    } else {
      r.modifiedCount = raw.n;
    }
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = upserted.length;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;
    // Hand the replacement document back to the caller
    r.ops = [doc];

    if (callback) callback(null, r);
  };

  updateDocuments(self, filter, doc, options, onReplaced);
};
// Declare 'replaceOne' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('replaceOne', { callback: true, promise: true });
/**
* Update multiple documents on MongoDB
* @method
* @param {object} filter The Filter used to select the documents to update
* @param {object} update The update operations to be applied to the document
* @param {object} [options=null] Optional settings.
* @param {boolean} [options.upsert=false] Update operation is an upsert.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.updateMany = function(filter, update, options, callback) {
  // Support the (filter, update, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // Refuse update documents that do not start with an atomic operator before
  // anything is sent; the error is delivered through the callback when one was
  // given, otherwise through this.s.promiseLibrary.reject.
  var err = checkForAtomicOperators(update);
  if (err) {
    if (typeof callback === 'function') return callback(err);
    return this.s.promiseLibrary.reject(err);
  }

  // One private copy is enough: the updateMany helper writes `multi` onto the
  // options it receives, and that must not leak into the caller's object.
  options = shallowClone(options);

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, updateMany, [this, filter, update, options, callback]);
};
/**
 * Run a multi-document update through updateDocuments and decorate the reply
 * with modifiedCount, upsertedId, upsertedCount and matchedCount.
 *
 * @param {Collection} self Collection the update runs against.
 * @param {object} filter Selector forwarded to updateDocuments.
 * @param {object} update Update document forwarded to updateDocuments.
 * @param {object} options Options object; `multi` is forced to true on it.
 * @param {function} [callback] Receives (err) or (null, result).
 */
var updateMany = function(self, filter, update, options, callback) {
  // Every matching document may be touched
  options.multi = true;

  var onUpdated = function(err, r) {
    // Nobody is listening: nothing to report
    if (callback == null) return;
    if (err && callback) return callback(err);
    // No reply body: report a bare acknowledgement
    if (r == null) return callback(null, { result: { ok: 1 } });

    const raw = r.result;
    const upserted = Array.isArray(raw.upserted) ? raw.upserted : [];
    const didUpsert = upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    if (raw.nModified != null) {
      r.modifiedCount = raw.nModified;
    } else {
      r.modifiedCount = raw.n;
    }
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = upserted.length;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;

    if (callback) callback(null, r);
  };

  updateDocuments(self, filter, update, options, onUpdated);
};
// Declare 'updateMany' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('updateMany', { callback: true, promise: true });
/**
 * Shared implementation behind update, updateOne, updateMany, replaceOne and
 * the update branch of save: builds one update op and passes it to
 * self.s.topology.update.
 *
 * @param {Collection} self Collection the update runs against.
 * @param {object} selector Query selecting the documents to update.
 * @param {object} document Update or replacement document.
 * @param {object} [options] Optional settings; `upsert`, `multi`, `serializeFunctions` and `arrayFilters` are read here.
 * @param {function} [callback] Receives (err, result). May also be passed in the `options` position.
 * @throws {Error} The validation error itself, when selector/document is invalid and no callback was supplied.
 */
var updateDocuments = function(self, selector, document, options, callback) {
  // Accept (self, selector, document, callback) as well
  if (typeof options === 'function') {
    callback = options;
    options = null;
  }
  if (options == null) options = {};
  if (typeof callback !== 'function') callback = null;

  // Validate the selector and document up front
  var invalid = null;
  if (selector == null || typeof selector !== 'object') {
    invalid = toError('selector must be a valid JavaScript object');
  } else if (document == null || typeof document !== 'object') {
    invalid = toError('document must be a valid JavaScript object');
  }
  if (invalid) {
    if (callback) return callback(invalid);
    // No callback to report to: surface the validation error itself rather
    // than failing with "callback is not a function".
    throw invalid;
  }

  // Get the write concern options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // Either use the override on the call, or fall back to the collection-level setting
  finalOptions['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;

  // Build the single update op; upsert and multi are coerced to booleans and default to false
  var op = { q: selector, u: document };
  op.upsert = options.upsert !== void 0 ? !!options.upsert : false;
  op.multi = options.multi !== void 0 ? !!options.multi : false;

  // arrayFilters travels on the op itself, not in the command options
  if (finalOptions.arrayFilters) {
    op.arrayFilters = finalOptions.arrayFilters;
    delete finalOptions.arrayFilters;
  }

  // Have we specified collation
  decorateWithCollation(finalOptions, self, options);

  // Execute the update
  self.s.topology.update(self.s.namespace, [op], finalOptions, function(err, result) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);
    // A `code` or `writeErrors` entry in the reply body is turned into an error
    if (result.result.code) return handleCallback(callback, toError(result.result));
    if (result.result.writeErrors)
      return handleCallback(callback, toError(result.result.writeErrors[0]));
    // Return the results
    handleCallback(callback, null, result);
  });
};
/**
* Updates documents.
* @method
* @param {object} selector The selector for the update operation.
* @param {object} document The update document.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.upsert=false] Update operation is an upsert.
* @param {boolean} [options.multi=false] Update one/all documents with operation.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~writeOpCallback} [callback] The command result callback
* @throws {MongoError}
* @return {Promise} returns Promise if no callback passed
* @deprecated use updateOne, updateMany or bulkWrite
*/
Collection.prototype.update = function(selector, document, options, callback) {
  // Support the (selector, document, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options,
  // leaving the caller's object untouched.
  const ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  // updateDocuments receives (self, selector, document, options, callback)
  const operationArgs = [this, selector, document, options, callback];
  return executeOperation(this.s.topology, updateDocuments, operationArgs);
};

// Declare 'update' through define.classMethod with both the callback and promise flags set.
define.classMethod('update', { callback: true, promise: true });
/**
* @typedef {Object} Collection~deleteWriteOpResult
* @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
* @property {Number} result.ok Is 1 if the command executed correctly.
* @property {Number} result.n The total count of documents deleted.
* @property {Object} connection The connection object used for the operation.
* @property {Number} deletedCount The number of documents deleted.
*/
/**
* The callback format for deletes
* @callback Collection~deleteWriteOpCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~deleteWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* Delete a document on MongoDB
* @method
* @param {object} filter The Filter used to select the document to remove
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~deleteWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.deleteOne = function(filter, options, callback) {
  // Support the (filter, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // One private copy is enough: the deleteOne helper writes `single` onto the
  // options it receives, and that must not leak into the caller's object.
  options = shallowClone(options);

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, deleteOne, [this, filter, options, callback]);
};
/**
 * Delete at most one document through removeDocuments and add `deletedCount`
 * (taken from `result.n`) to the reply.
 *
 * @param {Collection} self Collection the delete runs against.
 * @param {object} filter Selector forwarded to removeDocuments.
 * @param {object} options Options object; `single` is forced to true on it.
 * @param {function} [callback] Receives (err) or (null, result).
 */
var deleteOne = function(self, filter, options, callback) {
  // Stop after the first matching document
  options.single = true;

  var onRemoved = function(error, reply) {
    // Nobody is listening: nothing to report
    if (callback == null) return;
    if (error && callback) {
      return callback(error);
    }
    // No reply body: report a bare acknowledgement
    if (reply == null) {
      return callback(null, { result: { ok: 1 } });
    }
    reply.deletedCount = reply.result.n;
    if (callback) {
      callback(null, reply);
    }
  };

  removeDocuments(self, filter, options, onRemoved);
};
// Declare 'deleteOne' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('deleteOne', { callback: true, promise: true });
// removeOne is an alias: it points at the very same function object as deleteOne.
Collection.prototype.removeOne = Collection.prototype.deleteOne;
// The alias gets its own classMethod entry with the same flags.
define.classMethod('removeOne', { callback: true, promise: true });
/**
* Delete multiple documents on MongoDB
* @method
* @param {object} filter The Filter used to select the documents to remove
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~deleteWriteOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.deleteMany = function(filter, options, callback) {
  // Support the (filter, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // One private copy is enough: the deleteMany helper writes `single` onto the
  // options it receives, and that must not leak into the caller's object.
  options = shallowClone(options);

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, deleteMany, [this, filter, options, callback]);
};
/**
 * Delete every matching document through removeDocuments and add
 * `deletedCount` (taken from `result.n`) to the reply.
 *
 * @param {Collection} self Collection the delete runs against.
 * @param {object} filter Selector forwarded to removeDocuments.
 * @param {object} options Options object; `single` is forced to false on it.
 * @param {function} [callback] Receives (err) or (null, result).
 */
var deleteMany = function(self, filter, options, callback) {
  // Do not stop at the first matching document
  options.single = false;

  var onRemoved = function(error, reply) {
    // Nobody is listening: nothing to report
    if (callback == null) return;
    if (error && callback) {
      return callback(error);
    }
    // No reply body: report a bare acknowledgement
    if (reply == null) {
      return callback(null, { result: { ok: 1 } });
    }
    reply.deletedCount = reply.result.n;
    if (callback) {
      callback(null, reply);
    }
  };

  removeDocuments(self, filter, options, onRemoved);
};
/**
 * Shared implementation behind remove, deleteOne and deleteMany: builds one
 * delete op and passes it to self.s.topology.remove.
 *
 * @param {Collection} self Collection the delete runs against.
 * @param {object} [selector] Query selecting the documents to delete; null/undefined becomes {}.
 * @param {object} [options] Optional settings; `single` limits the op to one document.
 * @param {function} [callback] Receives (err, result). May also be passed in the `options` or `selector` position.
 */
var removeDocuments = function(self, selector, options, callback) {
  // Accept (self, selector, callback) and (self, callback) as well
  if (typeof options === 'function') {
    callback = options;
    options = {};
  } else if (typeof selector === 'function') {
    callback = selector;
    options = {};
    selector = {};
  }

  // Fall back to an empty options object when none was provided
  options = options || {};

  // Get the write concern options
  const finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // limit 0 removes every match, limit 1 only the first one
  const op = {
    q: selector == null ? {} : selector,
    limit: options.single ? 1 : 0
  };

  // Have we specified collation
  decorateWithCollation(finalOptions, self, options);

  // Execute the remove
  self.s.topology.remove(self.s.namespace, [op], finalOptions, function(err, result) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);

    // A `code` or `writeErrors` entry in the reply body is turned into an error
    const body = result.result;
    if (body.code) return handleCallback(callback, toError(body));
    if (body.writeErrors) return handleCallback(callback, toError(body.writeErrors[0]));

    handleCallback(callback, null, result);
  });
};
// Declare 'deleteMany' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('deleteMany', { callback: true, promise: true });
// removeMany is an alias: it points at the very same function object as deleteMany.
Collection.prototype.removeMany = Collection.prototype.deleteMany;
// The alias gets its own classMethod entry with the same flags.
define.classMethod('removeMany', { callback: true, promise: true });
/**
* Remove documents.
* @method
* @param {object} selector The selector for the remove operation.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.single=false] Removes the first document found.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~writeOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
* @deprecated use deleteOne, deleteMany or bulkWrite
*/
Collection.prototype.remove = function(selector, options, callback) {
  // Support the (selector, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options,
  // leaving the caller's object untouched.
  const ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  // removeDocuments receives (self, selector, options, callback)
  const operationArgs = [this, selector, options, callback];
  return executeOperation(this.s.topology, removeDocuments, operationArgs);
};

// Declare 'remove' through define.classMethod with both the callback and promise flags set.
define.classMethod('remove', { callback: true, promise: true });
/**
* Save a document. Simple full document replacement function. Not recommended for efficiency, use atomic
* operators and update instead for more efficient operations.
* @method
* @param {object} doc Document to save
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~writeOpCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
* @deprecated use insertOne, insertMany, updateOne or updateMany
*/
Collection.prototype.save = function(doc, options, callback) {
  // Support the (doc, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options,
  // leaving the caller's object untouched.
  const ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  // The save helper receives (self, doc, options, callback)
  const operationArgs = [this, doc, options, callback];
  return executeOperation(this.s.topology, save, operationArgs);
};
/**
 * Save a document: upsert it by `_id` when it has one, insert it otherwise.
 *
 * @param {Collection} self Collection the document is saved into.
 * @param {object} doc Document to save.
 * @param {object} options Settings forwarded (after writeConcern) to the update or insert.
 * @param {function} [callback] Receives (err, result).
 * @throws {Error} The validation error itself, when `doc` is null/undefined and no callback was supplied.
 */
var save = function(self, doc, options, callback) {
  // Reading `doc._id` below would throw a TypeError on a missing document, so
  // report it as a regular error before doing anything else.
  if (doc == null) {
    var invalid = toError('document must be a valid JavaScript object');
    if (typeof callback !== 'function') throw invalid;
    return handleCallback(callback, invalid, null);
  }

  // Get the write concern options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // A document that already has an _id is upserted under that _id
  if (doc._id != null) {
    finalOptions.upsert = true;
    return updateDocuments(self, { _id: doc._id }, doc, finalOptions, callback);
  }

  // Otherwise insert the document
  insertDocuments(self, [doc], finalOptions, function(err, r) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    handleCallback(callback, null, r);
  });
};
// Declare 'save' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('save', { callback: true, promise: true });
/**
* The callback format for results
* @callback Collection~resultCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {object} result The result object if the command was executed successfully.
*/
/**
* The callback format for an aggregation call
* @callback Collection~aggregationCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
*/
/**
* Fetches the first document that matches the query
* @method
* @param {object} query Query for find Operation
* @param {object} [options=null] Optional settings.
* @param {number} [options.limit=0] Sets the limit of documents returned in the query.
* @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
* @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
* @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
* @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
* @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
* @param {boolean} [options.explain=false] Explain the query instead of returning the data.
* @param {boolean} [options.snapshot=false] Snapshot query.
* @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
* @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
* @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
* @param {boolean} [options.returnKey=false] Only return the index key.
* @param {number} [options.maxScan=null] Limit the number of items to scan.
* @param {number} [options.min=null] Set index bounds.
* @param {number} [options.max=null] Set index bounds.
* @param {boolean} [options.showDiskLoc=false] Show disk location of results.
* @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.findOne = function(query, options, callback) {
  // Support findOne(callback) and findOne(query, callback).
  if (typeof query === 'function') {
    callback = query;
    query = {};
    options = {};
  } else if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Missing query/options fall back to empty objects
  query = query || {};
  options = options || {};

  // The findOne helper receives (self, query, options, callback)
  const operationArgs = [this, query, options, callback];
  return executeOperation(this.s.topology, findOne, operationArgs);
};
/**
 * Fetch the first document matching `query` by opening a cursor with
 * limit(-1) and batchSize(1) and reading a single item from it.
 *
 * @param {Collection} self Collection to query.
 * @param {object} query Query passed to self.find.
 * @param {object} options Options passed to self.find.
 * @param {function} callback Receives (err, item) through handleCallback.
 */
var findOne = function(self, query, options, callback) {
  const found = self.find(query, options);
  const limited = found.limit(-1);
  const cursor = limited.batchSize(1);

  // Read exactly one item
  cursor.next(function onItem(error, item) {
    if (error == null) {
      return void handleCallback(callback, null, item);
    }
    return handleCallback(callback, toError(error), null);
  });
};
// Declare 'findOne' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('findOne', { callback: true, promise: true });
/**
* The callback format for the collection method, must be used if strict is specified
* @callback Collection~collectionResultCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection} collection The collection instance.
*/
/**
* Rename the collection.
*
* @method
* @param {string} newName New name of the collection.
* @param {object} [options=null] Optional settings.
* @param {boolean} [options.dropTarget=false] Drop the target name collection if it previously exists.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~collectionResultCallback} [callback] The results callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.rename = function(newName, options, callback) {
  // Support the (newName, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Work on a copy and pin the read preference to PRIMARY, overriding any
  // value the caller passed.
  const renameOptions = Object.assign({}, options);
  renameOptions.readPreference = ReadPreference.PRIMARY;
  options = renameOptions;

  return executeOperation(this.s.topology, rename, [this, newName, options, callback]);
};
/**
 * Issue a renameCollection command through the admin database and, on
 * success, hand a new Collection for the new name to the callback.
 *
 * @param {Collection} self Collection being renamed.
 * @param {string} newName Target collection name (checked with checkCollectionName).
 * @param {object} options Command options; a boolean `dropTarget` is copied into the command.
 * @param {function} callback Receives (err, collection) through handleCallback.
 */
var rename = function(self, newName, options, callback) {
  // Check the collection name
  checkCollectionName(newName);

  // Source and target are fully qualified as "<db>.<collection>";
  // anything but a boolean dropTarget counts as false
  const cmd = {
    renameCollection: f('%s.%s', self.s.dbName, self.s.name),
    to: f('%s.%s', self.s.dbName, newName),
    dropTarget: typeof options.dropTarget === 'boolean' ? options.dropTarget : false
  };

  // Decorate command with writeConcern if supported
  decorateWithWriteConcern(cmd, self, options);

  // Execute against admin
  self.s.db.admin().command(cmd, options, function onRenamed(error, doc) {
    if (error) return handleCallback(callback, error, null);
    // The reply itself may describe a failure
    if (doc.errmsg) return handleCallback(callback, toError(doc), null);

    try {
      const state = self.s;
      const renamed = new Collection(
        state.db,
        state.topology,
        state.dbName,
        newName,
        state.pkFactory,
        state.options
      );
      return handleCallback(callback, null, renamed);
    } catch (failure) {
      return handleCallback(callback, toError(failure), null);
    }
  });
};
// Declare 'rename' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('rename', { callback: true, promise: true });
/**
* Drop the collection from the database, removing it permanently. New accesses will create a new collection.
*
* @method
* @param {object} [options=null] Optional settings.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The results callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.drop = function(options, callback) {
  // Support the drop(callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // The actual work is the database's dropCollection, bound to that database
  // so it keeps its `this` when invoked by executeOperation.
  const topology = this.s.topology;
  const dropCollection = this.s.db.dropCollection.bind(this.s.db);
  return executeOperation(topology, dropCollection, [this.s.name, options, callback]);
};

// Declare 'drop' through define.classMethod with both the callback and promise flags set.
define.classMethod('drop', { callback: true, promise: true });
/**
* Returns the options of the collection.
*
* @method
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The results callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.options = function(opts, callback) {
  // Support the options(callback) call signature.
  if (typeof opts === 'function') {
    callback = opts;
    opts = {};
  }
  opts = opts || {};

  // The options helper receives (self, opts, callback)
  const operationArgs = [this, opts, callback];
  return executeOperation(this.s.topology, options, operationArgs);
};
/**
 * Look the collection up by name via db.listCollections and report the
 * `options` field of the first entry (null when that field is falsy).
 *
 * @param {Collection} self Collection whose options are requested.
 * @param {object} opts Options forwarded to listCollections.
 * @param {function} callback Receives (err, options) through handleCallback.
 */
var options = function(self, opts, callback) {
  const listing = self.s.db.listCollections({ name: self.s.name }, opts);

  listing.toArray(function onListed(err, collections) {
    if (err) {
      return handleCallback(callback, err);
    }

    // No entry with that name: report a driver-side "not found" error
    if (collections.length === 0) {
      const notFound = MongoError.create({
        message: f('collection %s not found', self.s.namespace),
        driver: true
      });
      return handleCallback(callback, notFound);
    }

    const found = collections[0].options || null;
    handleCallback(callback, err, found);
  });
};
// Declare 'options' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('options', { callback: true, promise: true });
/**
* Returns if the collection is a capped collection
*
* @method
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The results callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.isCapped = function(options, callback) {
  // Support the isCapped(callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // The isCapped helper receives (self, options, callback)
  const operationArgs = [this, options, callback];
  return executeOperation(this.s.topology, isCapped, operationArgs);
};
/**
 * Fetch the collection options and report their `capped` field.
 *
 * @param {Collection} self Collection to inspect.
 * @param {object} options Options forwarded to self.options.
 * @param {function} callback Receives (err) or (null, capped) through handleCallback.
 */
var isCapped = function(self, options, callback) {
  self.options(options, function onOptions(error, collectionOptions) {
    if (error) {
      return handleCallback(callback, error);
    }
    // A falsy options document is passed through unchanged
    const capped = collectionOptions && collectionOptions.capped;
    handleCallback(callback, null, capped);
  });
};
// Declare 'isCapped' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('isCapped', { callback: true, promise: true });
/**
* Creates an index on the db and collection.
* @method
* @param {(string|object)} fieldOrSpec Defines the index.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.unique=false] Creates an unique index.
* @param {boolean} [options.sparse=false] Creates a sparse index.
* @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
* @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
* @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
* @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
* @param {number} [options.v=null] Specify the format version of the indexes.
* @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
* @param {string} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
* @param {object} [options.partialFilterExpression=null] Creates a partial index based on the given filter object (MongoDB 3.2 or higher)
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.createIndex = function(fieldOrSpec, options, callback) {
  // Support the (fieldOrSpec, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // The createIndex helper receives (self, fieldOrSpec, options, callback)
  const operationArgs = [this, fieldOrSpec, options, callback];
  return executeOperation(this.s.topology, createIndex, operationArgs);
};
/**
 * Forward an index creation request to the owning database, adding this
 * collection's name as the first argument.
 *
 * @param {Collection} self Collection the index is created on.
 * @param {(string|object)} fieldOrSpec Index definition, passed through untouched.
 * @param {object} options Options, passed through untouched.
 * @param {function} callback Callback, passed through untouched.
 */
var createIndex = function(self, fieldOrSpec, options, callback) {
  const state = self.s;
  state.db.createIndex(state.name, fieldOrSpec, options, callback);
};
// Declare 'createIndex' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('createIndex', { callback: true, promise: true });
/**
* Creates multiple indexes in the collection, this method is only supported for
* MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported
* error. Index specifications are defined at http://docs.mongodb.org/manual/reference/command/createIndexes/.
* @method
* @param {array} indexSpecs An array of index specifications to be created
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.createIndexes = function(indexSpecs, options, callback) {
  // Support the (indexSpecs, callback) call signature.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Work on a copy so the clean-up below never touches the caller's object
  if (options) {
    options = shallowClone(options);
  } else {
    options = {};
  }

  // Only a numeric maxTimeMS is kept
  if (typeof options.maxTimeMS !== 'number') {
    delete options.maxTimeMS;
  }

  // The createIndexes helper receives (self, indexSpecs, options, callback)
  const operationArgs = [this, indexSpecs, options, callback];
  return executeOperation(this.s.topology, createIndexes, operationArgs);
};
/**
 * Send a createIndexes command for the given specifications, generating a
 * name for every specification that lacks one.
 *
 * Specifications are modified in place: a missing `name` is filled in as the
 * `<field>_<value>` pairs of `key` joined with underscores.
 *
 * @param {Collection} self Collection the indexes are created on.
 * @param {array} indexSpecs Index specifications.
 * @param {object} options Command options; readPreference is forced to PRIMARY on a copy.
 * @param {function} callback Receives the command result, or a MongoError when a collation is requested but not supported.
 */
var createIndexes = function(self, indexSpecs, options, callback) {
  var capabilities = self.s.topology.capabilities();

  for (var i = 0; i < indexSpecs.length; i++) {
    var spec = indexSpecs[i];

    // Did the user pass in a collation, check if our write server supports it.
    // This applies to every specification, whether or not it carries an
    // explicit name.
    if (spec.collation && capabilities && !capabilities.commandsTakeCollation) {
      return callback(new MongoError(f('server/primary/mongos does not support collation')));
    }

    // Ensure we generate the correct name if the parameter is not set
    if (spec.name == null) {
      var keys = [];
      for (var name in spec.key) {
        keys.push(f('%s_%s', name, spec.key[name]));
      }
      spec.name = keys.join('_');
    }
  }

  // Index creation always goes to the primary; copy so the passed options stay untouched
  options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });

  // Execute the index
  self.s.db.command(
    {
      createIndexes: self.s.name,
      indexes: indexSpecs
    },
    options,
    callback
  );
};
// Declare 'createIndexes' through define.classMethod with both the callback and promise flags set.
// NOTE(review): what classMethod does with this entry is defined outside this section — confirm there.
define.classMethod('createIndexes', { callback: true, promise: true });
/**
* Drops an index from this collection.
* @method
* @param {string} indexName Name of the index to drop.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {number} [options.maxTimeMS] Number of milliseconds to wait before aborting the query.
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.dropIndex = function(indexName, options, callback) {
  // Everything after indexName is optional: a trailing function is the
  // callback, and whatever remains first is the options object.
  var args = Array.prototype.slice.call(arguments, 1);
  callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  options = args.length ? args.shift() || {} : {};

  // Run only against primary. The preference is set on a copy so the object
  // the caller passed in is never modified.
  options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });

  return executeOperation(this.s.topology, dropIndex, [this, indexName, options, callback]);
};
// Sends `dropIndexes` for a single index name on `self`'s collection and relays
// the outcome to `callback` (when one was supplied).
function dropIndex(self, indexName, options, callback) {
  const command = { dropIndexes: self.s.name, index: indexName };

  // Let the shared helper add a write concern to the command where applicable.
  decorateWithWriteConcern(command, self, options);

  self.s.db.command(command, options, function(err, result) {
    // Nobody to notify: swallow the outcome.
    if (typeof callback !== 'function') {
      return;
    }
    if (err) {
      return handleCallback(callback, err, null);
    }
    handleCallback(callback, null, result);
  });
}
define.classMethod('dropIndex', { callback: true, promise: true });
/**
* Drops all indexes from this collection.
* @method
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {number} [options.maxTimeMS] Number of milliseconds to wait before aborting the query.
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.dropIndexes = function(options, callback) {
  // Called as dropIndexes(callback)
  const callbackOnly = typeof options === 'function';
  if (callbackOnly) {
    callback = options;
    options = {};
  }

  // Operate on a copy of whatever options were handed in.
  const cloned = options ? shallowClone(options) : {};

  // maxTimeMS survives only when it is a number.
  if (typeof cloned.maxTimeMS !== 'number') {
    delete cloned.maxTimeMS;
  }

  return executeOperation(this.s.topology, dropIndexes, [this, cloned, callback]);
};
// Drops every index by delegating to self.dropIndex with the '*' index name,
// then reports `true` on success or `false` alongside the error.
function dropIndexes(self, options, callback) {
  self.dropIndex('*', options, function onDropped(dropError) {
    if (!dropError) {
      handleCallback(callback, null, true);
      return;
    }
    return handleCallback(callback, dropError, false);
  });
}
define.classMethod('dropIndexes', { callback: true, promise: true });
/**
* Drops all indexes from this collection.
*
* This is an alias: it is the very same function object as
* `Collection.prototype.dropIndexes`, so it accepts the same
* `(options, callback)` arguments.
* @method
* @deprecated use dropIndexes
* @param {Object} [options] Optional settings, as accepted by dropIndexes
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no [callback] passed
*/
Collection.prototype.dropAllIndexes = Collection.prototype.dropIndexes;
define.classMethod('dropAllIndexes', { callback: true, promise: true });
/**
* Reindex all indexes on the collection
* Warning: reIndex is a blocking operation (indexes are rebuilt in the foreground) and will be slow for large collections.
* @method
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.reIndex = function(options, callback) {
  if (typeof options === 'function') {
    // Called as reIndex(callback)
    callback = options;
    options = {};
  } else if (!options) {
    // Missing or falsy options become an empty settings object.
    options = {};
  }

  return executeOperation(this.s.topology, reIndex, [this, options, callback]);
};
// Issues the `reIndex` command for the collection and reduces the reply to a
// boolean taken from its `ok` field.
function reIndex(self, options, callback) {
  const command = { reIndex: self.s.name };

  self.s.db.command(command, options, function(err, result) {
    // Without a callback the reply is discarded untouched.
    if (callback == null) {
      return;
    }
    if (err) {
      return handleCallback(callback, err, null);
    }
    handleCallback(callback, null, Boolean(result.ok));
  });
}
define.classMethod('reIndex', { callback: true, promise: true });
/**
* Get the list of all indexes information for the collection.
*
* @method
* @param {object} [options=null] Optional settings.
* @param {number} [options.batchSize=null] The batchSize for the returned command cursor or if pre 2.8 the systems batch collection
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {ClientSession} [options.session] optional session to use for this operation
* @return {CommandCursor}
*/
Collection.prototype.listIndexes = function(options) {
  const state = this.s;

  // Never touch the caller's object: clone, then resolve the read preference.
  let cursorOptions = shallowClone(options || {});
  cursorOptions = getReadPreference(this, cursorOptions, state.db, this);

  // The resulting cursor is built by CommandCursor using the collection's promise library.
  cursorOptions.cursorFactory = CommandCursor;
  cursorOptions.promiseLibrary = state.promiseLibrary;

  // No capabilities means the topology has nothing to report yet.
  if (!state.topology.capabilities()) {
    throw new MongoError('cannot connect to server');
  }

  let cursor;

  if (state.topology.capabilities().hasListIndexesCommand) {
    // listIndexes command path: the batch size travels inside the command's cursor document.
    const cursorSpec = cursorOptions.batchSize ? { batchSize: cursorOptions.batchSize } : {};
    const command = { listIndexes: state.name, cursor: cursorSpec };

    cursor = state.topology.cursor(f('%s.$cmd', state.dbName), command, cursorOptions);
    if (cursorOptions.readPreference) {
      cursor.setReadPreference(cursorOptions.readPreference);
    }
    return cursor;
  }

  // Fallback path: query <db>.system.indexes for entries of this namespace.
  const ns = f('%s.system.indexes', state.dbName);
  cursor = state.topology.cursor(ns, { find: ns, query: { ns: state.namespace } }, cursorOptions);

  if (cursorOptions.readPreference) {
    cursor.setReadPreference(cursorOptions.readPreference);
  }
  // Here the batch size is applied on the cursor itself.
  if (cursorOptions.batchSize) {
    cursor = cursor.batchSize(cursorOptions.batchSize);
  }
  return cursor;
};
define.classMethod('listIndexes', { callback: false, promise: false, returns: [CommandCursor] });
/**
* Ensures that an index exists, if it does not it creates it
* @method
* @deprecated use createIndexes instead
* @param {(string|object)} fieldOrSpec Defines the index.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.unique=false] Creates an unique index.
* @param {boolean} [options.sparse=false] Creates a sparse index.
* @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
* @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
* @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
* @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
* @param {number} [options.v=null] Specify the format version of the indexes.
* @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
* @param {string} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.ensureIndex = function(fieldOrSpec, options, callback) {
  if (typeof options === 'function') {
    // Called as ensureIndex(fieldOrSpec, callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  return executeOperation(this.s.topology, ensureIndex, [
    this,
    fieldOrSpec,
    options,
    callback
  ]);
};
// Hands the request to the owning Db's ensureIndex, adding the collection name.
function ensureIndex(self, fieldOrSpec, options, callback) {
  const state = self.s;
  state.db.ensureIndex(state.name, fieldOrSpec, options, callback);
}
define.classMethod('ensureIndex', { callback: true, promise: true });
/**
* Checks if one or more indexes exist on the collection, fails on first non-existing index
* @method
* @param {(string|array)} indexes One or more index names to check.
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.indexExists = function(indexes, options, callback) {
  if (typeof options === 'function') {
    // Called as indexExists(indexes, callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  const operationArgs = [this, indexes, options, callback];
  return executeOperation(this.s.topology, indexExists, operationArgs);
};
// Looks up the collection's index information and reports whether the requested
// index name (or every name in the requested array) is present in it.
function indexExists(self, indexes, options, callback) {
  self.indexInformation(options, function(err, indexInformation) {
    // Lookup failed: pass the error straight through.
    if (err != null) {
      return handleCallback(callback, err, null);
    }

    // Treat a single name like a one-element list.
    const wanted = Array.isArray(indexes) ? indexes : [indexes];

    // Stop at the first name without an entry.
    let allPresent = true;
    for (const name of wanted) {
      if (indexInformation[name] == null) {
        allPresent = false;
        break;
      }
    }

    return handleCallback(callback, null, allPresent);
  });
}
define.classMethod('indexExists', { callback: true, promise: true });
/**
* Retrieves this collections index info.
* @method
* @param {object} [options=null] Optional settings.
* @param {boolean} [options.full=false] Returns the full raw index information.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.indexInformation = function(options, callback) {
  // Work from the actual argument list: the callback, if any, is whatever was passed last.
  const params = Array.from(arguments);
  const lastParam = params[params.length - 1];

  callback = typeof lastParam === 'function' ? params.pop() : undefined;
  // First remaining argument is the options object; fall back to empty settings.
  options = (params.length && params.shift()) || {};

  return executeOperation(this.s.topology, indexInformation, [this, options, callback]);
};
// Forwards to the owning Db's indexInformation for this collection's name.
function indexInformation(self, options, callback) {
  const state = self.s;
  state.db.indexInformation(state.name, options, callback);
}
define.classMethod('indexInformation', { callback: true, promise: true });
/**
* The callback format for results
* @callback Collection~countCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {number} result The count of documents that matched the query.
*/
/**
* Count number of matching documents in the db to a query.
* @method
* @param {object} query The query for the count.
* @param {object} [options=null] Optional settings.
* @param {number} [options.limit=null] The limit of documents to count.
* @param {number} [options.skip=null] The number of documents to skip for the count.
* @param {string} [options.hint=null] An index name hint for the query.
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~countCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.count = function(query, options, callback) {
  // Every argument is optional; a trailing function is taken as the callback.
  const params = Array.from(arguments);
  const lastParam = params[params.length - 1];
  callback = typeof lastParam === 'function' ? params.pop() : undefined;

  // Remaining arguments are, in order, the query and the options.
  query = (params.length && params.shift()) || {};
  options = (params.length && params.shift()) || {};

  return executeOperation(this.s.topology, count, [this, query, options, callback]);
};
// Builds and runs the `count` command, then hands the reply's `n` to the callback.
function count(self, query, options, callback) {
  const command = { count: self.s.name, query: query };

  // skip, limit and maxTimeMS are copied onto the command only when numeric.
  if (typeof options.skip === 'number') {
    command.skip = options.skip;
  }
  if (typeof options.limit === 'number') {
    command.limit = options.limit;
  }
  if (typeof options.maxTimeMS === 'number') {
    command.maxTimeMS = options.maxTimeMS;
  }
  // Any truthy hint is passed along as given.
  if (options.hint) {
    command.hint = options.hint;
  }

  // Resolve the read preference on a copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db);

  // Read concern and collation are added by the shared decorators.
  decorateWithReadConcern(command, self, commandOptions);
  decorateWithCollation(command, self, commandOptions);

  self.s.db.command(command, commandOptions, function(err, result) {
    if (err) {
      return handleCallback(callback, err);
    }
    handleCallback(callback, null, result.n);
  });
}
define.classMethod('count', { callback: true, promise: true });
/**
* The distinct command returns a list of distinct values for the given key across a collection.
* @method
* @param {string} key Field of the document to find distinct values for.
* @param {object} query The query for filtering the set of documents to which we apply the distinct filter.
* @param {object} [options=null] Optional settings.
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.distinct = function(key, query, options, callback) {
  // Only `key` is positional; what follows is [query], [options], [callback].
  const trailing = Array.from(arguments).slice(1);
  const lastParam = trailing[trailing.length - 1];
  callback = typeof lastParam === 'function' ? trailing.pop() : undefined;

  const queryOption = (trailing.length && trailing.shift()) || {};
  const optionsOption = (trailing.length && trailing.shift()) || {};

  const operationArgs = [this, key, queryOption, optionsOption, callback];
  return executeOperation(this.s.topology, distinct, operationArgs);
};
// Builds and runs the `distinct` command, then hands the reply's `values` to the callback.
function distinct(self, key, query, options, callback) {
  const command = { distinct: self.s.name, key: key, query: query };

  // Resolve the read preference on a copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db, self);

  // maxTimeMS is forwarded on the command only when the caller gave a number.
  if (typeof options.maxTimeMS === 'number') {
    command.maxTimeMS = options.maxTimeMS;
  }

  // Read concern and collation are added by the shared decorators.
  decorateWithReadConcern(command, self, commandOptions);
  decorateWithCollation(command, self, commandOptions);

  self.s.db.command(command, commandOptions, function(err, result) {
    if (err) {
      return handleCallback(callback, err);
    }
    handleCallback(callback, null, result.values);
  });
}
define.classMethod('distinct', { callback: true, promise: true });
/**
* Retrieve all the indexes on the collection.
* @method
* @param {Object} [options] Optional settings
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.indexes = function(options, callback) {
  if (typeof options === 'function') {
    // Called as indexes(callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  return executeOperation(this.s.topology, indexes, [this, options, callback]);
};
// Asks the owning Db for this collection's index information, defaulting
// `full` to true unless the caller's options say otherwise.
function indexes(self, options, callback) {
  const withDefaults = Object.assign({ full: true }, options);
  const state = self.s;
  state.db.indexInformation(state.name, withDefaults, callback);
}
define.classMethod('indexes', { callback: true, promise: true });
/**
* Get all the collection statistics.
*
* @method
* @param {object} [options=null] Optional settings.
* @param {number} [options.scale=null] Divide the returned sizes by scale value.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The collection result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.stats = function(options, callback) {
  // A trailing function is the callback; the first remaining argument is the options.
  const params = Array.from(arguments);
  const lastParam = params[params.length - 1];

  callback = typeof lastParam === 'function' ? params.pop() : undefined;
  options = (params.length && params.shift()) || {};

  return executeOperation(this.s.topology, stats, [this, options, callback]);
};
// Builds and runs the `collStats` command; the callback receives whatever
// self.s.db.command reports.
function stats(self, options, callback) {
  const command = { collStats: self.s.name };

  // Forward a scale factor only when one was provided (null/undefined are skipped).
  if (options.scale != null) {
    command.scale = options.scale;
  }

  // Resolve the read preference on a copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db, self);

  self.s.db.command(command, commandOptions, callback);
}
define.classMethod('stats', { callback: true, promise: true });
/**
* @typedef {Object} Collection~findAndModifyWriteOpResult
* @property {object} value Document returned from findAndModify command.
* @property {object} lastErrorObject The raw lastErrorObject returned from the command.
* @property {Number} ok Is 1 if the command executed correctly.
*/
/**
* The callback format for inserts
* @callback Collection~findAndModifyCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Collection~findAndModifyWriteOpResult} result The result object if the command was executed successfully.
*/
/**
* Find a document and delete it in one atomic operation, requires a write lock for the duration of the operation.
*
* @method
* @param {object} filter Document selection filter.
* @param {object} [options=null] Optional settings.
* @param {object} [options.projection=null] Limits the fields to return for all matching documents.
* @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
* @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~findAndModifyCallback} [callback] The collection result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.findOneAndDelete = function(filter, options, callback) {
  if (typeof options === 'function') {
    // Called as findOneAndDelete(filter, callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  // Basic validation: the filter has to be a non-null object. This throws
  // synchronously, before any operation is scheduled.
  const filterIsObject = filter != null && typeof filter === 'object';
  if (!filterIsObject) {
    throw toError('filter parameter must be an object');
  }

  const operationArgs = [this, filter, options, callback];
  return executeOperation(this.s.topology, findOneAndDelete, operationArgs);
};
// Translates findOneAndDelete options into findAndModify options (projection is
// passed as `fields`, and `remove` is switched on) and delegates to
// self.findAndModify with no update document.
function findOneAndDelete(self, filter, options, callback) {
  // Copy first so the options handed to this helper are not altered.
  const removalOptions = shallowClone(options);
  removalOptions.fields = options.projection;
  removalOptions.remove = true;

  const sort = options.sort;
  self.findAndModify(filter, sort, null, removalOptions, callback);
}
define.classMethod('findOneAndDelete', { callback: true, promise: true });
/**
* Find a document and replace it in one atomic operation, requires a write lock for the duration of the operation.
*
* @method
* @param {object} filter Document selection filter.
* @param {object} replacement Document replacing the matching document.
* @param {object} [options=null] Optional settings.
* @param {object} [options.projection=null] Limits the fields to return for all matching documents.
* @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
* @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
* @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
* @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~findAndModifyCallback} [callback] The collection result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.findOneAndReplace = function(filter, replacement, options, callback) {
  if (typeof options === 'function') {
    // Called as findOneAndReplace(filter, replacement, callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  // Basic validation: both documents must be non-null objects. Failures throw
  // synchronously, filter first.
  const filterIsObject = filter != null && typeof filter === 'object';
  if (!filterIsObject) {
    throw toError('filter parameter must be an object');
  }
  const replacementIsObject = replacement != null && typeof replacement === 'object';
  if (!replacementIsObject) {
    throw toError('replacement parameter must be an object');
  }

  const operationArgs = [this, filter, replacement, options, callback];
  return executeOperation(this.s.topology, findOneAndReplace, operationArgs);
};
// Translates findOneAndReplace options into findAndModify options and delegates
// to self.findAndModify with the replacement document.
function findOneAndReplace(self, filter, replacement, options, callback) {
  // `new` is the negation of returnOriginal whenever returnOriginal is anything
  // other than undefined; left undefined it stays false (return the original).
  let returnNew = false;
  if (options.returnOriginal !== undefined) {
    returnNew = !options.returnOriginal;
  }

  // Copy first so the options handed to this helper are not altered.
  const modifyOptions = shallowClone(options);
  modifyOptions.fields = options.projection;
  modifyOptions.update = true;
  modifyOptions.new = returnNew;
  // Any upsert value is coerced to a boolean; undefined ends up false.
  modifyOptions.upsert = Boolean(options.upsert);

  self.findAndModify(filter, options.sort, replacement, modifyOptions, callback);
}
define.classMethod('findOneAndReplace', { callback: true, promise: true });
/**
* Find a document and update it in one atomic operation, requires a write lock for the duration of the operation.
*
* @method
* @param {object} filter Document selection filter.
* @param {object} update Update operations to be performed on the document
* @param {object} [options=null] Optional settings.
* @param {object} [options.projection=null] Limits the fields to return for all matching documents.
* @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
* @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
* @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
* @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~findAndModifyCallback} [callback] The collection result callback
* @return {Promise} returns Promise if no callback passed
*/
Collection.prototype.findOneAndUpdate = function(filter, update, options, callback) {
  if (typeof options === 'function') {
    // Called as findOneAndUpdate(filter, update, callback)
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  // Basic validation: both documents must be non-null objects. Failures throw
  // synchronously, filter first.
  const filterIsObject = filter != null && typeof filter === 'object';
  if (!filterIsObject) {
    throw toError('filter parameter must be an object');
  }
  const updateIsObject = update != null && typeof update === 'object';
  if (!updateIsObject) {
    throw toError('update parameter must be an object');
  }

  const operationArgs = [this, filter, update, options, callback];
  return executeOperation(this.s.topology, findOneAndUpdate, operationArgs);
};
// Translates findOneAndUpdate options into findAndModify options and delegates
// to self.findAndModify with the update document.
function findOneAndUpdate(self, filter, update, options, callback) {
  // Copy first so the options handed to this helper are not altered.
  const modifyOptions = shallowClone(options);
  modifyOptions.fields = options.projection;
  modifyOptions.update = true;
  // Only a literal boolean counts: `new` is true solely for returnOriginal === false,
  // and `upsert` is true solely for upsert === true.
  modifyOptions.new = options.returnOriginal === false;
  modifyOptions.upsert = options.upsert === true;

  self.findAndModify(filter, options.sort, update, modifyOptions, callback);
}
define.classMethod('findOneAndUpdate', { callback: true, promise: true });
/**
* Find and update a document.
* @method
* @param {object} query Query object to locate the object to modify.
* @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
* @param {object} doc The fields/vals to be updated.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {boolean} [options.remove=false] Set to true to remove the object before returning.
* @param {boolean} [options.upsert=false] Perform an upsert operation.
* @param {boolean} [options.new=false] Set to true if you want to return the modified object rather than the original. Ignored for remove.
* @param {object} [options.projection=null] Object containing the field projection for the result returned from the operation.
* @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~findAndModifyCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
* @deprecated use findOneAndUpdate, findOneAndReplace or findOneAndDelete instead
*/
Collection.prototype.findAndModify = function(query, sort, doc, options, callback) {
  // Only `query` is positional; what follows is [sort], [doc], [options], [callback].
  const trailing = Array.from(arguments).slice(1);
  const lastParam = trailing[trailing.length - 1];
  callback = typeof lastParam === 'function' ? trailing.pop() : undefined;

  sort = (trailing.length && trailing.shift()) || [];
  // doc is taken as given (no fallback) when supplied, otherwise null.
  doc = trailing.length ? trailing.shift() : null;
  const givenOptions = (trailing.length && trailing.shift()) || {};

  // Clone before forcing the primary read preference so the caller's object is untouched.
  options = shallowClone(givenOptions);
  options.readPreference = ReadPreference.PRIMARY;

  const operationArgs = [this, query, sort, doc, options, callback];
  return executeOperation(this.s.topology, findAndModify, operationArgs);
};
// Assembles the `findandmodify` command document from the normalised arguments,
// runs it through self.s.db.command and relays the outcome to the callback.
// Note that `options` is modified in place (arrayFilters removed,
// serializeFunctions defaulted, checkKeys forced to false).
function findAndModify(self, query, sort, doc, options, callback) {
  const command = {
    findandmodify: self.s.name,
    query: query
  };

  // Sort is included only when the formatter yields something truthy.
  const orderClause = formattedOrderClause(sort);
  if (orderClause) {
    command.sort = orderClause;
  }

  // These three flags are always sent, coerced to booleans.
  command.new = Boolean(options.new);
  command.remove = Boolean(options.remove);
  command.upsert = Boolean(options.upsert);

  // `projection` wins over the deprecated `fields`.
  const projection = options.projection || options.fields;
  if (projection) {
    command.fields = projection;
  }

  // arrayFilters moves from the options onto the command.
  if (options.arrayFilters) {
    command.arrayFilters = options.arrayFilters;
    delete options.arrayFilters;
  }

  // An update document is meaningless for a removal.
  if (doc && !options.remove) {
    command.update = doc;
  }

  if (options.maxTimeMS) {
    command.maxTimeMS = options.maxTimeMS;
  }

  // Keep an explicit serializeFunctions, otherwise inherit the collection's setting.
  if (options.serializeFunctions == null) {
    options.serializeFunctions = self.s.serializeFunctions;
  }

  // No check on the documents
  options.checkKeys = false;

  // Resolve the write concern and copy it onto the command when present.
  const finalOptions = writeConcern(options, self.s.db, self, options);
  if (finalOptions.writeConcern) {
    command.writeConcern = finalOptions.writeConcern;
  }

  if (typeof finalOptions.bypassDocumentValidation === 'boolean') {
    command.bypassDocumentValidation = finalOptions.bypassDocumentValidation;
  }

  decorateWithCollation(command, self, finalOptions);

  self.s.db.command(command, finalOptions, function(err, result) {
    return err ? handleCallback(callback, err, null) : handleCallback(callback, null, result);
  });
}
define.classMethod('findAndModify', { callback: true, promise: true });
/**
* Find and remove a document.
* @method
* @param {object} query Query object to locate the object to modify.
* @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
* @param {object} [options=null] Optional settings.
* @param {(number|string)} [options.w=null] The write concern.
* @param {number} [options.wtimeout=null] The write concern timeout.
* @param {boolean} [options.j=false] Specify a journal write concern.
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~resultCallback} [callback] The command result callback
* @return {Promise} returns Promise if no callback passed
* @deprecated use findOneAndDelete instead
*/
Collection.prototype.findAndRemove = function(query, sort, options, callback) {
  // Only `query` is positional; what follows is [sort], [options], [callback].
  const trailing = Array.from(arguments).slice(1);
  const lastParam = trailing[trailing.length - 1];
  callback = typeof lastParam === 'function' ? trailing.pop() : undefined;

  sort = (trailing.length && trailing.shift()) || [];
  options = (trailing.length && trailing.shift()) || {};

  const operationArgs = [this, query, sort, options, callback];
  return executeOperation(this.s.topology, findAndRemove, operationArgs);
};
// Runs a removal through self.findAndModify: same query and sort, no update
// document, and the `remove` option switched on.
//
// - self:     collection; only self.findAndModify is used.
// - query:    selector passed through unchanged.
// - sort:     sort specification passed through unchanged.
// - options:  caller-supplied settings. They are copied before `remove` is set,
//             because this object comes straight from the public findAndRemove
//             call and must not gain a `remove: true` property as a side effect.
// - callback: passed through to findAndModify.
var findAndRemove = function(self, query, sort, options, callback) {
  // Add the remove option on a private copy
  var removeOptions = shallowClone(options);
  removeOptions.remove = true;

  // Execute the callback
  self.findAndModify(query, sort, null, removeOptions, callback);
};
define.classMethod('findAndRemove', { callback: true, promise: true });
// Adds a `writeConcern` field to `command` when the topology reports that
// commands take a write concern and one can be resolved from the options.
// Nothing happens when capabilities are unavailable or the flag is off.
function decorateWithWriteConcern(command, self, options) {
  const capabilities = self.s.topology.capabilities();
  const serverTakesWriteConcern = capabilities && capabilities.commandsTakeWriteConcern;
  if (!serverTakesWriteConcern) {
    return;
  }

  // Resolve against a copy so `options` itself is not handed over as the target.
  const resolved = writeConcern(shallowClone(options), self.s.db, self, options);
  if (resolved.writeConcern) {
    command.writeConcern = resolved.writeConcern;
  }
}
// Copies `options.collation` onto `command` when the topology reports that
// commands take a collation and the supplied collation is a (non-null) object.
// Nothing happens when capabilities are unavailable or the flag is off.
function decorateWithCollation(command, self, options) {
  const capabilities = self.s.topology.capabilities();
  const serverTakesCollation = capabilities && capabilities.commandsTakeCollation;
  if (!serverTakesCollation) {
    return;
  }

  const collation = options.collation;
  if (collation && typeof collation === 'object') {
    command.collation = collation;
  }
}
// Computes the read concern for `command` and stores it on the command when it
// ends up non-empty. Sources, later ones overriding earlier ones:
//   1. a readConcern already present on the command,
//   2. the collection's own self.s.readConcern,
//   3. `afterClusterTime`, taken from the session's operationTime when the
//      session supports causal consistency and has an operation time.
function decorateWithReadConcern(command, self, options) {
  // Object.assign skips missing sources, so absent read concerns contribute nothing.
  const merged = Object.assign({}, command.readConcern || {});
  const collectionReadConcern = self.s.readConcern;
  if (collectionReadConcern) {
    Object.assign(merged, collectionReadConcern);
  }

  const session = options.session;
  if (session && session.supports.causalConsistency && session.operationTime) {
    merged.afterClusterTime = session.operationTime;
  }

  // Leave the command untouched when there is nothing to say.
  if (Object.keys(merged).length === 0) {
    return;
  }
  command.readConcern = merged;
}
/**
* Execute an aggregation framework pipeline against the collection, needs MongoDB >= 2.2
* @method
* @param {object} pipeline Array containing all the aggregation framework commands for the execution.
* @param {object} [options=null] Optional settings.
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {object} [options.cursor=null] Return the query as cursor, on 2.6 > it returns as a real cursor on pre 2.6 it returns as an emulated cursor.
* @param {number} [options.cursor.batchSize=null] The batchSize for the cursor
* @param {boolean} [options.explain=false] Explain returns the aggregation execution plan (requires mongodb 2.6 >).
* @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 >).
* @param {number} [options.maxTimeMS=null] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {string} [options.comment] Add a comment to an aggregation command
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Collection~aggregationCallback} callback The command result callback
* @return {(null|AggregationCursor)}
*/
Collection.prototype.aggregate = function(pipeline, options, callback) {
  var self = this;

  if (Array.isArray(pipeline)) {
    // Form 1: aggregate(pipelineArray, [options], [callback])
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }

    // Default missing options even when a callback is supplied. Previously
    // aggregate(pipeline, null, callback) crashed on the first `options.x` read.
    if (options == null) {
      options = {};
    }
  } else {
    // Form 2 (legacy): aggregate(stage1, stage2, ..., [options], [callback])
    var args = Array.prototype.slice.call(arguments, 0);

    // Only consume the trailing argument as the callback when it is a function
    // (or an explicit null/undefined placeholder). Popping it unconditionally
    // silently dropped the last stage, or the options, when no callback was given.
    var last = args[args.length - 1];
    callback =
      args.length > 0 && (typeof last === 'function' || last == null) ? args.pop() : undefined;

    // Get the possible options object
    var opts = args[args.length - 1];

    // If it contains any of the admissible options pop it off the args
    options =
      opts &&
      (opts.readPreference ||
        opts.explain ||
        opts.cursor ||
        opts.out ||
        opts.maxTimeMS ||
        opts.hint ||
        opts.allowDiskUse)
        ? args.pop()
        : {};

    // Left over arguments is the pipeline
    pipeline = args;
  }

  // readConcern is not sent when the pipeline writes its output with $out
  var ignoreReadConcern = false;

  if (typeof options.out === 'string') {
    // Append the $out stage to a copy so the caller's array is left untouched;
    // pushing in place made a reused pipeline accumulate one $out per call.
    pipeline = pipeline.concat([{ $out: options.out }]);
    ignoreReadConcern = true;
  } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
    ignoreReadConcern = true;
  }

  // Build the command (after the optional $out stage has been appended)
  var command = { aggregate: this.s.name, pipeline: pipeline };

  // Decorate command with writeConcern if out has been specified
  if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
    decorateWithWriteConcern(command, self, options);
  }

  // Have we specified collation
  decorateWithCollation(command, self, options);

  // If we have bypassDocumentValidation set
  if (typeof options.bypassDocumentValidation === 'boolean') {
    command.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Do we have a readConcern specified
  if (!ignoreReadConcern) {
    decorateWithReadConcern(command, self, options);
  }

  // If we have allowDiskUse defined
  if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
  if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

  // If we are giving a hint
  if (options.hint) command.hint = options.hint;

  // From here on work on a copy of the options
  options = shallowClone(options);

  // Ensure we have the right read preference inheritance
  options = getReadPreference(this, options, this.s.db, this);

  // If explain has been specified add it
  if (options.explain) {
    if (command.readConcern || command.writeConcern) {
      throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
    }
    command.explain = options.explain;
  }

  if (typeof options.comment === 'string') command.comment = options.comment;

  // Validate that cursor options is valid
  if (options.cursor != null && typeof options.cursor !== 'object') {
    throw toError('cursor options must be an object');
  }

  // A cursor document is always sent; batchSize defaults to 1000
  options.cursor = options.cursor || { batchSize: 1000 };
  command.cursor = options.cursor;

  // promiseLibrary
  options.promiseLibrary = this.s.promiseLibrary;

  // Set the AggregationCursor constructor
  options.cursorFactory = AggregationCursor;

  if (typeof callback !== 'function') {
    if (!this.s.topology.capabilities()) {
      throw new MongoError('cannot connect to server');
    }

    // On the cursor-returning path an explicit boolean (including false) is forwarded
    if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
    if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

    // Hand the cursor straight back to the caller
    return this.s.topology.cursor(this.s.namespace, command, options);
  }

  // Callback style: the cursor is delivered through the callback
  return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options));
};
// Record metadata for `aggregate`: flagged as accepting a callback and not returning a promise.
define.classMethod('aggregate', { callback: true, promise: false });
/**
 * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
 *
 * The pipeline may be omitted: when the first argument is not an array it is
 * used as the options object and an empty pipeline is passed on.
 * @method
 * @since 3.0.0
 * @param {Array} [pipeline=null] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
 * @param {object} [options=null] Optional settings
 * @param {string} [options.fullDocument='default'] Allowed values: default, updateLookup. When set to updateLookup, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
 * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
 * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
 * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {ChangeStream} a ChangeStream instance.
 */
Collection.prototype.watch = function(pipeline, options) {
  var stages = pipeline || [];
  var settings = options || {};

  if (Array.isArray(stages)) {
    return new ChangeStream(this, stages, settings);
  }

  // The first argument was not a pipeline, so it is the options object
  return new ChangeStream(this, [], stages);
};
// Record metadata for `watch`: flagged as neither callback- nor promise-based.
define.classMethod('watch', { callback: false, promise: false });
/**
 * The callback format for results
 * @callback Collection~parallelCollectionScanCallback
 * @param {MongoError} error An error instance representing the error during the execution.
 * @param {Cursor[]} cursors A list of cursors returned allowing for parallel reading of collection.
 */

/**
 * Return N number of parallel cursors for a collection allowing parallel reading of entire collection. There are
 * no ordering guarantees for returned results.
 *
 * The supplied options object is copied before defaults are applied, so the
 * caller's object is never modified. Options may be omitted entirely.
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {number} [options.batchSize=1000] Set the batchSize for the getMoreCommand when iterating over the query results.
 * @param {number} [options.numCursors=1] The maximum number of parallel command cursors to return (the number of returned cursors will be in the range 1:numCursors)
 * @param {boolean} [options.raw=false] Return all BSON documents as Raw Buffer documents.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.parallelCollectionScan = function(options, callback) {
  if (typeof options === 'function') (callback = options), (options = {});

  // Clone first: the defaults below used to be written into the caller's
  // object, and a missing options argument used to throw a TypeError.
  options = shallowClone(options || {});

  // Default to a single cursor and a batch size of 1000
  options.numCursors = options.numCursors || 1;
  options.batchSize = options.batchSize || 1000;

  // Ensure we have the right read preference inheritance
  options = getReadPreference(this, options, this.s.db, this);

  // Add a promiseLibrary
  options.promiseLibrary = this.s.promiseLibrary;

  return executeOperation(this.s.topology, parallelCollectionScan, [this, options, callback], {
    returnsCursor: true
  });
};
/**
 * Runs the parallelCollectionScan command and turns every cursor id in the
 * reply into a topology cursor on this collection's namespace.
 * @ignore
 * @param {Collection} self The collection the scan is executed against.
 * @param {object} options Command options; `numCursors` is sent with the command and `raw` is withheld from it.
 * @param {function} callback Invoked through handleCallback with (error, cursors).
 */
var parallelCollectionScan = function(self, options, callback) {
  var scanCommand = {
    parallelCollectionScan: self.s.name,
    numCursors: options.numCursors
  };

  // Do we have a readConcern specified
  decorateWithReadConcern(scanCommand, self, options);

  // `raw` is removed while the command itself runs and restored for the cursors
  var wantsRaw = options.raw;
  delete options['raw'];

  self.s.db.command(scanCommand, options, function(err, result) {
    if (err) {
      return handleCallback(callback, err, null);
    }

    if (result == null) {
      return handleCallback(
        callback,
        new Error('no result returned for parallelCollectionScan'),
        null
      );
    }

    // Put the raw flag back so the created cursors see it
    if (wantsRaw) options.raw = wantsRaw;

    var cursors = result.cursors.map(function(entry) {
      var id = entry.cursor.id;
      // Numeric ids are converted to Long; anything else is passed through
      var cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
      return self.s.topology.cursor(self.s.namespace, cursorId, options);
    });

    handleCallback(callback, null, cursors);
  });
};
// Record metadata for `parallelCollectionScan`: flagged as supporting both callback and promise usage.
define.classMethod('parallelCollectionScan', { callback: true, promise: true });
/**
 * Execute a geo search using a geo haystack index on a collection.
 *
 * Both `options` and `callback` are optional; a trailing function is always
 * treated as the callback.
 *
 * @method
 * @param {number} x Point to search on the x axis, ensure the indexes are ordered in the same order.
 * @param {number} y Point to search on the y axis, ensure the indexes are ordered in the same order.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {number} [options.maxDistance=null] Include results up to maxDistance from the point.
 * @param {object} [options.search=null] Filter the results by a query.
 * @param {number} [options.limit=false] Max number of results to return.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.geoHaystackSearch = function(x, y, options, callback) {
  // Everything after the two coordinates
  var trailing = Array.prototype.slice.call(arguments, 2);

  var done;
  if (typeof trailing[trailing.length - 1] === 'function') {
    done = trailing.pop();
  }

  // A missing or falsy options argument becomes an empty object
  var settings = trailing.length > 0 && trailing[0] ? trailing[0] : {};

  return executeOperation(this.s.topology, geoHaystackSearch, [this, x, y, settings, done]);
};
/**
 * Builds and runs the geoSearch command for `Collection.prototype.geoHaystackSearch`.
 * @ignore
 * @param {Collection} self The collection being searched.
 * @param {number} x Point to search on the x axis.
 * @param {number} y Point to search on the y axis.
 * @param {object} options Settings; all of them except readPreference and session are copied onto the command.
 * @param {function} callback Invoked exactly once through handleCallback with (error, result).
 */
var geoHaystackSearch = function(self, x, y, options, callback) {
  // Build command object
  var commandObject = {
    geoSearch: self.s.name,
    near: [x, y]
  };

  // Copy the options onto the command, leaving out readPreference and session
  commandObject = decorateCommand(commandObject, options, { readPreference: true, session: true });

  options = shallowClone(options);

  // Ensure we have the right read preference inheritance
  options = getReadPreference(self, options, self.s.db, self);

  // Do we have a readConcern specified
  decorateWithReadConcern(commandObject, self, options);

  // Execute the command
  self.s.db.command(commandObject, options, function(err, res) {
    if (err) return handleCallback(callback, err);

    // Return here: without it the callback fired a second time with the
    // failed reply reported as a success.
    if (res.err || res.errmsg) return handleCallback(callback, toError(res));

    // should we only be returning res.results here? Not sure if the user
    // should see the other return information
    handleCallback(callback, null, res);
  });
};
// Record metadata for `geoHaystackSearch`: flagged as supporting both callback and promise usage.
define.classMethod('geoHaystackSearch', { callback: true, promise: true });
/**
 * Group function helper: the JavaScript source that the eval-based `group`
 * fallback ships to the server. The names `ns`, `condition`, `keys` and
 * `initial` are expected to be supplied through the Code scope, and the
 * ` reduce;` token is substituted with the caller's reduce function source
 * before the text is sent.
 *
 * The source is kept as one entry per line and joined with '\n'.
 * @ignore
 */
var groupFunction = [
  'function () {',
  'var c = db[ns].find(condition);',
  'var map = new Map();',
  'var reduce_function = reduce;',
  '',
  'while (c.hasNext()) {',
  'var obj = c.next();',
  'var key = {};',
  '',
  'for (var i = 0, len = keys.length; i < len; ++i) {',
  'var k = keys[i];',
  'key[k] = obj[k];',
  '}',
  '',
  'var aggObj = map.get(key);',
  '',
  'if (aggObj == null) {',
  'var newObj = Object.extend({}, key);',
  'aggObj = Object.extend(newObj, initial);',
  'map.put(key, aggObj);',
  '}',
  '',
  'reduce_function(obj, aggObj);',
  '}',
  '',
  'return { "result": map.values() };',
  '}'
].join('\n');
/**
 * Run a group command across a collection
 *
 * Everything after `initial` is positional and optional. A trailing function
 * is always taken as the callback, and when the value in the `finalize`
 * position is not a function it is interpreted as `command` (legacy signature).
 *
 * @method
 * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by.
 * @param {object} condition An optional condition that must be true for a row to be considered.
 * @param {object} initial Initial value of the aggregation counter object.
 * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated
 * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned.
 * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework.
 */
Collection.prototype.group = function(
  keys,
  condition,
  initial,
  reduce,
  finalize,
  command,
  options,
  callback
) {
  // Positional arguments following `initial`
  var rest = Array.prototype.slice.call(arguments, 3);

  // Take the next positional argument, or null when none are left
  var take = function() {
    return rest.length > 0 ? rest.shift() : null;
  };

  callback = undefined;
  if (typeof rest[rest.length - 1] === 'function') {
    callback = rest.pop();
  }

  reduce = take();
  finalize = take();
  command = take();
  options = take() || {};

  // Make sure we are backward compatible: a non-function in the finalize
  // slot is really the `command` flag
  if (typeof finalize !== 'function') {
    command = finalize;
    finalize = null;
  }

  // A plain object of keys is reduced to the list of its property names
  var isKeyObject =
    keys instanceof Object &&
    !Array.isArray(keys) &&
    typeof keys !== 'function' &&
    keys._bsontype !== 'Code';
  if (isKeyObject) {
    keys = Object.keys(keys);
  }

  // Functions travel as their source text
  if (typeof reduce === 'function') reduce = reduce.toString();
  if (typeof finalize === 'function') finalize = finalize.toString();

  // Use the group command unless told otherwise
  if (command == null) command = true;

  return executeOperation(this.s.topology, group, [
    this,
    keys,
    condition,
    initial,
    reduce,
    finalize,
    command,
    options,
    callback
  ]);
};
/**
 * Executes a group operation, either through the server's group command
 * (`command` truthy) or through db.eval with the `groupFunction` template.
 * @ignore
 * @param {Collection} self The collection to group over.
 * @param {(array|function|Code)} keys Key names, or a key function / Code.
 * @param {object} condition Filter applied before grouping.
 * @param {object} initial Initial value of the aggregation object.
 * @param {(string|Code)} reduce Reduce function source or Code.
 * @param {string} finalize Optional finalize function source.
 * @param {boolean} command Use the group command when truthy, eval otherwise.
 * @param {object} options Operation options.
 * @param {function} callback Invoked through handleCallback with (error, result).
 */
var group = function(self, keys, condition, initial, reduce, finalize, command, options, callback) {
  var reduceIsCode = reduce != null && reduce._bsontype === 'Code';

  // Execute using the command
  if (command) {
    var reduceFunction = reduceIsCode ? reduce : new Code(reduce);

    var selector = {
      group: {
        ns: self.s.name,
        $reduce: reduceFunction,
        cond: condition,
        initial: initial,
        out: 'inline'
      }
    };

    // if finalize is defined
    if (finalize != null) selector.group['finalize'] = finalize;

    // Set up group selector
    if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) {
      selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys);
    } else {
      var hash = {};
      keys.forEach(function(key) {
        hash[key] = 1;
      });
      selector.group.key = hash;
    }

    options = shallowClone(options);

    // Ensure we have the right read preference inheritance
    options = getReadPreference(self, options, self.s.db, self);

    // Do we have a readConcern specified
    decorateWithReadConcern(selector, self, options);

    // Have we specified collation
    decorateWithCollation(selector, self, options);

    // Execute command
    self.s.db.command(selector, options, function(err, result) {
      if (err) return handleCallback(callback, err, null);
      handleCallback(callback, null, result.retval);
    });
  } else {
    // Create execution scope. Work on a copy so the caller's Code scope is
    // not modified, and tolerate a Code instance created without a scope
    // (that case used to throw when the fields below were assigned).
    var scope = reduceIsCode && reduce.scope ? shallowClone(reduce.scope) : {};
    scope.ns = self.s.name;
    scope.keys = keys;
    scope.condition = condition;
    scope.initial = initial;

    // NOTE(review): assumes a Code instance exposes its source as `.code`;
    // when that property is absent the value itself is stringified as before.
    var reduceSource = reduceIsCode && reduce.code != null ? reduce.code : reduce;
    var reduceText = reduceSource.toString() + ';';

    // Pass in the function text to execute within mongodb. A replacer function
    // is used so that `$&`, `$'`, `$$` etc. inside the user's reduce source are
    // inserted literally instead of being read as replacement patterns.
    var groupfn = groupFunction.replace(/ reduce;/, function() {
      return reduceText;
    });

    self.s.db.eval(new Code(groupfn, scope), null, options, function(err, results) {
      if (err) return handleCallback(callback, err, null);
      handleCallback(callback, null, results.result || results);
    });
  }
};
// Record metadata for `group`: flagged as supporting both callback and promise usage.
define.classMethod('group', { callback: true, promise: true });
/**
 * Functions that are passed as scope args must be converted to Code
 * instances. Values for which `isObject` is false, and values tagged
 * `_bsontype === 'ObjectID'`, are returned as they are; every other object is
 * rebuilt key by key, recursing into nested values.
 * @ignore
 * @param {*} scope The scope value to convert.
 * @return {*} The converted copy, or the input itself when it is not converted.
 */
function processScope(scope) {
  if (!isObject(scope) || scope._bsontype === 'ObjectID') {
    return scope;
  }

  // Keys are visited from last to first, so the copy lists them in reverse order
  return Object.keys(scope).reduceRight(function(converted, name) {
    var value = scope[name];
    converted[name] = typeof value === 'function' ? new Code(String(value)) : processScope(value);
    return converted;
  }, {});
}
/**
 * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
 *
 * The options object is copied before it is normalised, so the caller's
 * object (for example a `finalize` function on it) is left untouched.
 *
 * @method
 * @param {(function|string)} map The mapping function.
 * @param {(function|string)} reduce The reduce function.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {object} [options.out=null] Sets the output target for the map reduce job. *{inline:1} | {replace:'collectionName'} | {merge:'collectionName'} | {reduce:'collectionName'}*
 * @param {object} [options.query=null] Query filter object.
 * @param {object} [options.sort=null] Sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces.
 * @param {number} [options.limit=null] Number of objects to return from collection.
 * @param {boolean} [options.keeptemp=false] Keep temporary data.
 * @param {(function|string)} [options.finalize=null] Finalize function.
 * @param {object} [options.scope=null] Can pass in variables that can be access from map/reduce/finalize.
 * @param {boolean} [options.jsMode=false] It is possible to make the execution stay in JS. Provided in MongoDB > 2.0.X.
 * @param {boolean} [options.verbose=false] Provide statistics on job execution time.
 * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @throws {Error} when `options.out` is not defined
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.mapReduce = function(map, reduce, options, callback) {
  if ('function' === typeof options) (callback = options), (options = {});

  // Copy the options: `finalize` is rewritten below and must not leak into the
  // caller's object. Missing options are treated as empty so that the
  // descriptive error below is raised instead of a TypeError.
  options = shallowClone(options || {});

  // Out must always be defined (make sure we don't break weirdly on pre 1.8+ servers)
  if (null == options.out) {
    throw new Error(
      'the out option parameter must be defined, see mongodb docs for possible values'
    );
  }

  // Functions travel to the server as their source text
  if ('function' === typeof map) {
    map = map.toString();
  }

  if ('function' === typeof reduce) {
    reduce = reduce.toString();
  }

  if ('function' === typeof options.finalize) {
    options.finalize = options.finalize.toString();
  }

  return executeOperation(this.s.topology, mapReduce, [this, map, reduce, options, callback]);
};
var mapReduce = function(self, map, reduce, options, callback) {
var mapCommandHash = {
mapreduce: self.s.name,
map: map,
reduce: reduce
};
// Exclusion list
var exclusionList = ['readPreference', 'session'];
// Add any other options passed in
for (var n in options) {
if ('scope' === n) {
mapCommandHash[n] = processScope(options[n]);
} else {
// Only include if not in exclusion list
if (exclusionList.indexOf(n) === -1) {
mapCommandHash[n] = options[n];
}
}
}
options = shallowClone(options);
// Ensure we have the right read preference inheritance
options = getReadPreference(self, options, self.s.db, self);
// If we have a read preference and inline is not set as output fail hard
if (
options.readPreference !== false &&
options.readPreference !== 'primary' &&
options['out'] &&
(options['out'].inline !== 1 && options['out'] !== 'inline')
) {
// Force readPreference to primary
options.readPreference = 'primary';
// Decorate command with writeConcern if supported
decorateWithWriteConcern(mapCommandHash, self, options);
} else {
decorateWithReadConcern(mapCommandHash, self, options);
}
// Is bypassDocumentValidation specified
if (typeof options.bypassDocumentValidation === 'boolean') {
mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
}
// Have we specified collation
decorateWithCollation(mapCommandHash, self, options);
// Execute command
self.s.db.command(mapCommandHash, options, function(err, result) {
if (err) return handleCallback(callback, err);
// Check if we have an error
if (1 !== result.ok || result.err || result.errmsg) {
return handleCallback(callback, toError(result));
}
// Create statistics value
var stats = {};
if (result.timeMillis) stats['processtime'] = result.timeMillis;
if (result.counts) stats['counts'] = result.counts;
if (result.timing) stats['timing'] = result.timing;
// invoked with inline?
if (result.results) {
// If we wish for no verbosity
if (options['verbose'] == null || !options['verbose']) {
return handleCallback(callback, null, result.results);
}
return handleCallback(callback, null, { results: result.results, stats: stats });
}
// The returned collection
var collection = null;
// If we have an object it's a different db
if (result.result != null && typeof result.result === 'object') {
var doc = result.result;
// Return a collection from another db
var Db = require('./db');
collection = new Db(doc.db, self.s.db.s.topology, self.s.db.s.options).collection(
doc.collection
);
} else {
// Create a collection object that wraps the result collection
collection = self.s.db.collection(result.result);
}
// If we wish for no verbosity
if (options['verbose'] == null || !options['verbose']) {
return handleCallback(callback, err, collection);
}
// Return stats as third set of values
handleCallback(callback, err, { collection: collection, stats: stats });
});
};
// Record metadata for `mapReduce`: flagged as supporting both callback and promise usage.
define.classMethod('mapReduce', { callback: true, promise: true });
/**
 * Initiate an out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order.
 *
 * The collection's promise library is stored on the options object that is
 * handed to the bulk operation (the caller's object when one is supplied).
 *
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {UnorderedBulkOperation}
 */
Collection.prototype.initializeUnorderedBulkOp = function(options) {
  var bulkOptions = options ? options : {};
  bulkOptions.promiseLibrary = this.s.promiseLibrary;

  return unordered(this.s.topology, this, bulkOptions);
};
// Record metadata for `initializeUnorderedBulkOp`: neither callback- nor
// promise-based. The declared return type is now read from the `unordered`
// bulk module; it was previously looked up as `ordered.UnorderedBulkOperation`,
// i.e. on the ordered module.
// NOTE(review): assumes ./bulk/unordered exposes `UnorderedBulkOperation` the
// same way ./bulk/ordered exposes `OrderedBulkOperation` - confirm.
define.classMethod('initializeUnorderedBulkOp', {
  callback: false,
  promise: false,
  returns: [unordered.UnorderedBulkOperation]
});
/**
 * Initiate an in order bulk write operation. Operations will be serially executed in the order they are added, creating a new operation for each switch in types.
 *
 * The collection's promise library is stored on the options object that is
 * handed to the bulk operation (the caller's object when one is supplied).
 *
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {OrderedBulkOperation}
 */
Collection.prototype.initializeOrderedBulkOp = function(options) {
  var bulkOptions = options ? options : {};
  bulkOptions.promiseLibrary = this.s.promiseLibrary;

  return ordered(this.s.topology, this, bulkOptions);
};
// Record metadata for `initializeOrderedBulkOp`: neither callback- nor
// promise-based; the declared return type is `ordered.OrderedBulkOperation`.
define.classMethod('initializeOrderedBulkOp', {
callback: false,
promise: false,
returns: [ordered.OrderedBulkOperation]
});
/**
 * Resolve the write concern for an operation and store it on `target`.
 * Precedence: the operation's own options, then the collection's write
 * concern, then the db's. A source only counts when it defines `w`, `j` or
 * `fsync`; `wtimeout` alone does not select it.
 * @ignore
 * @param {object} target Object that receives `writeConcern` (and possibly `retryWrites`).
 * @param {object} db Provides `writeConcern` and `s.options.retryWrites`.
 * @param {object} col Provides `writeConcern`.
 * @param {object} options Operation options (`w`, `wtimeout`, `j`, `fsync`).
 * @return {object} The same `target` object.
 */
var writeConcern = function(target, db, col, options) {
  var definesConcern = function(source) {
    return source.w != null || source.j != null || source.fsync != null;
  };

  if (definesConcern(options)) {
    // Copy only the write concern fields that are actually set
    var explicit = {};
    ['w', 'wtimeout', 'j', 'fsync'].forEach(function(field) {
      if (options[field] != null) explicit[field] = options[field];
    });
    target.writeConcern = explicit;
  } else if (definesConcern(col.writeConcern)) {
    target.writeConcern = col.writeConcern;
  } else if (definesConcern(db.writeConcern)) {
    target.writeConcern = db.writeConcern;
  }

  // NOTE: there is probably a much better place for this
  if (db.s.options.retryWrites) target.retryWrites = true;

  return target;
};
/**
 * Figure out the read preference for an operation. The first truthy value
 * among the operation options, the collection and the db is used. A string or
 * a plain object carrying a string `mode`/`preference` is converted to a
 * ReadPreference and written to `options.readPreference`; a ReadPreference
 * instance leaves `options` as it is.
 * @ignore
 * @param {object} self Provides `s.readPreference` (collection level).
 * @param {object} options Operation options; updated in place.
 * @param {object} db Provides `s.readPreference` (db level).
 * @return {object} The same `options` object.
 * @throws {TypeError} when the selected value is not a string, object or ReadPreference.
 */
var getReadPreference = function(self, options, db) {
  const selected = options.readPreference || self.s.readPreference || db.s.readPreference;

  // Nothing configured anywhere
  if (!selected) {
    return options;
  }

  if (typeof selected === 'string') {
    options.readPreference = new ReadPreference(selected);
    return options;
  }

  if (selected instanceof ReadPreference) {
    return options;
  }

  if (typeof selected !== 'object') {
    throw new TypeError('Invalid read preference: ' + selected);
  }

  // Plain object form, e.g. { mode | preference, tags, maxStalenessSeconds };
  // objects without a usable mode are left alone
  const mode = selected.mode || selected.preference;
  if (mode && typeof mode === 'string') {
    options.readPreference = new ReadPreference(mode, selected.tags, {
      maxStalenessSeconds: selected.maxStalenessSeconds
    });
  }

  return options;
};
// The Collection constructor is this module's sole export.
module.exports = Collection;