|
|
'use strict';
|
|
|
|
|
|
var checkCollectionName = require('./utils').checkCollectionName,
|
|
|
ObjectID = require('mongodb-core').BSON.ObjectID,
|
|
|
Long = require('mongodb-core').BSON.Long,
|
|
|
Code = require('mongodb-core').BSON.Code,
|
|
|
f = require('util').format,
|
|
|
AggregationCursor = require('./aggregation_cursor'),
|
|
|
MongoError = require('mongodb-core').MongoError,
|
|
|
shallowClone = require('./utils').shallowClone,
|
|
|
isObject = require('./utils').isObject,
|
|
|
toError = require('./utils').toError,
|
|
|
normalizeHintField = require('./utils').normalizeHintField,
|
|
|
handleCallback = require('./utils').handleCallback,
|
|
|
decorateCommand = require('./utils').decorateCommand,
|
|
|
formattedOrderClause = require('./utils').formattedOrderClause,
|
|
|
ReadPreference = require('mongodb-core').ReadPreference,
|
|
|
CommandCursor = require('./command_cursor'),
|
|
|
Define = require('./metadata'),
|
|
|
Cursor = require('./cursor'),
|
|
|
unordered = require('./bulk/unordered'),
|
|
|
ordered = require('./bulk/ordered'),
|
|
|
ChangeStream = require('./change_stream'),
|
|
|
executeOperation = require('./utils').executeOperation;
|
|
|
|
|
|
/**
|
|
|
* @fileOverview The **Collection** class is an internal class that embodies a MongoDB collection
|
|
|
* allowing for insert/update/remove/find and other command operations on that MongoDB collection.
|
|
|
*
|
|
|
* **COLLECTION Cannot directly be instantiated**
|
|
|
* @example
|
|
|
* const MongoClient = require('mongodb').MongoClient;
|
|
|
* const test = require('assert');
|
|
|
* // Connection url
|
|
|
* const url = 'mongodb://localhost:27017';
|
|
|
* // Database Name
|
|
|
* const dbName = 'test';
|
|
|
* // Connect using MongoClient
|
|
|
* MongoClient.connect(url, function(err, client) {
|
|
|
* // Create a collection we want to drop later
|
|
|
* const col = client.db(dbName).collection('createIndexExample1');
|
|
|
* // Show that duplicate records got dropped
|
|
|
* col.find({}).toArray(function(err, items) {
|
|
|
* test.equal(null, err);
|
|
|
* test.equal(4, items.length);
|
|
|
* client.close();
|
|
|
* });
|
|
|
* });
|
|
|
*/
|
|
|
|
|
|
// Collection-level option keys that `find` copies from `this.s.options` into the per-query options.
var mergeKeys = ['readPreference', 'ignoreUndefined'];
|
|
|
|
|
|
/**
|
|
|
* Create a new Collection instance (INTERNAL TYPE, do not instantiate directly)
|
|
|
* @class
|
|
|
* @property {string} collectionName Get the collection name.
|
|
|
* @property {string} namespace Get the full collection namespace.
|
|
|
* @property {object} writeConcern The current write concern values.
|
|
|
* @property {object} readConcern The current read concern values.
|
|
|
* @property {object} hint Get current index hint for collection.
|
|
|
* @return {Collection} a Collection instance.
|
|
|
*/
|
|
|
var Collection = function(db, topology, dbName, name, pkFactory, options) {
  checkCollectionName(name);

  // `options` is optional. Normalise it once so that every lookup below is safe; reading
  // `options.promiseLibrary` / `options.readConcern` used to throw a TypeError when the
  // caller passed null or undefined. When omitted, `this.s.options` is an empty object.
  options = options == null ? {} : options;

  // A collection-level value wins; otherwise inherit the setting stored on the Db.
  var inheritFromDb = function(key) {
    return options[key] == null ? db.s.options[key] : options[key];
  };

  // slaveOk is read from `db.slaveOk` rather than from `db.s.options`.
  var slaveOk = options.slaveOk == null ? db.slaveOk : options.slaveOk;
  var serializeFunctions = inheritFromDb('serializeFunctions');
  var raw = inheritFromDb('raw');
  var promoteLongs = inheritFromDb('promoteLongs');
  var promoteValues = inheritFromDb('promoteValues');
  var promoteBuffers = inheritFromDb('promoteBuffers');

  // Read preference: collection-level option first, then the Db-level one, else null.
  var readPreference = null;
  if (options.readPreference) {
    readPreference = options.readPreference;
  } else if (db.options.readPreference) {
    readPreference = db.options.readPreference;
  }

  // Internal state
  this.s = {
    // Primary key factory; ObjectID unless a custom factory was supplied
    pkFactory: pkFactory == null ? ObjectID : pkFactory,
    // Db
    db: db,
    // Topology
    topology: topology,
    // dbName
    dbName: dbName,
    // Options (the caller's object, or an empty object when none was passed)
    options: options,
    // Namespace, formatted as "<dbName>.<name>"
    namespace: f('%s.%s', dbName, name),
    // Read preference
    readPreference: readPreference,
    // SlaveOK
    slaveOk: slaveOk,
    // Serialize functions
    serializeFunctions: serializeFunctions,
    // Raw
    raw: raw,
    // promoteLongs
    promoteLongs: promoteLongs,
    // promoteValues
    promoteValues: promoteValues,
    // promoteBuffers
    promoteBuffers: promoteBuffers,
    // internalHint
    internalHint: null,
    // collectionHint (assigned later through the `hint` setter)
    collectionHint: null,
    // Name
    name: name,
    // Promise library; falls back to the global Promise
    promiseLibrary: options.promiseLibrary || Promise,
    // Read Concern
    readConcern: options.readConcern
  };
};
|
|
|
|
|
|
// Metadata descriptor for the class, also exposed as `Collection.define`. The local alias is
// used by the `define.classMethod(...)` registrations that follow the method definitions.
// NOTE(review): the meaning of the third argument (`false`) is defined in ./metadata — confirm there.
var define = (Collection.define = new Define('Collection', Collection, false));
|
|
|
|
|
|
// Read-only, enumerable accessor for the database name held in the internal state.
Object.defineProperty(Collection.prototype, 'dbName', {
  enumerable: true,
  get() {
    return this.s.dbName;
  }
});
|
|
|
|
|
|
// Read-only, enumerable accessor for the collection name held in the internal state.
Object.defineProperty(Collection.prototype, 'collectionName', {
  enumerable: true,
  get() {
    return this.s.name;
  }
});
|
|
|
|
|
|
// Read-only, enumerable accessor for the full namespace string held in the internal state.
Object.defineProperty(Collection.prototype, 'namespace', {
  enumerable: true,
  get() {
    return this.s.namespace;
  }
});
|
|
|
|
|
|
// Read-only, enumerable accessor for the read concern. When none is stored on the
// collection, a fresh `{ level: 'local' }` object is returned on each access.
Object.defineProperty(Collection.prototype, 'readConcern', {
  enumerable: true,
  get() {
    const configured = this.s.readConcern;
    return configured || { level: 'local' };
  }
});
|
|
|
|
|
|
// Read-only, enumerable accessor that builds a write-concern object from the collection
// options. Only the keys w, j, fsync and wtimeout are copied, and only when non-null.
Object.defineProperty(Collection.prototype, 'writeConcern', {
  enumerable: true,
  get() {
    const source = this.s.options;
    const concern = {};
    for (const field of ['w', 'j', 'fsync', 'wtimeout']) {
      const value = source[field];
      if (value != null) concern[field] = value;
    }
    return concern;
  }
});
|
|
|
|
|
|
/**
|
|
|
* @ignore
|
|
|
*/
|
|
|
// Enumerable accessor pair for the collection-level index hint. Assigned values are
// passed through normalizeHintField before being stored.
Object.defineProperty(Collection.prototype, 'hint', {
  enumerable: true,
  get() {
    return this.s.collectionHint;
  },
  set(value) {
    this.s.collectionHint = normalizeHintField(value);
  }
});
|
|
|
|
|
|
/**
|
|
|
* Creates a cursor for a query that can be used to iterate over results from MongoDB
|
|
|
* @method
|
|
|
* @param {object} [query={}] The cursor query object.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {number} [options.limit=0] Sets the limit of documents returned in the query.
|
|
|
* @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
|
|
|
* @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
|
|
|
* @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
|
|
|
* @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
|
|
|
* @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
|
|
|
* @param {boolean} [options.explain=false] Explain the query instead of returning the data.
|
|
|
* @param {boolean} [options.snapshot=false] Snapshot query.
|
|
|
* @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
|
|
|
* @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
|
|
|
* @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
|
|
|
* @param {boolean} [options.returnKey=false] Only return the index key.
|
|
|
* @param {number} [options.maxScan=null] Limit the number of items to scan.
|
|
|
* @param {number} [options.min=null] Set index bounds.
|
|
|
* @param {number} [options.max=null] Set index bounds.
|
|
|
* @param {boolean} [options.showDiskLoc=false] Show disk location of results.
|
|
|
* @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
|
|
|
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
|
|
|
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
|
|
|
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
|
|
|
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
|
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
|
* @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
|
|
|
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for the find operation (see 3.4 documentation for available fields).
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @throws {MongoError}
|
|
|
* @return {Cursor}
|
|
|
*/
|
|
|
Collection.prototype.find = function(query, options, callback) {
  let selector = query;

  // Resolve the overloaded call shapes: (query, options, cb), (query, cb), (cb) and (query).
  if (typeof callback !== 'function') {
    if (typeof options === 'function') {
      callback = options;
      options = undefined;
    } else if (options == null) {
      // A single meaningful argument: it is either the callback or the selector.
      // A non-object, non-function value is silently dropped here.
      callback = typeof selector === 'function' ? selector : undefined;
      selector = typeof selector === 'object' ? selector : undefined;
    }
  }

  // Default to an empty selector
  if (selector == null) selector = {};

  // A Buffer selector is treated as a raw message: its first four bytes, read as a
  // little-endian 32-bit size header, must equal the buffer length.
  if (Buffer.isBuffer(selector)) {
    const declaredSize =
      selector[0] | (selector[1] << 8) | (selector[2] << 16) | (selector[3] << 24);
    if (declaredSize !== selector.length) {
      const sizeError = new Error(
        `query selector raw message size does not match message header size [${
          selector.length
        }] != [${declaredSize}]`
      );
      sizeError.name = 'MongoError';
      throw sizeError;
    }
  }

  // A bare ObjectID is shorthand for a lookup by _id
  if (selector._bsontype === 'ObjectID') {
    selector = { _id: selector };
  }

  options = options || {};

  // `projection` takes precedence over the deprecated `fields`. An array of field names
  // becomes an inclusion document; an empty array selects only _id.
  let projection = options.projection || options.fields;
  if (Array.isArray(projection)) {
    if (projection.length === 0) {
      projection = { _id: 1 };
    } else {
      const included = {};
      projection.forEach(field => {
        included[field] = 1;
      });
      projection = included;
    }
  }

  // Start from the collection-level options that are allowed to leak into queries,
  // then layer the caller's options on top. Key insertion order is significant because
  // these keys are later merged into the command in the same order.
  let cursorOptions = {};
  const collectionOptions = this.s.options;
  for (const inherited in collectionOptions) {
    if (mergeKeys.indexOf(inherited) !== -1) {
      cursorOptions[inherited] = collectionOptions[inherited];
    }
  }
  for (const supplied in options) {
    cursorOptions[supplied] = options[supplied];
  }

  // Unpack options, applying defaults
  cursorOptions.skip = options.skip || 0;
  cursorOptions.limit = options.limit || 0;
  cursorOptions.raw = typeof options.raw === 'boolean' ? options.raw : this.s.raw;
  cursorOptions.hint =
    options.hint != null ? normalizeHintField(options.hint) : this.s.collectionHint;
  cursorOptions.timeout = options.timeout;
  // Use the caller's slaveOk when given, otherwise the db default
  cursorOptions.slaveOk = options.slaveOk != null ? options.slaveOk : this.s.db.slaveOk;

  // Add read preference if needed
  cursorOptions = getReadPreference(this, cursorOptions, this.s.db);

  // NOTE(review): the original condition was
  //   readPreference != null &&
  //   (readPreference !== 'primary' || readPreference.mode !== 'primary')
  // whose second half is always true, so slaveOk is forced on for ANY non-null read
  // preference, including primary. The stated intent was "different from primary".
  // Behaviour is preserved here; confirm nothing relies on it before tightening.
  if (cursorOptions.readPreference != null) {
    cursorOptions.slaveOk = true;
  }

  // Ensure the query is an object
  if (typeof selector !== 'object') {
    throw MongoError.create({ message: 'query selector must be an object', driver: true });
  }

  // Build the find command
  const findCommand = {
    find: this.s.namespace,
    limit: cursorOptions.limit,
    skip: cursorOptions.skip,
    query: selector
  };

  // Accept the lower-case `awaitdata` spelling as an alias for `awaitData`
  if (typeof cursorOptions.awaitdata === 'boolean') {
    cursorOptions.awaitData = cursorOptions.awaitdata;
  }

  // Translate to new command option noCursorTimeout
  if (typeof cursorOptions.timeout === 'boolean') {
    cursorOptions.noCursorTimeout = cursorOptions.timeout;
  }

  // Merge every non-null option except the session into the command
  for (const optionName in cursorOptions) {
    const optionValue = cursorOptions[optionName];
    if (optionValue != null && optionName !== 'session') {
      findCommand[optionName] = optionValue;
    }
  }

  if (projection) findCommand.fields = projection;

  // These two are added after the merge above, so they are passed to the cursor
  // without becoming part of the command document.
  cursorOptions.db = this.s.db;
  cursorOptions.promiseLibrary = this.s.promiseLibrary;

  // Fall back to collection-level deserialisation flags where the query set none
  const collectionState = this.s;
  ['raw', 'promoteLongs', 'promoteValues', 'promoteBuffers'].forEach(flag => {
    if (cursorOptions[flag] == null && typeof collectionState[flag] === 'boolean') {
      cursorOptions[flag] = collectionState[flag];
    }
  });

  // Sort options
  if (findCommand.sort) {
    findCommand.sort = formattedOrderClause(findCommand.sort);
  }

  // Set the readConcern
  decorateWithReadConcern(findCommand, this, options);

  // Decorate find command with collation options
  decorateWithCollation(findCommand, this, options);

  // Create the cursor and hand it back directly or through the callback
  const cursor = this.s.topology.cursor(this.s.namespace, findCommand, cursorOptions);
  return typeof callback === 'function' ? handleCallback(callback, null, cursor) : cursor;
};
|
|
|
|
|
|
// Register `find` in the class metadata: flagged as taking no callback and returning no promise,
// with Cursor listed as its return type.
define.classMethod('find', { callback: false, promise: false, returns: [Cursor] });
|
|
|
|
|
|
/**
|
|
|
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
|
|
|
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
|
|
|
* can be overridden by setting the **forceServerObjectId** flag.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object} doc Document to insert.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
|
|
|
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~insertOneWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.insertOne = function(doc, options, callback) {
  // Support the (doc, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Propagate the collection-level ignoreUndefined flag. It is set on a copy so the
  // caller's options object is left untouched.
  const ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  return executeOperation(this.s.topology, insertOne, [this, doc, options, callback]);
};
|
|
|
|
|
|
/**
 * Callback-style implementation behind `Collection.prototype.insertOne`.
 *
 * Rejects array input, delegates the write to `insertDocuments`, then copies
 * `insertedCount` (from `result.n`) and `insertedId` (from `doc._id`) onto the result.
 * If no callback is supplied the outcome of the insert is dropped.
 *
 * @ignore
 */
var insertOne = function(self, doc, options, callback) {
  if (Array.isArray(doc)) {
    const notAnObject = MongoError.create({
      message: 'doc parameter must be an object',
      driver: true
    });
    return callback(notAnObject);
  }

  insertDocuments(self, [doc], options, (err, r) => {
    if (callback == null) return;
    if (err) return callback(err);

    // Workaround for pre 2.6 servers
    if (r == null) return callback(null, { result: { ok: 1 } });

    // Add values to top level to ensure crud spec compatibility
    r.insertedCount = r.result.n;
    r.insertedId = doc._id;
    callback(null, r);
  });
};
|
|
|
|
|
|
/**
 * Converts a bulk-write result into the result shape returned by `insertMany`:
 * `{ result: { ok, n[, opTime] }, ops, insertedCount, insertedIds }`.
 * `opTime` is only present when `r.getLastOp()` returns a truthy value.
 *
 * @ignore
 */
var mapInserManyResults = function(docs, r) {
  var commandResult = { ok: 1, n: r.insertedCount };
  var mapped = {
    result: commandResult,
    ops: docs,
    insertedCount: r.insertedCount,
    insertedIds: r.insertedIds
  };

  if (r.getLastOp()) {
    commandResult.opTime = r.getLastOp();
  }

  return mapped;
};
|
|
|
|
|
|
// Register `insertOne` in the class metadata as supporting both callback and promise usage.
define.classMethod('insertOne', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Inserts an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
|
|
|
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
|
|
|
* can be overridden by setting the **forceServerObjectId** flag.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object[]} docs Documents to insert.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
|
|
|
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {boolean} [options.ordered=true] If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~insertWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.insertMany = function(docs, options, callback) {
  const collection = this;

  // Support the (docs, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Work on a copy so the caller's options object is never mutated
  options = options ? shallowClone(options) : { ordered: true };

  // Invalid input is reported through the callback when there is one, otherwise
  // through a rejected promise.
  if (!Array.isArray(docs)) {
    const invalidDocs = () =>
      MongoError.create({ message: 'docs parameter must be an array of documents', driver: true });

    if (typeof callback === 'function') {
      return callback(invalidDocs());
    }
    return new this.s.promiseLibrary((resolve, reject) => {
      reject(invalidDocs());
    });
  }

  // Fall back to the collection-level serializeFunctions setting
  options.serializeFunctions = options.serializeFunctions || collection.s.serializeFunctions;

  // An explicit boolean option wins; otherwise use the db-level setting
  const forceServerObjectId =
    typeof options.forceServerObjectId === 'boolean'
      ? options.forceServerObjectId
      : collection.s.db.options.forceServerObjectId;

  // Unless the server is asked to assign ids, give every document without an _id
  // (null or undefined) one from the primary key factory. This mutates the documents.
  if (forceServerObjectId !== true) {
    for (let idx = 0; idx < docs.length; idx++) {
      const doc = docs[idx];
      if (doc._id == null) doc._id = collection.s.pkFactory.createPk();
    }
  }

  // The whole batch is expressed as a single insertMany bulk operation
  const operations = [{ insertMany: docs }];

  return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback], {
    resultMutator: result => mapInserManyResults(docs, result)
  });
};
|
|
|
|
|
|
// Register `insertMany` in the class metadata as supporting both callback and promise usage.
define.classMethod('insertMany', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~BulkWriteOpResult
|
|
|
* @property {number} insertedCount Number of documents inserted.
|
|
|
* @property {number} matchedCount Number of documents matched for update.
|
|
|
* @property {number} modifiedCount Number of documents modified.
|
|
|
* @property {number} deletedCount Number of documents deleted.
|
|
|
* @property {number} upsertedCount Number of documents upserted.
|
|
|
* @property {object} insertedIds Inserted document generated Id's, hash key is the index of the originating operation
|
|
|
* @property {object} upsertedIds Upserted document generated Id's, hash key is the index of the originating operation
|
|
|
* @property {object} result The command result object.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for bulk writes
|
|
|
* @callback Collection~bulkWriteOpCallback
|
|
|
* @param {BulkWriteError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~BulkWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Perform a bulkWrite operation without a fluent API
|
|
|
*
|
|
|
* Legal operation types are
|
|
|
*
|
|
|
* { insertOne: { document: { a: 1 } } }
|
|
|
*
|
|
|
* { updateOne: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
|
|
|
*
|
|
|
* { updateMany: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
|
|
|
*
|
|
|
* { deleteOne: { filter: {c:1} } }
|
|
|
*
|
|
|
* { deleteMany: { filter: {c:1} } }
|
|
|
*
|
|
|
* { replaceOne: { filter: {c:3}, replacement: {c:4}, upsert:true}}
|
|
|
*
|
|
|
* If documents passed in do not contain the **_id** field,
|
|
|
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
|
|
|
* can be overridden by setting the **forceServerObjectId** flag.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object[]} operations Bulk operations to perform.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
|
|
|
* @param {boolean} [options.ordered=true] Execute write operation in ordered or unordered fashion.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~bulkWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.bulkWrite = function(operations, options, callback) {
  // Support the (operations, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = { ordered: true };

  // Invalid input is reported by throwing synchronously, not through the
  // callback or a rejected promise.
  if (Array.isArray(operations) === false) {
    throw MongoError.create({ message: 'operations must be an array of documents', driver: true });
  }

  return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback]);
};
|
|
|
|
|
|
/**
 * Callback-style implementation behind `Collection.prototype.bulkWrite` and `insertMany`.
 *
 * Feeds every operation into an ordered or unordered bulk builder, executes it, and
 * decorates the result with the CRUD-style fields (`insertedCount`, `matchedCount`,
 * `modifiedCount`, `deletedCount`, `upsertedCount`, `insertedIds`, `upsertedIds`, `n`).
 * An exception thrown while queuing an operation is delivered through the callback.
 *
 * @ignore
 */
var bulkWrite = function(self, operations, options, callback) {
  // Propagate the collection-level ignoreUndefined flag on a copy of the options
  if (self.s.options.ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = self.s.options.ignoreUndefined;
  }

  // Ordered unless `ordered` is explicitly set to something other than true
  var wantsOrdered = options.ordered === true || options.ordered == null;
  var bulk = wantsOrdered
    ? self.initializeOrderedBulkOp(options)
    : self.initializeUnorderedBulkOp(options);

  // Becomes true as soon as any operation carries a collation
  var usesCollation = false;

  try {
    for (var opIndex = 0; opIndex < operations.length; opIndex++) {
      var operation = operations[opIndex];
      // The operation type is the first (normally the only) key of the document
      var opType = Object.keys(operation)[0];
      if (operation[opType].collation) {
        usesCollation = true;
      }

      // Pass to the raw bulk
      bulk.raw(operation);
    }
  } catch (err) {
    return callback(err, null);
  }

  // Final options for write concern
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);
  var writeCon = finalOptions.writeConcern || {};
  var capabilities = self.s.topology.capabilities();

  // Did the user pass in a collation, check if our write server supports it
  if (usesCollation && capabilities && !capabilities.commandsTakeCollation) {
    return callback(new MongoError(f('server/primary/mongos does not support collation')));
  }

  // Copies each { index, _id } entry into `target`, keyed by index
  var indexIds = function(target, entries) {
    for (var k = 0; k < entries.length; k++) {
      target[entries[k].index] = entries[k]._id;
    }
  };

  // Execute the bulk
  bulk.execute(writeCon, finalOptions, function(err, r) {
    // An error with no result at all is reported as-is
    if (err && !r) {
      return callback(err, null);
    }

    r.insertedCount = r.nInserted;
    r.matchedCount = r.nMatched;
    r.modifiedCount = r.nModified || 0;
    r.deletedCount = r.nRemoved;
    r.upsertedCount = r.getUpsertedIds().length;
    r.upsertedIds = {};
    r.insertedIds = {};

    // Update the n
    r.n = r.insertedCount;

    // Map inserted and upserted ids by the index of the originating operation
    indexIds(r.insertedIds, r.getInsertedIds());
    indexIds(r.upsertedIds, r.getUpsertedIds());

    // Return the results
    callback(null, r);
  });
};
|
|
|
|
|
|
/**
 * Inserts one or more documents through `topology.insert`.
 *
 * Resolves the write concern, assigns an `_id` from the primary key factory to every
 * document whose `_id` is undefined (unless `forceServerObjectId` is true), and reports
 * command-level and write errors through the callback. On success the inserted documents
 * are attached to the result as `ops`. Without a callback the outcome is dropped.
 *
 * @ignore
 */
var insertDocuments = function(self, docs, options, callback) {
  // Support the (self, docs, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Ensure we are operating on an array of docs
  if (!Array.isArray(docs)) docs = [docs];

  // Get the write concern options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // If keep going set unordered
  if (finalOptions.keepGoing === true) {
    finalOptions.ordered = false;
  }
  finalOptions.serializeFunctions = options.serializeFunctions || self.s.serializeFunctions;

  // An explicit boolean option wins; otherwise use the db-level setting
  var forceServerObjectId =
    typeof options.forceServerObjectId === 'boolean'
      ? options.forceServerObjectId
      : self.s.db.options.forceServerObjectId;

  // Add _id if not specified. Only `undefined` counts as missing here; an explicit
  // null _id is left as-is.
  if (forceServerObjectId !== true) {
    for (var idx = 0; idx < docs.length; idx++) {
      if (docs[idx]._id === undefined) {
        docs[idx]._id = self.s.pkFactory.createPk();
      }
    }
  }

  // Perform the insert
  self.s.topology.insert(self.s.namespace, docs, finalOptions, function(err, reply) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err);
    if (reply == null) return handleCallback(callback, null, null);

    var commandResult = reply.result;
    if (commandResult.code) {
      return handleCallback(callback, toError(commandResult));
    }
    if (commandResult.writeErrors) {
      // Only the first write error is surfaced
      return handleCallback(callback, toError(commandResult.writeErrors[0]));
    }

    // Add docs to the list
    reply.ops = docs;
    // Return the results
    handleCallback(callback, null, reply);
  });
};
|
|
|
|
|
|
// Registers `bulkWrite` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('bulkWrite', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~WriteOpResult
|
|
|
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
|
|
|
* @property {object} connection The connection object used for the operation.
|
|
|
* @property {object} result The command result object.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for write operations
|
|
|
* @callback Collection~writeOpCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~WriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~insertWriteOpResult
|
|
|
* @property {Number} insertedCount The total amount of documents inserted.
|
|
|
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
|
|
|
* @property {Object.<Number, ObjectId>} insertedIds Map of the index of the inserted document to the id of the inserted document.
|
|
|
* @property {object} connection The connection object used for the operation.
|
|
|
* @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
|
|
|
* @property {Number} result.ok Is 1 if the command executed correctly.
|
|
|
* @property {Number} result.n The total count of documents inserted.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~insertOneWriteOpResult
|
|
|
* @property {Number} insertedCount The total amount of documents inserted.
|
|
|
* @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
|
|
|
* @property {ObjectId} insertedId The driver generated ObjectId for the insert operation.
|
|
|
* @property {object} connection The connection object used for the operation.
|
|
|
* @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
|
|
|
* @property {Number} result.ok Is 1 if the command executed correctly.
|
|
|
* @property {Number} result.n The total count of documents inserted.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for inserts
|
|
|
* @callback Collection~insertWriteOpCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~insertWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for inserts
|
|
|
* @callback Collection~insertOneWriteOpCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~insertOneWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Inserts a single document or an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
|
|
|
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
|
|
|
* can be overridden by setting the **forceServerObjectId** flag.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {(object|object[])} docs Documents to insert.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
|
|
|
* @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~insertWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
* @deprecated Use insertOne, insertMany or bulkWrite
|
|
|
*/
|
|
|
Collection.prototype.insert = function(docs, options, callback) {
  // (docs, callback) call shape: the second argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});
  options = options || { ordered: false };

  // insertMany is handed an array, so wrap a single document
  docs = !Array.isArray(docs) ? [docs] : docs;

  // keepGoing is translated into an unordered insert. Work on a copy so the
  // caller's options object is never modified behind their back.
  if (options.keepGoing === true) {
    options = Object.assign({}, options, { ordered: false });
  }

  return this.insertMany(docs, options, callback);
};
|
|
|
|
|
|
// Registers `insert` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('insert', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~updateWriteOpResult
|
|
|
* @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
|
|
|
* @property {Number} result.ok Is 1 if the command executed correctly.
|
|
|
* @property {Number} result.n The total count of documents scanned.
|
|
|
* @property {Number} result.nModified The total count of documents modified.
|
|
|
* @property {Object} connection The connection object used for the operation.
|
|
|
* @property {Number} matchedCount The number of documents that matched the filter.
|
|
|
* @property {Number} modifiedCount The number of documents that were modified.
|
|
|
* @property {Number} upsertedCount The number of documents upserted.
|
|
|
* @property {Object} upsertedId The upserted id.
|
|
|
* @property {ObjectId} upsertedId._id The upserted _id returned from the server.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for updates
|
|
|
* @callback Collection~updateWriteOpCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~updateWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Update a single document on MongoDB
|
|
|
* @method
|
|
|
* @param {object} filter The Filter used to select the document to update
|
|
|
* @param {object} update The update operations to be applied to the document
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {boolean} [options.upsert=false] Update operation is an upsert.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.updateOne = function(filter, update, options, callback) {
  // (filter, update, callback) call shape: the third argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});

  // Validate the update document up front and report through whichever
  // channel the caller chose: the callback, or a rejected promise.
  var err = checkForAtomicOperators(update);
  if (err) {
    if (typeof callback === 'function') return callback(err);
    return this.s.promiseLibrary.reject(err);
  }

  // One shallow copy is enough to keep the caller's object untouched; the
  // worker sets `multi` on it and ignoreUndefined may be added below.
  options = shallowClone(options || {});

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, updateOne, [this, filter, update, options, callback]);
};
|
|
|
|
|
|
/**
 * Validates that an update document starts with an atomic operator (a `$`-prefixed key).
 *
 * Only the first key is inspected. Problems are returned, never thrown, so that
 * callers can route them to a callback or a rejected promise.
 *
 * @param {object} update The update document to check.
 * @return {(Error|undefined)} The result of `toError(message)` when the document is
 *   invalid, otherwise `undefined`.
 */
var checkForAtomicOperators = function(update) {
  // Object.keys(null|undefined) throws; report it as a validation error instead
  // so it travels the same path as the other failures below.
  if (update == null) {
    return toError('document must be a valid JavaScript object');
  }

  var keys = Object.keys(update);

  // same errors as the server would give for update doc lacking atomic operators
  if (keys.length === 0) {
    return toError('The update operation document must contain at least one atomic operator.');
  }

  if (keys[0][0] !== '$') {
    return toError('the update operation document must contain atomic operators.');
  }
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.updateOne`.
 *
 * Forces `options.multi` to `false`, runs `updateDocuments` and decorates the
 * result with `modifiedCount`, `upsertedId`, `upsertedCount` and `matchedCount`.
 *
 * @param {Collection} self The collection to update.
 * @param {object} filter Selects the document to update.
 * @param {object} update The update document.
 * @param {object} options Settings object; `multi` is written onto it.
 * @param {function} [callback] Called with `(err, result)`.
 */
var updateOne = function(self, filter, update, options, callback) {
  // Single-document update
  options.multi = false;

  function onUpdated(err, r) {
    if (callback == null) return;
    if (err && callback) return callback(err);
    if (r == null) return callback(null, { result: { ok: 1 } });

    var raw = r.result;
    var upserted = raw.upserted;
    var didUpsert = Array.isArray(upserted) && upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    r.modifiedCount = raw.nModified != null ? raw.nModified : raw.n;
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = didUpsert ? upserted.length : 0;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;

    if (callback) callback(null, r);
  }

  updateDocuments(self, filter, update, options, onUpdated);
};
|
|
|
|
|
|
// Registers `updateOne` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('updateOne', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Replace a document on MongoDB
|
|
|
* @method
|
|
|
* @param {object} filter The Filter used to select the document to update
|
|
|
* @param {object} doc The Document that replaces the matching document
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {boolean} [options.upsert=false] Update operation is an upsert.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.replaceOne = function(filter, doc, options, callback) {
  // (filter, doc, callback) call shape: the third argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});

  // One shallow copy keeps the caller's object untouched (the worker sets
  // `multi` on it). Nullish options are normalised to an empty object first,
  // matching updateOne/updateMany.
  options = shallowClone(options || {});

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, replaceOne, [this, filter, doc, options, callback]);
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.replaceOne`.
 *
 * Forces `options.multi` to `false`, runs `updateDocuments` with the replacement
 * document and decorates the result with `modifiedCount`, `upsertedId`,
 * `upsertedCount`, `matchedCount` and `ops` (a one-element array holding `doc`).
 *
 * @param {Collection} self The collection to update.
 * @param {object} filter Selects the document to replace.
 * @param {object} doc The replacement document.
 * @param {object} options Settings object; `multi` is written onto it.
 * @param {function} [callback] Called with `(err, result)`.
 */
var replaceOne = function(self, filter, doc, options, callback) {
  // Single-document update
  options.multi = false;

  function onReplaced(err, r) {
    if (callback == null) return;
    if (err && callback) return callback(err);
    if (r == null) return callback(null, { result: { ok: 1 } });

    var raw = r.result;
    var upserted = raw.upserted;
    var didUpsert = Array.isArray(upserted) && upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    r.modifiedCount = raw.nModified != null ? raw.nModified : raw.n;
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = didUpsert ? upserted.length : 0;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;
    r.ops = [doc];

    if (callback) callback(null, r);
  }

  updateDocuments(self, filter, doc, options, onReplaced);
};
|
|
|
|
|
|
// Registers `replaceOne` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('replaceOne', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Update multiple documents on MongoDB
|
|
|
* @method
|
|
|
* @param {object} filter The Filter used to select the documents to update
|
|
|
* @param {object} update The update operations to be applied to the document
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {boolean} [options.upsert=false] Update operation is an upsert.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~updateWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.updateMany = function(filter, update, options, callback) {
  // (filter, update, callback) call shape: the third argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});

  // Validate the update document up front and report through whichever
  // channel the caller chose: the callback, or a rejected promise.
  var err = checkForAtomicOperators(update);
  if (err) {
    if (typeof callback === 'function') return callback(err);
    return this.s.promiseLibrary.reject(err);
  }

  // One shallow copy is enough to keep the caller's object untouched; the
  // worker sets `multi` on it and ignoreUndefined may be added below.
  options = shallowClone(options || {});

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, updateMany, [this, filter, update, options, callback]);
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.updateMany`.
 *
 * Forces `options.multi` to `true`, runs `updateDocuments` and decorates the
 * result with `modifiedCount`, `upsertedId`, `upsertedCount` and `matchedCount`.
 *
 * @param {Collection} self The collection to update.
 * @param {object} filter Selects the documents to update.
 * @param {object} update The update document.
 * @param {object} options Settings object; `multi` is written onto it.
 * @param {function} [callback] Called with `(err, result)`.
 */
var updateMany = function(self, filter, update, options, callback) {
  // Multi-document update
  options.multi = true;

  function onUpdated(err, r) {
    if (callback == null) return;
    if (err && callback) return callback(err);
    if (r == null) return callback(null, { result: { ok: 1 } });

    var raw = r.result;
    var upserted = raw.upserted;
    var didUpsert = Array.isArray(upserted) && upserted.length > 0;

    // Fall back to `n` when the reply carries no nModified
    r.modifiedCount = raw.nModified != null ? raw.nModified : raw.n;
    r.upsertedId = didUpsert ? upserted[0] : null;
    r.upsertedCount = didUpsert ? upserted.length : 0;
    // An upsert means nothing matched the filter
    r.matchedCount = didUpsert ? 0 : raw.n;

    if (callback) callback(null, r);
  }

  updateDocuments(self, filter, update, options, onUpdated);
};
|
|
|
|
|
|
// Registers `updateMany` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('updateMany', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Internal update worker: builds a single update op and hands it to the
 * topology's `update`.
 *
 * @param {Collection} self The collection to update.
 * @param {object} selector The query selecting the documents to update.
 * @param {object} document The update (or replacement) document.
 * @param {object} [options] Optional settings (`upsert`, `multi`, `arrayFilters`,
 *   `serializeFunctions`, ...); a function here is treated as the callback.
 * @param {function} [callback] Called with `(err, result)`.
 * @throws {Error} The validation error (as built by `toError`) when `selector` or
 *   `document` is not an object and no callback was supplied.
 */
var updateDocuments = function(self, selector, document, options, callback) {
  if (typeof options === 'function') (callback = options), (options = null);
  if (options == null) options = {};
  if (typeof callback !== 'function') callback = null;

  // Both the selector and the document must be objects
  var invalid = null;
  if (selector == null || typeof selector !== 'object') {
    invalid = toError('selector must be a valid JavaScript object');
  } else if (document == null || typeof document !== 'object') {
    invalid = toError('document must be a valid JavaScript object');
  }
  if (invalid) {
    // `callback` was normalised to null above when none was supplied; invoking
    // it would hide the real problem behind "callback is not a function".
    if (callback == null) throw invalid;
    return callback(invalid);
  }

  // Get the write concern options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // An explicit per-call value wins, otherwise use the collection-level setting
  finalOptions.serializeFunctions = options.serializeFunctions || self.s.serializeFunctions;

  // Build the update op; upsert and multi are coerced to booleans (default false)
  var op = { q: selector, u: document };
  op.upsert = !!options.upsert;
  op.multi = !!options.multi;

  // arrayFilters travels on the op itself rather than in the command options
  if (finalOptions.arrayFilters) {
    op.arrayFilters = finalOptions.arrayFilters;
    delete finalOptions.arrayFilters;
  }

  // Have we specified collation
  decorateWithCollation(finalOptions, self, options);

  // Execute the update
  self.s.topology.update(self.s.namespace, [op], finalOptions, function(err, result) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);
    if (result.result.code) return handleCallback(callback, toError(result.result));
    if (result.result.writeErrors)
      return handleCallback(callback, toError(result.result.writeErrors[0]));
    // Return the results
    handleCallback(callback, null, result);
  });
};
|
|
|
|
|
|
/**
|
|
|
* Updates documents.
|
|
|
* @method
|
|
|
* @param {object} selector The selector for the update operation.
|
|
|
* @param {object} document The update document.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.upsert=false] Update operation is an upsert.
|
|
|
* @param {boolean} [options.multi=false] Update one/all documents with operation.
|
|
|
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
|
|
|
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
|
|
|
* @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~writeOpCallback} [callback] The command result callback
|
|
|
* @throws {MongoError}
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
* @deprecated use updateOne, updateMany or bulkWrite
|
|
|
*/
|
|
|
Collection.prototype.update = function(selector, document, options, callback) {
  // (selector, document, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options
  var ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  var operationArgs = [this, selector, document, options, callback];
  return executeOperation(this.s.topology, updateDocuments, operationArgs);
};
|
|
|
|
|
|
// Registers `update` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('update', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~deleteWriteOpResult
|
|
|
* @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
|
|
|
* @property {Number} result.ok Is 1 if the command executed correctly.
|
|
|
* @property {Number} result.n The total count of documents deleted.
|
|
|
* @property {Object} connection The connection object used for the operation.
|
|
|
* @property {Number} deletedCount The number of documents deleted.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for deletes
|
|
|
* @callback Collection~deleteWriteOpCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~deleteWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Delete a document on MongoDB
|
|
|
* @method
|
|
|
* @param {object} filter The Filter used to select the document to remove
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~deleteWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.deleteOne = function(filter, options, callback) {
  // (filter, callback) call shape: the second argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});

  // One shallow copy keeps the caller's object untouched (the worker sets
  // `single` on it). Nullish options are normalised to an empty object first.
  options = shallowClone(options || {});

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, deleteOne, [this, filter, options, callback]);
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.deleteOne`.
 *
 * Sets `options.single` to `true`, runs `removeDocuments` and copies the reply's
 * `n` onto the result as `deletedCount`.
 *
 * @param {Collection} self The collection to delete from.
 * @param {object} filter Selects the document to remove.
 * @param {object} options Settings object; `single` is written onto it.
 * @param {function} [callback] Called with `(err, result)`.
 */
var deleteOne = function(self, filter, options, callback) {
  // Only the first matching document is removed
  options.single = true;

  function onRemoved(err, r) {
    if (callback == null) return;
    if (err && callback) return callback(err);
    if (r == null) return callback(null, { result: { ok: 1 } });

    r.deletedCount = r.result.n;
    if (callback) callback(null, r);
  }

  removeDocuments(self, filter, options, onRemoved);
};
|
|
|
|
|
|
// Registers `deleteOne` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('deleteOne', { callback: true, promise: true });
|
|
|
|
|
|
// `removeOne` is an alias: it refers to the very same function as `deleteOne`.
Collection.prototype.removeOne = Collection.prototype.deleteOne;
|
|
|
|
|
|
// Registers the `removeOne` alias with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('removeOne', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Delete multiple documents on MongoDB
|
|
|
* @method
|
|
|
* @param {object} filter The Filter used to select the documents to remove
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~deleteWriteOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.deleteMany = function(filter, options, callback) {
  // (filter, callback) call shape: the second argument is the callback
  if (typeof options === 'function') (callback = options), (options = {});

  // One shallow copy keeps the caller's object untouched (the worker sets
  // `single` on it). Nullish options are normalised to an empty object first.
  options = shallowClone(options || {});

  // Add ignoreUndefined
  if (this.s.options.ignoreUndefined) {
    options.ignoreUndefined = this.s.options.ignoreUndefined;
  }

  return executeOperation(this.s.topology, deleteMany, [this, filter, options, callback]);
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.deleteMany`.
 *
 * Sets `options.single` to `false`, runs `removeDocuments` and copies the reply's
 * `n` onto the result as `deletedCount`.
 *
 * @param {Collection} self The collection to delete from.
 * @param {object} filter Selects the documents to remove.
 * @param {object} options Settings object; `single` is written onto it.
 * @param {function} [callback] Called with `(err, result)`.
 */
var deleteMany = function(self, filter, options, callback) {
  // Every matching document is removed
  options.single = false;

  function onRemoved(err, r) {
    if (callback == null) return;
    if (err && callback) return callback(err);
    if (r == null) return callback(null, { result: { ok: 1 } });

    r.deletedCount = r.result.n;
    if (callback) callback(null, r);
  }

  removeDocuments(self, filter, options, onRemoved);
};
|
|
|
|
|
|
/**
 * Internal remove worker: builds a single delete op and hands it to the
 * topology's `remove`.
 *
 * Accepts `(self, selector, options, callback)`, `(self, selector, callback)` or
 * `(self, callback)`; a missing selector becomes `{}`.
 *
 * @param {Collection} self The collection to remove from.
 * @param {object} [selector] The query selecting the documents to remove.
 * @param {object} [options] Optional settings; a truthy `single` sets the op's limit to 1.
 * @param {function} [callback] Called with `(err, result)`.
 */
var removeDocuments = function(self, selector, options, callback) {
  // Shuffle arguments for the shorter call shapes
  if (typeof options === 'function') {
    callback = options;
    options = {};
  } else if (typeof selector === 'function') {
    callback = selector;
    options = {};
    selector = {};
  }

  // Fall back to an empty options object
  if (!options) options = {};

  // Resolve the write concern onto a copy of the options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // No selector means "match everything"
  if (selector == null) selector = {};

  // limit 0 = all matches, limit 1 = first match only
  var op = { q: selector, limit: options.single ? 1 : 0 };

  // Apply collation, if any was specified
  decorateWithCollation(finalOptions, self, options);

  function onRemoved(err, result) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);

    var reply = result.result;
    if (reply.code) return handleCallback(callback, toError(reply));
    if (reply.writeErrors) return handleCallback(callback, toError(reply.writeErrors[0]));

    handleCallback(callback, null, result);
  }

  // Execute the remove
  self.s.topology.remove(self.s.namespace, [op], finalOptions, onRemoved);
};
|
|
|
|
|
|
// Registers `deleteMany` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('deleteMany', { callback: true, promise: true });
|
|
|
|
|
|
// `removeMany` is an alias: it refers to the very same function as `deleteMany`.
Collection.prototype.removeMany = Collection.prototype.deleteMany;
|
|
|
|
|
|
// Registers the `removeMany` alias with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('removeMany', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Remove documents.
|
|
|
* @method
|
|
|
* @param {object} selector The selector for the remove operation.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.single=false] Removes the first document found.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~writeOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
* @deprecated use deleteOne, deleteMany or bulkWrite
|
|
|
*/
|
|
|
Collection.prototype.remove = function(selector, options, callback) {
  // (selector, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options
  var ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  var operationArgs = [this, selector, options, callback];
  return executeOperation(this.s.topology, removeDocuments, operationArgs);
};
|
|
|
|
|
|
// Registers `remove` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('remove', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Save a document. Simple full document replacement function. Not recommended for efficiency, use atomic
|
|
|
* operators and update instead for more efficient operations.
|
|
|
* @method
|
|
|
* @param {object} doc Document to save
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~writeOpCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
* @deprecated use insertOne, insertMany, updateOne or updateMany
|
|
|
*/
|
|
|
Collection.prototype.save = function(doc, options, callback) {
  // (doc, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // Carry the collection-level ignoreUndefined flag on a copy of the options
  var ignoreUndefined = this.s.options.ignoreUndefined;
  if (ignoreUndefined) {
    options = shallowClone(options);
    options.ignoreUndefined = ignoreUndefined;
  }

  var operationArgs = [this, doc, options, callback];
  return executeOperation(this.s.topology, save, operationArgs);
};
|
|
|
|
|
|
/**
 * Internal worker behind `Collection.prototype.save`.
 *
 * A document that already has an `_id` is written as an upsert keyed on that
 * `_id` (via `updateDocuments`); otherwise it is inserted (via `insertDocuments`).
 *
 * @param {Collection} self The collection to save into.
 * @param {object} doc The document to save.
 * @param {object} options Settings for the operation.
 * @param {function} [callback] Called with `(err, result)`.
 */
var save = function(self, doc, options, callback) {
  // Get the write concern options
  var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);

  // Existing _id: replace-or-create the document with that _id
  if (doc._id != null) {
    finalOptions.upsert = true;
    return updateDocuments(self, { _id: doc._id }, doc, finalOptions, callback);
  }

  // No _id: plain insert. `doc` cannot be null here (its `_id` was read above),
  // so the callback only has to distinguish failure from success.
  insertDocuments(self, [doc], finalOptions, function(err, r) {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    handleCallback(callback, null, r);
  });
};
|
|
|
|
|
|
// Registers `save` with the class metadata (`define`) using the flags `callback: true` and `promise: true`.
define.classMethod('save', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* The callback format for results
|
|
|
* @callback Collection~resultCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {object} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* The callback format for an aggregation call
|
|
|
* @callback Collection~aggregationCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Fetches the first document that matches the query
|
|
|
* @method
|
|
|
* @param {object} query Query for find Operation
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {number} [options.limit=0] Sets the limit of documents returned in the query.
|
|
|
* @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
|
|
|
* @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
|
|
|
* @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
|
|
|
* @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
|
|
|
* @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
|
|
|
* @param {boolean} [options.explain=false] Explain the query instead of returning the data.
|
|
|
* @param {boolean} [options.snapshot=false] Snapshot query.
|
|
|
* @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
|
|
|
* @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
|
|
|
* @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
|
|
|
* @param {boolean} [options.returnKey=false] Only return the index key.
|
|
|
* @param {number} [options.maxScan=null] Limit the number of items to scan.
|
|
|
* @param {number} [options.min=null] Set index bounds.
|
|
|
* @param {number} [options.max=null] Set index bounds.
|
|
|
* @param {boolean} [options.showDiskLoc=false] Show disk location of results.
|
|
|
* @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
|
|
|
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
|
|
|
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
|
|
|
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
|
|
|
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
|
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
|
* @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
|
|
|
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for the find operation (see 3.4 documentation for available fields).
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.findOne = function(query, options, callback) {
  // (callback) call shape: only a callback was supplied
  if (typeof query === 'function') {
    callback = query;
    query = {};
    options = {};
  }

  // (query, callback) call shape
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Default both to empty objects
  if (!query) query = {};
  if (!options) options = {};

  var operationArgs = [this, query, options, callback];
  return executeOperation(this.s.topology, findOne, operationArgs);
};
|
|
|
|
|
|
// Builds a cursor for the query with limit(-1) and batchSize(1), reads its next
// document and reports that document (or the converted error) to the callback.
var findOne = function(self, query, options, callback) {
  const baseCursor = self.find(query, options);
  const singleDocCursor = baseCursor.limit(-1).batchSize(1);

  singleDocCursor.next(function(error, doc) {
    if (error == null) {
      handleCallback(callback, null, doc);
      return;
    }

    return handleCallback(callback, toError(error), null);
  });
};
|
|
|
|
|
|
// Register findOne via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('findOne', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* The callback format for the collection method, must be used if strict is specified
|
|
|
* @callback Collection~collectionResultCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection} collection The collection instance.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Rename the collection.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {string} newName New name of the collection.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {boolean} [options.dropTarget=false] Drop the target name collection if it previously exists.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~collectionResultCallback} [callback] The results callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.rename = function(newName, options, callback) {
  // rename(newName, callback): the second argument is actually the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Work on a copy of the supplied options and force the primary read preference on it.
  const renameOptions = Object.assign({}, options);
  renameOptions.readPreference = ReadPreference.PRIMARY;

  return executeOperation(this.s.topology, rename, [this, newName, renameOptions, callback]);
};
|
|
|
|
|
|
// Sends the renameCollection command through the admin database and, on success,
// hands the callback a new Collection constructed with the new name.
var rename = function(self, newName, options, callback) {
  // Validate the target name before any command is built (this may throw).
  checkCollectionName(newName);

  const cmd = {
    renameCollection: f('%s.%s', self.s.dbName, self.s.name),
    to: f('%s.%s', self.s.dbName, newName),
    // Anything other than a real boolean is treated as "do not drop".
    dropTarget: typeof options.dropTarget === 'boolean' ? options.dropTarget : false
  };

  // Decorate command with writeConcern if supported
  decorateWithWriteConcern(cmd, self, options);

  const admin = self.s.db.admin();
  admin.command(cmd, options, function(err, doc) {
    if (err) return handleCallback(callback, err, null);

    // The reply itself can carry an error message
    if (doc.errmsg) return handleCallback(callback, toError(doc), null);

    try {
      const state = self.s;
      const renamed = new Collection(
        state.db,
        state.topology,
        state.dbName,
        newName,
        state.pkFactory,
        state.options
      );
      return handleCallback(callback, null, renamed);
    } catch (failure) {
      return handleCallback(callback, toError(failure), null);
    }
  });
};
|
|
|
|
|
|
// Register rename via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('rename', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Drop the collection from the database, removing it permanently. New accesses will create a new collection.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The results callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.drop = function(options, callback) {
  // drop(callback): shift the callback into place.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) options = {};

  // dropCollection is a Db method; bind it so it keeps the Db as its receiver
  // when executeOperation invokes it.
  const db = this.s.db;
  const dropCollection = db.dropCollection.bind(db);

  return executeOperation(this.s.topology, dropCollection, [this.s.name, options, callback]);
};
|
|
|
|
|
|
// Register drop via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('drop', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Returns the options of the collection.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The results callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.options = function(opts, callback) {
  // options(callback): the only argument is the callback.
  const callbackOnly = typeof opts === 'function';
  if (callbackOnly) callback = opts;

  // Use an empty settings object when none (or only a callback) was supplied.
  const settings = callbackOnly || !opts ? {} : opts;

  return executeOperation(this.s.topology, options, [this, settings, callback]);
};
|
|
|
|
|
|
// Looks the collection up through db.listCollections and reports the `options`
// field of the first match (or null), or a driver error when nothing matches.
var options = function(self, opts, callback) {
  const listing = self.s.db.listCollections({ name: self.s.name }, opts);

  listing.toArray(function(err, collections) {
    if (err) return handleCallback(callback, err);

    if (collections.length !== 0) {
      // `err` is falsy at this point and is forwarded unchanged.
      handleCallback(callback, err, collections[0].options || null);
      return;
    }

    const notFound = MongoError.create({
      message: f('collection %s not found', self.s.namespace),
      driver: true
    });
    return handleCallback(callback, notFound);
  });
};
|
|
|
|
|
|
// Register options via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('options', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Returns if the collection is a capped collection
|
|
|
*
|
|
|
* @method
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The results callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.isCapped = function(options, callback) {
  // isCapped(callback): the only argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  const settings = options ? options : {};
  return executeOperation(this.s.topology, isCapped, [this, settings, callback]);
};
|
|
|
|
|
|
// Reads the collection options through self.options and reports their `capped`
// field; a missing options document is passed through as-is.
var isCapped = function(self, options, callback) {
  function onOptions(error, collectionOptions) {
    if (error) return handleCallback(callback, error);

    const capped = collectionOptions && collectionOptions.capped;
    handleCallback(callback, null, capped);
  }

  self.options(options, onOptions);
};
|
|
|
|
|
|
// Register isCapped via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('isCapped', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Creates an index on the db and collection.
|
|
|
* @method
|
|
|
* @param {(string|object)} fieldOrSpec Defines the index.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.unique=false] Creates an unique index.
|
|
|
* @param {boolean} [options.sparse=false] Creates a sparse index.
|
|
|
* @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
|
|
|
* @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
|
|
|
* @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
|
|
|
* @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
|
|
|
* @param {number} [options.v=null] Specify the format version of the indexes.
|
|
|
* @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
|
|
|
* @param {string} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
|
|
|
* @param {object} [options.partialFilterExpression=null] Creates a partial index based on the given filter object (MongoDB 3.2 or higher)
|
|
|
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.createIndex = function(fieldOrSpec, options, callback) {
  // createIndex(fieldOrSpec, callback): no options object was given.
  const callbackInOptionsSlot = typeof options === 'function';
  if (callbackInOptionsSlot) {
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  const operationArgs = [this, fieldOrSpec, options, callback];
  return executeOperation(this.s.topology, createIndex, operationArgs);
};
|
|
|
|
|
|
// Delegates index creation to the owning Db, supplying this collection's name.
var createIndex = function(self, fieldOrSpec, options, callback) {
  const db = self.s.db;
  const collectionName = self.s.name;

  db.createIndex(collectionName, fieldOrSpec, options, callback);
};
|
|
|
|
|
|
// Register createIndex via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('createIndex', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Creates multiple indexes in the collection, this method is only supported for
|
|
|
* MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported
|
|
|
* error. Index specifications are defined at http://docs.mongodb.org/manual/reference/command/createIndexes/.
|
|
|
* @method
|
|
|
* @param {array} indexSpecs An array of index specifications to be created
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.createIndexes = function(indexSpecs, options, callback) {
  // createIndexes(indexSpecs, callback): no options object was given.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Operate on a shallow copy so the maxTimeMS clean-up below never touches the caller's object.
  const commandOptions = options ? shallowClone(options) : {};

  // Only a numeric maxTimeMS is kept.
  if (typeof commandOptions.maxTimeMS !== 'number') {
    delete commandOptions.maxTimeMS;
  }

  return executeOperation(this.s.topology, createIndexes, [
    this,
    indexSpecs,
    commandOptions,
    callback
  ]);
};
|
|
|
|
|
|
/**
 * Runs the createIndexes command for the given index specifications.
 *
 * Any spec carrying a collation is rejected up front when the topology reports that
 * commands cannot take a collation, whether or not the spec already has a name.
 * (Previously the check was skipped for specs that supplied their own name.)
 * Specs without a name get one derived from their key document: `field_value` pairs
 * joined by `_`. The generated name is written onto the caller's spec object.
 *
 * @param {Collection} self The collection the indexes are created on.
 * @param {object[]} indexSpecs Index specifications; unnamed entries are given a generated name in place.
 * @param {object} options Command options; a copy with the primary read preference is sent.
 * @param {function} callback Receives the collation error, or whatever `db.command` reports.
 */
var createIndexes = function(self, indexSpecs, options, callback) {
  const capabilities = self.s.topology.capabilities();

  for (let i = 0; i < indexSpecs.length; i++) {
    const spec = indexSpecs[i];

    // Did the user pass in a collation, check if our write server supports it
    if (spec.collation && capabilities && !capabilities.commandsTakeCollation) {
      return callback(new MongoError('server/primary/mongos does not support collation'));
    }

    // Ensure we generate the correct name if the parameter is not set
    if (spec.name == null) {
      const keys = [];
      // for...in tolerates a missing key document, which simply yields an empty name.
      for (const field in spec.key) {
        keys.push(f('%s_%s', field, spec.key[field]));
      }
      spec.name = keys.join('_');
    }
  }

  // Index builds always go to the primary; copy so the caller's options stay untouched.
  const commandOptions = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });

  // Execute the index
  self.s.db.command(
    {
      createIndexes: self.s.name,
      indexes: indexSpecs
    },
    commandOptions,
    callback
  );
};
|
|
|
|
|
|
// Register createIndexes via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('createIndexes', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Drops an index from this collection.
|
|
|
* @method
|
|
|
* @param {string} indexName Name of the index to drop.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {number} [options.maxTimeMS] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.dropIndex = function(indexName, options, callback) {
  // Everything after indexName is optional: a trailing function is the callback,
  // and whatever remains first is the options object.
  var args = Array.prototype.slice.call(arguments, 1);
  callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  options = args.length ? args.shift() || {} : {};

  // Run only against primary. The read preference is set on a copy so the caller's
  // options object is never modified (and a frozen object no longer throws).
  options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });

  return executeOperation(this.s.topology, dropIndex, [this, indexName, options, callback]);
};
|
|
|
|
|
|
// Sends a dropIndexes command naming a single index; the outcome is only
// reported when a callback function was supplied.
var dropIndex = function(self, indexName, options, callback) {
  const command = { dropIndexes: self.s.name, index: indexName };

  // Decorate command with writeConcern if supported
  decorateWithWriteConcern(command, self, options);

  function onReply(err, result) {
    // Nothing to notify without a callback.
    if (typeof callback !== 'function') return;

    if (!err) {
      handleCallback(callback, null, result);
      return;
    }
    return handleCallback(callback, err, null);
  }

  self.s.db.command(command, options, onReply);
};
|
|
|
|
|
|
// Register dropIndex via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('dropIndex', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Drops all indexes from this collection.
|
|
|
* @method
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {number} [options.maxTimeMS] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.dropIndexes = function(options, callback) {
  // dropIndexes(callback): the only argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Shallow-copy the options before cleaning them up.
  const commandOptions = options ? shallowClone(options) : {};

  // Only a numeric maxTimeMS is kept.
  if (typeof commandOptions.maxTimeMS !== 'number') {
    delete commandOptions.maxTimeMS;
  }

  return executeOperation(this.s.topology, dropIndexes, [this, commandOptions, callback]);
};
|
|
|
|
|
|
// Drops every index by calling dropIndex with the '*' wildcard and reduces the
// outcome to a boolean: false alongside the error, true on success.
var dropIndexes = function(self, options, callback) {
  function onDropped(err) {
    if (!err) {
      handleCallback(callback, null, true);
      return;
    }
    return handleCallback(callback, err, false);
  }

  self.dropIndex('*', options, onDropped);
};
|
|
|
|
|
|
// Register dropIndexes via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('dropIndexes', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Drops all indexes from this collection.
|
|
|
* @method
|
|
|
* @deprecated use dropIndexes
|
|
|
* @param {Collection~resultCallback} callback The command result callback
|
|
|
* @return {Promise} returns Promise if no [callback] passed
|
|
|
*/
|
|
|
// Deprecated alias: dropAllIndexes is the very same function object as dropIndexes.
Collection.prototype.dropAllIndexes = Collection.prototype.dropIndexes;
|
|
|
|
|
|
// Register dropAllIndexes via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('dropAllIndexes', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Reindex all indexes on the collection
|
|
|
* Warning: reIndex is a blocking operation (indexes are rebuilt in the foreground) and will be slow for large collections.
|
|
|
* @method
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.reIndex = function(options, callback) {
  // reIndex(callback): the only argument is the callback.
  const callbackOnly = typeof options === 'function';
  if (callbackOnly) callback = options;

  const settings = callbackOnly || !options ? {} : options;

  return executeOperation(this.s.topology, reIndex, [this, settings, callback]);
};
|
|
|
|
|
|
// Sends the reIndex command for this collection and reports the truthiness of
// the reply's `ok` field as a boolean. Nothing is reported without a callback.
var reIndex = function(self, options, callback) {
  const command = { reIndex: self.s.name };

  self.s.db.command(command, options, function(err, result) {
    if (callback == null) return;

    if (!err) {
      handleCallback(callback, null, Boolean(result.ok));
      return;
    }
    return handleCallback(callback, err, null);
  });
};
|
|
|
|
|
|
// Register reIndex via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('reIndex', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Get the list of all indexes information for the collection.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {number} [options.batchSize=null] The batchSize for the returned command cursor or if pre 2.8 the systems batch collection
|
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @return {CommandCursor}
|
|
|
*/
|
|
|
Collection.prototype.listIndexes = function(options) {
  // Work on a shallow copy of the options, then let getReadPreference resolve the read preference.
  let cursorOptions = shallowClone(options || {});
  cursorOptions = getReadPreference(this, cursorOptions, this.s.db, this);

  // The cursor is built by CommandCursor with this collection's promise library.
  cursorOptions.cursorFactory = CommandCursor;
  cursorOptions.promiseLibrary = this.s.promiseLibrary;

  const topology = this.s.topology;
  if (!topology.capabilities()) {
    throw new MongoError('cannot connect to server');
  }

  let indexCursor;

  // Use the listIndexes command when the topology reports support for it.
  if (topology.capabilities().hasListIndexesCommand) {
    const command = {
      listIndexes: this.s.name,
      cursor: cursorOptions.batchSize ? { batchSize: cursorOptions.batchSize } : {}
    };

    indexCursor = topology.cursor(f('%s.$cmd', this.s.dbName), command, cursorOptions);
    if (cursorOptions.readPreference) {
      indexCursor.setReadPreference(cursorOptions.readPreference);
    }
    return indexCursor;
  }

  // Otherwise query the system.indexes collection for entries of this namespace.
  const indexesNamespace = f('%s.system.indexes', this.s.dbName);
  const legacyQuery = { find: indexesNamespace, query: { ns: this.s.namespace } };

  indexCursor = topology.cursor(indexesNamespace, legacyQuery, cursorOptions);
  if (cursorOptions.readPreference) {
    indexCursor.setReadPreference(cursorOptions.readPreference);
  }

  // Set the passed in batch size if one was provided
  if (cursorOptions.batchSize) {
    indexCursor = indexCursor.batchSize(cursorOptions.batchSize);
  }

  return indexCursor;
};
|
|
|
|
|
|
// Register listIndexes via define.classMethod: neither callback nor promise, with CommandCursor declared as its return type.
define.classMethod('listIndexes', { callback: false, promise: false, returns: [CommandCursor] });
|
|
|
|
|
|
/**
|
|
|
* Ensures that an index exists, if it does not it creates it
|
|
|
* @method
|
|
|
* @deprecated use createIndexes instead
|
|
|
* @param {(string|object)} fieldOrSpec Defines the index.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(number|string)} [options.w=null] The write concern.
|
|
|
* @param {number} [options.wtimeout=null] The write concern timeout.
|
|
|
* @param {boolean} [options.j=false] Specify a journal write concern.
|
|
|
* @param {boolean} [options.unique=false] Creates an unique index.
|
|
|
* @param {boolean} [options.sparse=false] Creates a sparse index.
|
|
|
* @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
|
|
|
* @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
|
|
|
* @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
|
|
|
* @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
|
|
|
* @param {number} [options.v=null] Specify the format version of the indexes.
|
|
|
* @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
|
|
|
* @param {string} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
|
|
|
* @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.ensureIndex = function(fieldOrSpec, options, callback) {
  // ensureIndex(fieldOrSpec, callback): no options object was given.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  } else if (!options) {
    options = {};
  }

  const operationArgs = [this, fieldOrSpec, options, callback];
  return executeOperation(this.s.topology, ensureIndex, operationArgs);
};
|
|
|
|
|
|
// Delegates to the owning Db's ensureIndex, supplying this collection's name.
var ensureIndex = function(self, fieldOrSpec, options, callback) {
  const db = self.s.db;
  const collectionName = self.s.name;

  db.ensureIndex(collectionName, fieldOrSpec, options, callback);
};
|
|
|
|
|
|
// Register ensureIndex via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('ensureIndex', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Checks if one or more indexes exist on the collection, fails on first non-existing index
|
|
|
* @method
|
|
|
* @param {(string|array)} indexes One or more index names to check.
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.indexExists = function(indexes, options, callback) {
  // indexExists(indexes, callback): no options object was given.
  const callbackInOptionsSlot = typeof options === 'function';
  if (callbackInOptionsSlot) callback = options;

  const settings = callbackInOptionsSlot || !options ? {} : options;

  return executeOperation(this.s.topology, indexExists, [this, indexes, settings, callback]);
};
|
|
|
|
|
|
// Fetches the index information and reports whether every requested index name
// is present in it; a single name is treated like a one-element list.
var indexExists = function(self, indexes, options, callback) {
  self.indexInformation(options, function(err, info) {
    // If we have an error return
    if (err != null) return handleCallback(callback, err, null);

    const wanted = Array.isArray(indexes) ? indexes : [indexes];

    // Stop at the first name that has no entry.
    let allPresent = true;
    for (const indexName of wanted) {
      if (info[indexName] == null) {
        allPresent = false;
        break;
      }
    }

    return handleCallback(callback, null, allPresent);
  });
};
|
|
|
|
|
|
// Register indexExists via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('indexExists', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Retrieves this collections index info.
|
|
|
* @method
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {boolean} [options.full=false] Returns the full raw index information.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.indexInformation = function(options, callback) {
  // Both arguments are optional: a trailing function is the callback, and
  // whatever is left first is the options object.
  const positional = Array.from(arguments);
  const last = positional[positional.length - 1];
  callback = typeof last === 'function' ? positional.pop() : undefined;

  options = {};
  if (positional.length) options = positional.shift() || {};

  return executeOperation(this.s.topology, indexInformation, [this, options, callback]);
};
|
|
|
|
|
|
// Delegates to the owning Db's indexInformation, supplying this collection's name.
var indexInformation = function(self, options, callback) {
  const db = self.s.db;
  const collectionName = self.s.name;

  db.indexInformation(collectionName, options, callback);
};
|
|
|
|
|
|
// Register indexInformation via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('indexInformation', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* The callback format for results
|
|
|
* @callback Collection~countCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {number} result The count of documents that matched the query.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
* Count number of matching documents in the db to a query.
|
|
|
* @method
|
|
|
* @param {object} query The query for the count.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {number} [options.limit=null] The limit of documents to count.
|
|
|
* @param {number} [options.skip=null] The number of documents to skip for the count.
|
|
|
* @param {string} [options.hint=null] An index name hint for the query.
|
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
|
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~countCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.count = function(query, options, callback) {
  // All arguments are optional: a trailing function is the callback, then the
  // remaining values are taken in order as query and options.
  const positional = Array.from(arguments);
  const last = positional[positional.length - 1];
  callback = typeof last === 'function' ? positional.pop() : undefined;

  query = {};
  if (positional.length) query = positional.shift() || {};

  options = {};
  if (positional.length) options = positional.shift() || {};

  return executeOperation(this.s.topology, count, [this, query, options, callback]);
};
|
|
|
|
|
|
// Builds the count command from the query and the numeric skip/limit/maxTimeMS
// options (plus hint when truthy), decorates it, runs it and reports the reply's `n`.
var count = function(self, query, options, callback) {
  const cmd = {
    count: self.s.name,
    query: query
  };

  // Add limit, skip and maxTimeMS only when they are real numbers.
  ['skip', 'limit', 'maxTimeMS'].forEach(function(field) {
    const value = options[field];
    if (typeof value === 'number') cmd[field] = value;
  });

  const hint = options.hint;
  if (hint) cmd.hint = hint;

  // Resolve the read preference on a shallow copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db);

  // Do we have a readConcern specified
  decorateWithReadConcern(cmd, self, commandOptions);

  // Have we specified collation
  decorateWithCollation(cmd, self, commandOptions);

  self.s.db.command(cmd, commandOptions, function(err, result) {
    if (!err) {
      handleCallback(callback, null, result.n);
      return;
    }
    return handleCallback(callback, err);
  });
};
|
|
|
|
|
|
// Register count via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('count', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* The distinct command returns a list of distinct values for the given key across a collection.
|
|
|
* @method
|
|
|
* @param {string} key Field of the document to find distinct values for.
|
|
|
* @param {object} query The query for filtering the set of documents to which we apply the distinct filter.
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
|
* @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.distinct = function(key, query, options, callback) {
  // Everything after `key` is optional: a trailing function is the callback, then
  // the remaining values are taken in order as query and options.
  const rest = Array.from(arguments).slice(1);
  const last = rest[rest.length - 1];
  callback = typeof last === 'function' ? rest.pop() : undefined;

  let queryOption = {};
  if (rest.length) queryOption = rest.shift() || {};

  let optionsOption = {};
  if (rest.length) optionsOption = rest.shift() || {};

  const operationArgs = [this, key, queryOption, optionsOption, callback];
  return executeOperation(this.s.topology, distinct, operationArgs);
};
|
|
|
|
|
|
// Builds the distinct command for the key and query (with maxTimeMS when it is a
// number), decorates it, runs it and reports the reply's `values`.
var distinct = function(self, key, query, options, callback) {
  // Capture maxTimeMS from the options exactly as they were passed in.
  const requestedMaxTimeMS = options.maxTimeMS;

  const cmd = {
    distinct: self.s.name,
    key: key,
    query: query
  };

  // Resolve the read preference on a shallow copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db, self);

  if (typeof requestedMaxTimeMS === 'number') {
    cmd.maxTimeMS = requestedMaxTimeMS;
  }

  // Do we have a readConcern specified
  decorateWithReadConcern(cmd, self, commandOptions);

  // Have we specified collation
  decorateWithCollation(cmd, self, commandOptions);

  self.s.db.command(cmd, commandOptions, function(err, result) {
    if (!err) {
      handleCallback(callback, null, result.values);
      return;
    }
    return handleCallback(callback, err);
  });
};
|
|
|
|
|
|
// Register distinct via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('distinct', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Retrieve all the indexes on the collection.
|
|
|
* @method
|
|
|
* @param {Object} [options] Optional settings
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The command result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.indexes = function(options, callback) {
  // indexes(callback): the only argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  const settings = options ? options : {};
  return executeOperation(this.s.topology, indexes, [this, settings, callback]);
};
|
|
|
|
|
|
// Asks the owning Db for this collection's index information, defaulting `full`
// to true unless the supplied options say otherwise.
var indexes = function(self, options, callback) {
  const withDefaults = Object.assign({ full: true }, options);

  const db = self.s.db;
  db.indexInformation(self.s.name, withDefaults, callback);
};
|
|
|
|
|
|
// Register indexes via define.classMethod, flagged as supporting both callback and promise usage.
define.classMethod('indexes', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Get all the collection statistics.
|
|
|
*
|
|
|
* @method
|
|
|
* @param {object} [options=null] Optional settings.
|
|
|
* @param {number} [options.scale=null] Divide the returned sizes by scale value.
|
|
|
* @param {ClientSession} [options.session] optional session to use for this operation
|
|
|
* @param {Collection~resultCallback} [callback] The collection result callback
|
|
|
* @return {Promise} returns Promise if no callback passed
|
|
|
*/
|
|
|
Collection.prototype.stats = function(options, callback) {
  // Both arguments are optional: a trailing function is the callback, and
  // whatever is left first is the options object.
  const positional = Array.from(arguments);
  const last = positional[positional.length - 1];
  callback = typeof last === 'function' ? positional.pop() : undefined;

  options = {};
  if (positional.length) options = positional.shift() || {};

  return executeOperation(this.s.topology, stats, [this, options, callback]);
};
|
|
|
|
|
|
// Runs the collStats command for this collection, forwarding `scale` when one
// was supplied, and hands the raw command outcome to the callback.
var stats = function(self, options, callback) {
  const command = { collStats: self.s.name };

  // Only include scale when it is neither null nor undefined.
  const scale = options['scale'];
  if (scale != null) command['scale'] = scale;

  // Resolve the read preference on a shallow copy of the options.
  let commandOptions = shallowClone(options);
  commandOptions = getReadPreference(self, commandOptions, self.s.db, self);

  self.s.db.command(command, commandOptions, callback);
};
|
|
|
|
|
|
define.classMethod('stats', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* @typedef {Object} Collection~findAndModifyWriteOpResult
|
|
|
* @property {object} value Document returned from findAndModify command.
|
|
|
* @property {object} lastErrorObject The raw lastErrorObject returned from the command.
|
|
|
* @property {Number} ok Is 1 if the command executed correctly.
|
|
|
*/
|
|
|
|
|
|
/**
|
|
|
 * The callback format for findAndModify-style operations (findOneAndDelete, findOneAndReplace, findOneAndUpdate, findAndModify)
|
|
|
* @callback Collection~findAndModifyCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Collection~findAndModifyWriteOpResult} result The result object if the command was executed successfully.
|
|
|
*/
|
|
|
|
|
|
/**
 * Atomically find a single document and delete it; requires a write lock for the duration of the operation.
 *
 * Throws synchronously (error built by `toError`) when `filter` is not an object.
 *
 * @method
 * @param {object} filter Document selection filter.
 * @param {object} [options=null] Optional settings.
 * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
 * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
 * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~findAndModifyCallback} [callback] The collection result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.findOneAndDelete = function(filter, options, callback) {
  // findOneAndDelete(filter, callback): the second argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) {
    options = {};
  }

  // Reject anything that is not a non-null object before dispatching.
  const filterIsObject = filter !== null && typeof filter === 'object';
  if (!filterIsObject) {
    throw toError('filter parameter must be an object');
  }

  const operationArgs = [this, filter, options, callback];
  return executeOperation(this.s.topology, findOneAndDelete, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.findOneAndDelete: translates the
 * options into findAndModify form and delegates to self.findAndModify.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} filter Document selection filter.
 * @param {object} options Operation options (`projection` and `sort` are read here).
 * @param {function} callback Handed to findAndModify unchanged.
 */
var findOneAndDelete = function(self, filter, options, callback) {
  // Copy the options, then map `projection` onto `fields` and flag the removal.
  const removeOptions = Object.assign(shallowClone(options), {
    fields: options.projection,
    remove: true
  });

  // No replacement document is needed for a delete, hence the null.
  self.findAndModify(filter, options.sort, null, removeOptions, callback);
};

define.classMethod('findOneAndDelete', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Atomically find a single document and replace it; requires a write lock for the duration of the operation.
 *
 * Throws synchronously (error built by `toError`) when `filter` or `replacement` is not an object.
 *
 * @method
 * @param {object} filter Document selection filter.
 * @param {object} replacement Document replacing the matching document.
 * @param {object} [options=null] Optional settings.
 * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
 * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
 * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
 * @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
 * @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~findAndModifyCallback} [callback] The collection result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.findOneAndReplace = function(filter, replacement, options, callback) {
  // findOneAndReplace(filter, replacement, callback): third argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) {
    options = {};
  }

  // Both documents must be non-null objects.
  if (!filter || typeof filter !== 'object') {
    throw toError('filter parameter must be an object');
  }
  if (!replacement || typeof replacement !== 'object') {
    throw toError('replacement parameter must be an object');
  }

  const operationArgs = [this, filter, replacement, options, callback];
  return executeOperation(this.s.topology, findOneAndReplace, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.findOneAndReplace: translates the
 * options into findAndModify form and delegates to self.findAndModify.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} filter Document selection filter.
 * @param {object} replacement Document replacing the matching document.
 * @param {object} options Operation options (`projection`, `sort`, `returnOriginal`, `upsert` are read here).
 * @param {function} callback Handed to findAndModify unchanged.
 */
var findOneAndReplace = function(self, filter, replacement, options, callback) {
  const modifyOptions = shallowClone(options);
  modifyOptions.fields = options.projection;
  modifyOptions.update = true;

  // Any value other than `undefined` is negated/coerced here, so e.g.
  // `returnOriginal: null` yields `new: true`. findOneAndUpdate in this file
  // only honours genuine booleans, so the two differ for non-boolean input.
  modifyOptions.new = options.returnOriginal === undefined ? false : !options.returnOriginal;
  modifyOptions.upsert = options.upsert === undefined ? false : Boolean(options.upsert);

  self.findAndModify(filter, options.sort, replacement, modifyOptions, callback);
};

define.classMethod('findOneAndReplace', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Atomically find a single document and update it; requires a write lock for the duration of the operation.
 *
 * Throws synchronously (error built by `toError`) when `filter` or `update` is not an object.
 *
 * @method
 * @param {object} filter Document selection filter.
 * @param {object} update Update operations to be performed on the document
 * @param {object} [options=null] Optional settings.
 * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
 * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
 * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
 * @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
 * @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~findAndModifyCallback} [callback] The collection result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.findOneAndUpdate = function(filter, update, options, callback) {
  // findOneAndUpdate(filter, update, callback): third argument is the callback.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  if (!options) {
    options = {};
  }

  // Both documents must be non-null objects.
  const filterIsObject = filter !== null && typeof filter === 'object';
  if (!filterIsObject) {
    throw toError('filter parameter must be an object');
  }
  const updateIsObject = update !== null && typeof update === 'object';
  if (!updateIsObject) {
    throw toError('update parameter must be an object');
  }

  const operationArgs = [this, filter, update, options, callback];
  return executeOperation(this.s.topology, findOneAndUpdate, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.findOneAndUpdate: translates the
 * options into findAndModify form and delegates to self.findAndModify.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} filter Document selection filter.
 * @param {object} update Update operations to apply.
 * @param {object} options Operation options (`projection`, `sort`, `returnOriginal`, `upsert` are read here).
 * @param {function} callback Handed to findAndModify unchanged.
 */
var findOneAndUpdate = function(self, filter, update, options, callback) {
  const modifyOptions = shallowClone(options);
  modifyOptions.fields = options.projection;
  modifyOptions.update = true;

  // Only genuine booleans are honoured; any other value falls back to false.
  let returnNew = false;
  if (typeof options.returnOriginal === 'boolean') {
    returnNew = !options.returnOriginal;
  }
  modifyOptions.new = returnNew;

  let upsert = false;
  if (typeof options.upsert === 'boolean') {
    upsert = options.upsert;
  }
  modifyOptions.upsert = upsert;

  self.findAndModify(filter, options.sort, update, modifyOptions, callback);
};

define.classMethod('findOneAndUpdate', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Find a document and update (or remove) it. The read preference is always
 * forced to primary, regardless of what the caller supplied.
 *
 * @method
 * @param {object} query Query object to locate the object to modify.
 * @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
 * @param {object} doc The fields/vals to be updated.
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {boolean} [options.remove=false] Set to true to remove the object before returning.
 * @param {boolean} [options.upsert=false] Perform an upsert operation.
 * @param {boolean} [options.new=false] Set to true if you want to return the modified object rather than the original. Ignored for remove.
 * @param {object} [options.projection=null] Object containing the field projection for the result returned from the operation.
 * @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~findAndModifyCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 * @deprecated use findOneAndUpdate, findOneAndReplace or findOneAndDelete instead
 */
Collection.prototype.findAndModify = function(query, sort, doc, options, callback) {
  // Everything after `query` is optional; a trailing function is the callback.
  const rest = Array.prototype.slice.call(arguments, 1);
  const trailing = rest[rest.length - 1];
  callback = typeof trailing === 'function' ? rest.pop() : undefined;

  // Consume the remaining positional arguments in order, applying defaults.
  sort = (rest.length > 0 && rest.shift()) || [];
  doc = rest.length > 0 ? rest.shift() : null;
  options = (rest.length > 0 && rest.shift()) || {};

  // Work on a copy so forcing the read preference does not touch the caller's object.
  const commandOptions = shallowClone(options);
  commandOptions.readPreference = ReadPreference.PRIMARY;

  const operationArgs = [this, query, sort, doc, commandOptions, callback];
  return executeOperation(this.s.topology, findAndModify, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.findAndModify: assembles the
 * `findandmodify` command document and runs it through the owning db.
 *
 * Note that `options` is modified in place (arrayFilters removed,
 * serializeFunctions/checkKeys set); the public method passes in a copy.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} query Query selecting the document.
 * @param {array|object} sort Sort specification, normalised by formattedOrderClause.
 * @param {object} doc Update document; ignored when `options.remove` is set.
 * @param {object} options Operation options.
 * @param {function} callback Invoked through handleCallback with (err, result).
 */
var findAndModify = function(self, query, sort, doc, options, callback) {
  const command = {
    findandmodify: self.s.name,
    query: query
  };

  // Only attach a sort when formattedOrderClause produced one.
  const orderClause = formattedOrderClause(sort);
  if (orderClause) {
    command.sort = orderClause;
  }

  // These three flags are always sent as strict booleans.
  command.new = Boolean(options.new);
  command.remove = Boolean(options.remove);
  command.upsert = Boolean(options.upsert);

  // `projection` takes precedence over the deprecated `fields` option.
  const projection = options.projection || options.fields;
  if (projection) {
    command.fields = projection;
  }

  // arrayFilters belongs on the command document, not in the command options.
  if (options.arrayFilters) {
    command.arrayFilters = options.arrayFilters;
    delete options.arrayFilters;
  }

  if (doc && !options.remove) {
    command.update = doc;
  }

  if (options.maxTimeMS) {
    command.maxTimeMS = options.maxTimeMS;
  }

  // Per-call serializeFunctions wins; otherwise fall back to the collection's setting.
  if (options.serializeFunctions == null) {
    options.serializeFunctions = self.s.serializeFunctions;
  }

  // No check on the documents
  options.checkKeys = false;

  // Resolve the write concern via the writeConcern helper and attach it if present.
  const commandOptions = writeConcern(options, self.s.db, self, options);
  if (commandOptions.writeConcern) {
    command.writeConcern = commandOptions.writeConcern;
  }

  if (typeof commandOptions.bypassDocumentValidation === 'boolean') {
    command.bypassDocumentValidation = commandOptions.bypassDocumentValidation;
  }

  decorateWithCollation(command, self, commandOptions);

  self.s.db.command(command, commandOptions, function(err, result) {
    return err
      ? handleCallback(callback, err, null)
      : handleCallback(callback, null, result);
  });
};

define.classMethod('findAndModify', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Find a document and remove it.
 *
 * @method
 * @param {object} query Query object to locate the object to modify.
 * @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 * @deprecated use findOneAndDelete instead
 */
Collection.prototype.findAndRemove = function(query, sort, options, callback) {
  // Everything after `query` is optional; a trailing function is the callback.
  const rest = Array.prototype.slice.call(arguments, 1);
  const trailing = rest[rest.length - 1];
  callback = typeof trailing === 'function' ? rest.pop() : undefined;

  // Consume the remaining positional arguments in order, applying defaults.
  sort = (rest.length > 0 && rest.shift()) || [];
  options = (rest.length > 0 && rest.shift()) || {};

  const operationArgs = [this, query, sort, options, callback];
  return executeOperation(this.s.topology, findAndRemove, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.findAndRemove: delegates to
 * self.findAndModify with the `remove` flag set and no update document.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} query Query selecting the document to remove.
 * @param {array} sort Sort order used to pick among multiple matches.
 * @param {object} options Operation options; left unmodified.
 * @param {function} callback Handed to findAndModify unchanged.
 */
var findAndRemove = function(self, query, sort, options, callback) {
  // Add the remove flag on a copy: `options` is the caller's own object and
  // must not gain a `remove` key as a side effect of this call.
  var removeOptions = Object.assign({}, options, { remove: true });

  self.findAndModify(query, sort, null, removeOptions, callback);
};
|
|
|
|
|
|
// Register findAndRemove as a class method flagged as supporting both the
// callback and the promise calling styles.
// NOTE(review): `define` is declared outside this section — presumably the
// Define metadata helper required at the top of the file; confirm.
define.classMethod('findAndRemove', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Attach a write concern to `command` when the connected topology reports
 * that commands accept one (`capabilities.commandsTakeWriteConcern`).
 *
 * @ignore
 * @param {object} command Command document; gains `writeConcern` when one is resolved.
 * @param {Collection} self Collection supplying the topology and db.
 * @param {object} options Operation options; only a copy is handed to `writeConcern` as its target.
 */
function decorateWithWriteConcern(command, self, options) {
  const capabilities = self.s.topology.capabilities();

  // Nothing to do when capabilities are unavailable or the flag is off.
  if (!capabilities || !capabilities.commandsTakeWriteConcern) {
    return;
  }

  // Resolve the write concern into a copy of the options.
  const resolved = writeConcern(shallowClone(options), self.s.db, self, options);
  if (resolved.writeConcern) {
    command.writeConcern = resolved.writeConcern;
  }
}
|
|
|
|
|
|
/**
 * Copy `options.collation` onto `command` when the connected topology reports
 * collation support (`capabilities.commandsTakeCollation`) and the collation
 * is an object. Otherwise the command is left untouched.
 *
 * @ignore
 * @param {object} command Command document; may gain a `collation` key.
 * @param {Collection} self Collection supplying the topology.
 * @param {object} options Operation options; `collation` is read from here.
 */
function decorateWithCollation(command, self, options) {
  const capabilities = self.s.topology.capabilities();

  // Nothing to do when capabilities are unavailable or the flag is off.
  if (!capabilities || !capabilities.commandsTakeCollation) {
    return;
  }

  // Non-object collations (e.g. a bare string) are ignored.
  const collation = options.collation;
  if (collation && typeof collation === 'object') {
    command.collation = collation;
  }
}
|
|
|
|
|
|
/**
 * Compute the read concern for `command` and attach it when non-empty.
 *
 * The result starts from any `command.readConcern`, is overlaid with the
 * collection's `self.s.readConcern`, and gains `afterClusterTime` when the
 * session supports causal consistency and has an operation time.
 *
 * @ignore
 * @param {object} command Command document; `readConcern` is (re)assigned when non-empty.
 * @param {Collection} self Collection supplying `s.readConcern`.
 * @param {object} options Operation options; `session` is read from here.
 */
function decorateWithReadConcern(command, self, options) {
  // Merge into a fresh object so neither source object is modified.
  const concern = Object.assign({}, command.readConcern || {}, self.s.readConcern || {});

  const session = options.session;
  if (session && session.supports.causalConsistency && session.operationTime) {
    concern.afterClusterTime = session.operationTime;
  }

  // An empty read concern is never written onto the command.
  if (Object.keys(concern).length === 0) {
    return;
  }

  command.readConcern = concern;
}
|
|
|
|
|
|
/**
 * Execute an aggregation framework pipeline against the collection, needs MongoDB >= 2.2
 *
 * The pipeline may be passed as a single array, or (legacy form) as individual
 * stage arguments followed by an optional options object and a callback. The
 * result is always a cursor: it is returned directly when no callback is
 * given, and handed to the callback otherwise.
 *
 * Throws synchronously when `explain` is combined with a read/write concern,
 * when `options.cursor` is not an object, or (cursor form only) when the
 * topology reports no capabilities.
 *
 * @method
 * @param {object} pipeline Array containing all the aggregation framework commands for the execution.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {object} [options.cursor=null] Return the query as cursor, on 2.6 > it returns as a real cursor on pre 2.6 it returns as an emulated cursor.
 * @param {number} [options.cursor.batchSize=null] The batchSize for the cursor
 * @param {boolean} [options.explain=false] Explain returns the aggregation execution plan (requires mongodb 2.6 >).
 * @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 >).
 * @param {number} [options.maxTimeMS=null] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
 * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
 * @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
 * @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
 * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
 * @param {string} [options.comment] Add a comment to an aggregation command
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~aggregationCallback} callback The command result callback
 * @return {(null|AggregationCursor)}
 */
Collection.prototype.aggregate = function(pipeline, options, callback) {
  var self = this;

  if (Array.isArray(pipeline)) {
    // Set up callback if one is provided
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }

    // Missing options (null/undefined) always become an empty object, whether
    // or not a callback was supplied; otherwise `aggregate(pipeline, null, cb)`
    // would throw a TypeError on the first option read below.
    if (options == null) {
      options = {};
    }
  } else {
    // Aggregation pipeline passed as arguments on the method
    var args = Array.prototype.slice.call(arguments, 0);
    // Get the callback (this form always treats the last argument as the callback)
    callback = args.pop();
    // Get the possible options object
    var opts = args[args.length - 1];
    // If it contains any of the admissible options pop it off the args
    options =
      opts &&
      (opts.readPreference ||
        opts.explain ||
        opts.cursor ||
        opts.out ||
        opts.maxTimeMS ||
        opts.hint ||
        opts.allowDiskUse)
        ? args.pop()
        : {};
    // Left over arguments is the pipeline
    pipeline = args;
  }

  // Pipelines that write their output ($out) must not carry a read concern
  var ignoreReadConcern = false;

  // If out was specified
  if (typeof options.out === 'string') {
    // Append the $out stage to a copy so the caller's pipeline array is not
    // modified (re-using the same array would otherwise accumulate $out stages).
    pipeline = pipeline.concat([{ $out: options.out }]);
    // Ignore read concern
    ignoreReadConcern = true;
  } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
    ignoreReadConcern = true;
  }

  // Build the command
  var command = { aggregate: this.s.name, pipeline: pipeline };

  // Decorate command with writeConcern if out has been specified
  if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
    decorateWithWriteConcern(command, self, options);
  }

  // Have we specified collation
  decorateWithCollation(command, self, options);

  // If we have bypassDocumentValidation set
  if (typeof options.bypassDocumentValidation === 'boolean') {
    command.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Do we have a readConcern specified
  if (!ignoreReadConcern) {
    decorateWithReadConcern(command, self, options);
  }

  // If we have allowDiskUse defined
  if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
  if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

  // If we are giving a hint
  if (options.hint) command.hint = options.hint;

  // From here on work on a copy: the options object is extended below
  options = shallowClone(options);
  // Ensure we have the right read preference inheritance
  options = getReadPreference(this, options, this.s.db, this);

  // If explain has been specified add it
  if (options.explain) {
    if (command.readConcern || command.writeConcern) {
      throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
    }
    command.explain = options.explain;
  }

  if (typeof options.comment === 'string') command.comment = options.comment;

  // Validate that cursor options is valid
  if (options.cursor != null && typeof options.cursor !== 'object') {
    throw toError('cursor options must be an object');
  }

  // The command is always issued in cursor form; default the batch size
  options.cursor = options.cursor || { batchSize: 1000 };
  command.cursor = options.cursor;

  // promiseLibrary
  options.promiseLibrary = this.s.promiseLibrary;

  // Set the AggregationCursor constructor
  options.cursorFactory = AggregationCursor;
  if (typeof callback !== 'function') {
    if (!this.s.topology.capabilities()) {
      throw new MongoError('cannot connect to server');
    }

    // Allow disk usage command (in this form an explicit `false` is forwarded too)
    if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
    if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

    // Execute the cursor
    return this.s.topology.cursor(this.s.namespace, command, options);
  }

  return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options));
};

define.classMethod('aggregate', { callback: true, promise: false });
|
|
|
|
|
|
/**
 * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
 *
 * May be called as `watch(pipeline, options)`, `watch(pipeline)`, `watch(options)` or `watch()`.
 *
 * @method
 * @since 3.0.0
 * @param {Array} [pipeline=null] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
 * @param {object} [options=null] Optional settings
 * @param {string} [options.fullDocument='default'] Allowed values: 'default', 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
 * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
 * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
 * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {ChangeStream} a ChangeStream instance.
 */
Collection.prototype.watch = function(pipeline, options) {
  // watch(options): a truthy, non-array first argument is the options object
  // and the pipeline is empty.
  if (pipeline && !Array.isArray(pipeline)) {
    return new ChangeStream(this, [], pipeline);
  }

  // Otherwise default whichever of the two arguments is missing.
  return new ChangeStream(this, pipeline || [], options || {});
};

define.classMethod('watch', { callback: false, promise: false });
|
|
|
|
|
|
/**
|
|
|
* The callback format for results
|
|
|
* @callback Collection~parallelCollectionScanCallback
|
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
|
* @param {Cursor[]} cursors A list of cursors returned allowing for parallel reading of collection.
|
|
|
*/
|
|
|
|
|
|
/**
 * Return N number of parallel cursors for a collection allowing parallel reading of entire collection. There are
 * no ordering guarantees for returned results.
 *
 * `numCursors` defaults to 1 and `batchSize` to 1000 when missing or falsy.
 * The caller's options object is never modified.
 *
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {number} [options.batchSize=null] Set the batchSize for the getMoreCommand when iterating over the query results.
 * @param {number} [options.numCursors=1] The maximum number of parallel command cursors to return (the number of returned cursors will be in the range 1:numCursors)
 * @param {boolean} [options.raw=false] Return all BSON documents as Raw Buffer documents.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.parallelCollectionScan = function(options, callback) {
  if (typeof options === 'function') (callback = options), (options = { numCursors: 1 });

  // Copy first (tolerating omitted options, e.g. a promise-style call with no
  // arguments) so the defaults below are not written onto the caller's object.
  options = shallowClone(options || {});

  // Set number of cursors to 1
  options.numCursors = options.numCursors || 1;
  options.batchSize = options.batchSize || 1000;

  // Ensure we have the right read preference inheritance
  options = getReadPreference(this, options, this.s.db, this);

  // Add a promiseLibrary
  options.promiseLibrary = this.s.promiseLibrary;

  return executeOperation(this.s.topology, parallelCollectionScan, [this, options, callback], {
    returnsCursor: true
  });
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.parallelCollectionScan: runs the
 * `parallelCollectionScan` command and wraps every returned cursor id in a
 * topology cursor.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {object} options Operation options; `raw` is temporarily removed while the command runs.
 * @param {function} callback Invoked through handleCallback with (err, cursors).
 */
var parallelCollectionScan = function(self, options, callback) {
  const scanCommand = {
    parallelCollectionScan: self.s.name,
    numCursors: options.numCursors
  };

  // Do we have a readConcern specified
  decorateWithReadConcern(scanCommand, self, options);

  // `raw` is withheld from the command call itself and restored afterwards
  // so that it only applies to the cursors created below.
  const rawSetting = options.raw;
  delete options.raw;

  self.s.db.command(scanCommand, options, function(err, result) {
    if (err) {
      return handleCallback(callback, err, null);
    }
    if (result == null) {
      return handleCallback(
        callback,
        new Error('no result returned for parallelCollectionScan'),
        null
      );
    }

    // Add the raw back to the option
    if (rawSetting) {
      options.raw = rawSetting;
    }

    // One command cursor per entry in the reply
    const cursors = result.cursors.map(function(entry) {
      const id = entry.cursor.id;
      // Numeric ids are converted to Long; anything else is passed through.
      const cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
      return self.s.topology.cursor(self.s.namespace, cursorId, options);
    });

    handleCallback(callback, null, cursors);
  });
};

define.classMethod('parallelCollectionScan', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Run a geo search against a geo haystack index on this collection.
 *
 * @method
 * @param {number} x Point to search on the x axis, ensure the indexes are ordered in the same order.
 * @param {number} y Point to search on the y axis, ensure the indexes are ordered in the same order.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {number} [options.maxDistance=null] Include results up to maxDistance from the point.
 * @param {object} [options.search=null] Filter the results by a query.
 * @param {number} [options.limit=false] Max number of results to return.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.geoHaystackSearch = function(x, y, options, callback) {
  // Everything after the coordinates is optional; a trailing function is the callback.
  const rest = Array.prototype.slice.call(arguments, 2);
  const trailing = rest[rest.length - 1];
  callback = typeof trailing === 'function' ? rest.pop() : undefined;

  // Whatever remains first (if anything) is the options object.
  options = (rest.length > 0 && rest.shift()) || {};

  const operationArgs = [this, x, y, options, callback];
  return executeOperation(this.s.topology, geoHaystackSearch, operationArgs);
};
|
|
|
|
|
|
/**
 * Operation body behind Collection.prototype.geoHaystackSearch: builds a
 * `geoSearch` command around the [x, y] point and runs it through the owning db.
 *
 * @ignore
 * @param {Collection} self The collection the operation runs against.
 * @param {number} x Point to search on the x axis.
 * @param {number} y Point to search on the y axis.
 * @param {object} options Operation options, merged into the command by decorateCommand.
 * @param {function} callback Invoked exactly once through handleCallback with (err) or (null, res).
 */
var geoHaystackSearch = function(self, x, y, options, callback) {
  // Build command object
  var commandObject = {
    geoSearch: self.s.name,
    near: [x, y]
  };

  // Copy the options onto the command, leaving out readPreference and session
  commandObject = decorateCommand(commandObject, options, { readPreference: true, session: true });

  options = shallowClone(options);
  // Ensure we have the right read preference inheritance
  options = getReadPreference(self, options, self.s.db, self);

  // Do we have a readConcern specified
  decorateWithReadConcern(commandObject, self, options);

  // Execute the command
  self.s.db.command(commandObject, options, function(err, res) {
    if (err) return handleCallback(callback, err);

    // An error embedded in the reply is terminal: return here so the callback
    // is not invoked a second time with a success result.
    if (res.err || res.errmsg) return handleCallback(callback, toError(res));

    // should we only be returning res.results here? Not sure if the user
    // should see the other return information
    handleCallback(callback, null, res);
  });
};

define.classMethod('geoHaystackSearch', { callback: true, promise: true });
|
|
|
|
|
|
/**
|
|
|
* Group function helper
|
|
|
* @ignore
|
|
|
*/
|
|
|
// var groupFunction = function () {
|
|
|
// var c = db[ns].find(condition);
|
|
|
// var map = new Map();
|
|
|
// var reduce_function = reduce;
|
|
|
//
|
|
|
// while (c.hasNext()) {
|
|
|
// var obj = c.next();
|
|
|
// var key = {};
|
|
|
//
|
|
|
// for (var i = 0, len = keys.length; i < len; ++i) {
|
|
|
// var k = keys[i];
|
|
|
// key[k] = obj[k];
|
|
|
// }
|
|
|
//
|
|
|
// var aggObj = map.get(key);
|
|
|
//
|
|
|
// if (aggObj == null) {
|
|
|
// var newObj = Object.extend({}, key);
|
|
|
// aggObj = Object.extend(newObj, initial);
|
|
|
// map.put(key, aggObj);
|
|
|
// }
|
|
|
//
|
|
|
// reduce_function(obj, aggObj);
|
|
|
// }
|
|
|
//
|
|
|
// return { "result": map.values() };
|
|
|
// }.toString();
|
|
|
// Source text of the JavaScript function used by the non-command (eval) path of `group`.
// The ` reduce;` token is a placeholder: the group implementation replaces it with the
// caller's reduce source before wrapping the text in a Code instance and passing it to db.eval.
// `ns`, `keys`, `condition` and `initial` are supplied through the Code scope.
// Keep this string in sync with the commented-out reference implementation above.
var groupFunction =
|
|
|
'function () {\nvar c = db[ns].find(condition);\nvar map = new Map();\nvar reduce_function = reduce;\n\nwhile (c.hasNext()) {\nvar obj = c.next();\nvar key = {};\n\nfor (var i = 0, len = keys.length; i < len; ++i) {\nvar k = keys[i];\nkey[k] = obj[k];\n}\n\nvar aggObj = map.get(key);\n\nif (aggObj == null) {\nvar newObj = Object.extend({}, key);\naggObj = Object.extend(newObj, initial);\nmap.put(key, aggObj);\n}\n\nreduce_function(obj, aggObj);\n}\n\nreturn { "result": map.values() };\n}';
|
|
|
|
|
|
/**
 * Run a group command across a collection
 *
 * Two call shapes are accepted:
 *  - `group(keys, condition, initial, reduce, finalize, command, options, callback)`
 *  - legacy: `group(keys, condition, initial, reduce, command, callback)` where the
 *    value in the `finalize` position is neither a function nor a Code instance
 *    and is therefore interpreted as the `command` flag.
 *
 * NOTE: a trailing function argument is always taken as the callback, so when
 * relying on the returned Promise do not end the argument list with a function.
 *
 * @method
 * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by.
 * @param {object} condition An optional condition that must be true for a row to be considered.
 * @param {object} initial Initial value of the aggregation counter object.
 * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated
 * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned.
 * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @return {Promise} returns Promise if no callback passed
 * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework.
 */
Collection.prototype.group = function(
  keys,
  condition,
  initial,
  reduce,
  finalize,
  command,
  options,
  callback
) {
  // Everything after `initial` is positional and optional, so re-read it from `arguments`
  var args = Array.prototype.slice.call(arguments, 3);
  callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  reduce = args.length ? args.shift() : null;
  finalize = args.length ? args.shift() : null;
  command = args.length ? args.shift() : null;
  options = args.length ? args.shift() || {} : {};

  // Make sure we are backward compatible: a non-null value in the finalize slot
  // that is neither a function nor a Code instance is the legacy `command` flag.
  // A null/undefined finalize leaves an explicitly passed `command` untouched,
  // and a Code finalize is kept as the finalize function.
  var finalizeIsCode = finalize != null && finalize._bsontype === 'Code';
  if (finalize != null && typeof finalize !== 'function' && !finalizeIsCode) {
    command = finalize;
    finalize = null;
  }

  // A plain object of keys is reduced to the list of its property names
  if (
    !Array.isArray(keys) &&
    keys instanceof Object &&
    typeof keys !== 'function' &&
    !(keys._bsontype === 'Code')
  ) {
    keys = Object.keys(keys);
  }

  // Functions are shipped to the server as source text
  if (typeof reduce === 'function') {
    reduce = reduce.toString();
  }

  if (typeof finalize === 'function') {
    finalize = finalize.toString();
  }

  // Set up the command as default
  command = command == null ? true : command;

  return executeOperation(this.s.topology, group, [
    this,
    keys,
    condition,
    initial,
    reduce,
    finalize,
    command,
    options,
    callback
  ]);
};
|
|
|
|
|
|
/**
 * Internal implementation of `Collection.prototype.group`.
 *
 * When `command` is truthy the `group` database command is issued and
 * `result.retval` is handed to the callback. Otherwise the `groupFunction`
 * template is filled in with the reduce source and evaluated through
 * `db.eval`, and `results.result || results` is handed to the callback.
 *
 * @ignore
 * @param {Collection} self The collection being grouped.
 * @param {(array|function|Code)} keys Key names, or a key function / Code instance.
 * @param {object} condition Filter for the documents to consider.
 * @param {object} initial Initial value of the aggregation object.
 * @param {(string|Code)} reduce Reduce function source or Code instance.
 * @param {(string|Code)} finalize Optional finalize function source or Code instance.
 * @param {boolean} command Truthy to use the group command, falsy to use eval.
 * @param {object} options Operation options.
 * @param {Collection~resultCallback} callback Receives the error or the grouped result.
 */
var group = function(self, keys, condition, initial, reduce, finalize, command, options, callback) {
  var reduceIsCode = reduce != null && reduce._bsontype === 'Code';

  // Execute using the command
  if (command) {
    var reduceFunction = reduceIsCode ? reduce : new Code(reduce);

    var selector = {
      group: {
        ns: self.s.name,
        $reduce: reduceFunction,
        cond: condition,
        initial: initial,
        out: 'inline'
      }
    };

    // if finalize is defined
    if (finalize != null) selector.group['finalize'] = finalize;

    // Set up group selector: a function/Code computes the key, anything else is a list of field names
    if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) {
      selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys);
    } else {
      var hash = {};
      keys.forEach(function(key) {
        hash[key] = 1;
      });
      selector.group.key = hash;
    }

    options = shallowClone(options);
    // Ensure we have the right read preference inheritance
    options = getReadPreference(self, options, self.s.db, self);

    // Do we have a readConcern specified
    decorateWithReadConcern(selector, self, options);

    // Have we specified collation
    decorateWithCollation(selector, self, options);

    // Execute command
    self.s.db.command(selector, options, function(err, result) {
      if (err) return handleCallback(callback, err, null);
      handleCallback(callback, null, result.retval);
    });
    return;
  }

  // Create execution scope. It is a copy, so a Code instance supplied by the
  // caller is never modified and a Code without a scope does not crash.
  var scope = Object.assign({}, reduceIsCode ? reduce.scope : null);
  scope.ns = self.s.name;
  scope.keys = keys;
  scope.condition = condition;
  scope.initial = initial;

  // A Code instance carries its source in `.code`; anything else is used as text
  var reduceSource = String(reduceIsCode ? reduce.code : reduce);

  // Pass in the function text to execute within mongodb. A replacer function is
  // used so `$&`, `$'`, `$1`... inside the user's source are inserted literally
  // instead of being expanded as replacement patterns.
  var groupfn = groupFunction.replace(/ reduce;/, function() {
    return reduceSource + ';';
  });

  self.s.db.eval(new Code(groupfn, scope), null, options, function(err, results) {
    if (err) return handleCallback(callback, err, null);
    handleCallback(callback, null, results.result || results);
  });
};
|
|
|
|
|
|
// Record class-method metadata for group: flagged as supporting both callback and promise usage.
define.classMethod('group', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Recursively copies a map/reduce scope, turning every function value into a
 * Code instance built from the function's source text.
 *
 * Values for which `isObject` is false, and values tagged with
 * `_bsontype === 'ObjectID'`, are returned as they are.
 * @ignore
 */
function processScope(scope) {
  var shouldConvert = isObject(scope) && scope._bsontype !== 'ObjectID';
  if (!shouldConvert) {
    return scope;
  }

  // Walk the keys from last to first so the copy receives its properties in
  // the same (reversed) order the original implementation produced.
  return Object.keys(scope).reduceRight(function(converted, name) {
    var value = scope[name];
    converted[name] =
      typeof value === 'function' ? new Code(String(value)) : processScope(value);
    return converted;
  }, {});
}
|
|
|
|
|
|
/**
 * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
 *
 * Function values for `map`, `reduce` and `options.finalize` are converted to
 * their source text before the operation is executed. The caller's `options`
 * object is not modified.
 *
 * @method
 * @param {(function|string)} map The mapping function.
 * @param {(function|string)} reduce The reduce function.
 * @param {object} [options=null] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {object} [options.out=null] Sets the output target for the map reduce job. *{inline:1} | {replace:'collectionName'} | {merge:'collectionName'} | {reduce:'collectionName'}*
 * @param {object} [options.query=null] Query filter object.
 * @param {object} [options.sort=null] Sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces.
 * @param {number} [options.limit=null] Number of objects to return from collection.
 * @param {boolean} [options.keeptemp=false] Keep temporary data.
 * @param {(function|string)} [options.finalize=null] Finalize function.
 * @param {object} [options.scope=null] Can pass in variables that can be access from map/reduce/finalize.
 * @param {boolean} [options.jsMode=false] It is possible to make the execution stay in JS. Provided in MongoDB > 2.0.X.
 * @param {boolean} [options.verbose=false] Provide statistics on job execution time.
 * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Collection~resultCallback} [callback] The command result callback
 * @throws {Error} synchronously, when `options.out` is not defined
 * @return {Promise} returns Promise if no callback passed
 */
Collection.prototype.mapReduce = function(map, reduce, options, callback) {
  if ('function' === typeof options) {
    callback = options;
    options = {};
  }

  // Tolerate a missing options argument so the check below reports the
  // intended error instead of failing on a property read of undefined.
  options = options || {};

  // Out must always be defined (make sure we don't break weirdly on pre 1.8+ servers)
  if (null == options.out) {
    throw new Error(
      'the out option parameter must be defined, see mongodb docs for possible values'
    );
  }

  if ('function' === typeof map) {
    map = map.toString();
  }

  if ('function' === typeof reduce) {
    reduce = reduce.toString();
  }

  if ('function' === typeof options.finalize) {
    // Convert on a copy: the caller keeps its original finalize function
    options = shallowClone(options);
    options.finalize = options.finalize.toString();
  }

  return executeOperation(this.s.topology, mapReduce, [this, map, reduce, options, callback]);
};
|
|
|
|
|
|
/**
 * Internal implementation of `Collection.prototype.mapReduce`.
 *
 * Assembles the `mapreduce` command from the supplied options, decorates it
 * with write concern (non-inline output) or read concern (otherwise), runs it
 * through the parent db and converts the reply into either the inline
 * results or a collection wrapping the output, optionally paired with stats.
 *
 * @ignore
 * @param {Collection} self The collection the job runs against.
 * @param {string} map Map function source.
 * @param {string} reduce Reduce function source.
 * @param {object} options Operation options; `out` is expected to be set.
 * @param {Collection~resultCallback} callback Receives the error or the result.
 */
var mapReduce = function(self, map, reduce, options, callback) {
  var command = {
    mapreduce: self.s.name,
    map: map,
    reduce: reduce
  };

  // Options that are handled through `options` and never copied onto the command
  var notCopied = ['readPreference', 'session'];

  // Add any other options passed in; scope gets its functions converted first
  for (var name in options) {
    if (name === 'scope') {
      command[name] = processScope(options[name]);
      continue;
    }

    if (notCopied.indexOf(name) !== -1) continue;
    command[name] = options[name];
  }

  // Ensure we have the right read preference inheritance (on a copy of the options)
  options = getReadPreference(self, shallowClone(options), self.s.db, self);

  // If we have a read preference and inline is not set as output fail hard
  var outputIsNotInline =
    options.readPreference !== false &&
    options.readPreference !== 'primary' &&
    options['out'] &&
    (options['out'].inline !== 1 && options['out'] !== 'inline');

  if (outputIsNotInline) {
    // Force readPreference to primary
    options.readPreference = 'primary';
    // Decorate command with writeConcern if supported
    decorateWithWriteConcern(command, self, options);
  } else {
    decorateWithReadConcern(command, self, options);
  }

  // Is bypassDocumentValidation specified
  if (typeof options.bypassDocumentValidation === 'boolean') {
    command.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Have we specified collation
  decorateWithCollation(command, self, options);

  // Execute command
  self.s.db.command(command, options, function(err, result) {
    if (err) return handleCallback(callback, err);

    // Check if we have an error
    var replyFailed = 1 !== result.ok || result.err || result.errmsg;
    if (replyFailed) return handleCallback(callback, toError(result));

    // Create statistics value
    var stats = {};
    if (result.timeMillis) stats['processtime'] = result.timeMillis;
    if (result.counts) stats['counts'] = result.counts;
    if (result.timing) stats['timing'] = result.timing;

    var wantsStats = Boolean(options['verbose']);

    // invoked with inline?
    if (result.results) {
      return handleCallback(
        callback,
        null,
        wantsStats ? { results: result.results, stats: stats } : result.results
      );
    }

    // The returned collection
    var collection = null;
    var target = result.result;

    if (target != null && typeof target === 'object') {
      // If we have an object it's a different db: return a collection from that db
      var Db = require('./db');
      collection = new Db(target.db, self.s.db.s.topology, self.s.db.s.options).collection(
        target.collection
      );
    } else {
      // Create a collection object that wraps the result collection
      collection = self.s.db.collection(target);
    }

    // If we wish for no verbosity
    if (!wantsStats) return handleCallback(callback, err, collection);

    // Return the collection together with the stats
    handleCallback(callback, err, { collection: collection, stats: stats });
  });
};
|
|
|
|
|
|
// Record class-method metadata for mapReduce: flagged as supporting both callback and promise usage.
define.classMethod('mapReduce', { callback: true, promise: true });
|
|
|
|
|
|
/**
 * Initiate an out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order.
 *
 * The supplied options object is copied before the collection's promise
 * library is attached, so the caller's object is left untouched.
 *
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {UnorderedBulkOperation}
 */
Collection.prototype.initializeUnorderedBulkOp = function(options) {
  // Copy first: adding promiseLibrary must not leak into the caller's object
  options = shallowClone(options || {});
  options.promiseLibrary = this.s.promiseLibrary;
  return unordered(this.s.topology, this, options);
};

// Record class-method metadata. The returned type is the one exposed by the
// unordered bulk module (the ordered module does not provide it).
define.classMethod('initializeUnorderedBulkOp', {
  callback: false,
  promise: false,
  returns: [unordered.UnorderedBulkOperation]
});
|
|
|
|
|
|
/**
 * Initiate an In order bulk write operation, operations will be serially executed in the order they are added, creating a new operation for each switch in types.
 *
 * The supplied options object is copied before the collection's promise
 * library is attached, so the caller's object is left untouched.
 *
 * @method
 * @param {object} [options=null] Optional settings.
 * @param {(number|string)} [options.w=null] The write concern.
 * @param {number} [options.wtimeout=null] The write concern timeout.
 * @param {boolean} [options.j=false] Specify a journal write concern.
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {OrderedBulkOperation}
 */
Collection.prototype.initializeOrderedBulkOp = function(options) {
  // Copy first: adding promiseLibrary must not leak into the caller's object
  options = shallowClone(options || {});
  options.promiseLibrary = this.s.promiseLibrary;
  return ordered(this.s.topology, this, options);
};

// Record class-method metadata: synchronous method (no callback, no promise)
// whose return type is the ordered bulk operation.
define.classMethod('initializeOrderedBulkOp', {
  callback: false,
  promise: false,
  returns: [ordered.OrderedBulkOperation]
});
|
|
|
|
|
|
/**
 * Decorates `target` with the write concern that applies to an operation.
 *
 * Precedence: the operation `options`, then the collection, then the db. A
 * source only counts when it sets `w`, `j` or `fsync`. Operation options are
 * copied field by field (`w`, `wtimeout`, `j`, `fsync`); the collection and db
 * write concern objects are attached as they are. `retryWrites` is set on
 * the target when the db options enable it.
 *
 * @ignore
 * @param {object} target Object that receives `writeConcern` / `retryWrites`.
 * @param {Db} db The parent database.
 * @param {Collection} col The collection.
 * @param {object} options The operation options.
 * @return {object} the same `target` object
 */
var writeConcern = function(target, db, col, options) {
  // A source is "configured" when any of w / j / fsync is present
  var isConfigured = function(source) {
    return source.w != null || source.j != null || source.fsync != null;
  };

  if (isConfigured(options)) {
    var explicit = {};
    ['w', 'wtimeout', 'j', 'fsync'].forEach(function(field) {
      if (options[field] != null) explicit[field] = options[field];
    });
    target.writeConcern = explicit;
  } else if (isConfigured(col.writeConcern)) {
    target.writeConcern = col.writeConcern;
  } else if (isConfigured(db.writeConcern)) {
    target.writeConcern = db.writeConcern;
  }

  // NOTE: there is probably a much better place for this
  if (db.s.options.retryWrites) {
    target.retryWrites = true;
  }

  return target;
};
|
|
|
|
|
|
/**
 * Figure out the read preference for an operation and store it on `options`.
 *
 * The first truthy value of `options.readPreference`, the collection's
 * `s.readPreference` and the db's `s.readPreference` is used. When none is
 * set, `options` is returned unchanged. Otherwise `options.readPreference`
 * is set to a ReadPreference instance:
 *  - an existing instance is used as is (this is what makes a collection or
 *    db level preference actually reach the operation options),
 *  - a string is treated as the mode,
 *  - a plain object is converted using `mode` (or `preference`), `tags` and
 *    `maxStalenessSeconds`; an object without a string mode leaves
 *    `options.readPreference` as it was.
 *
 * @ignore
 * @param {Collection} self The collection (reads `self.s.readPreference`).
 * @param {object} options Operation options; modified in place.
 * @param {Db} db The parent db (reads `db.s.readPreference`).
 * @return {object} the same `options` object
 * @throws {TypeError} when the selected value is not a string, object or ReadPreference
 */
var getReadPreference = function(self, options, db) {
  // Most specific setting wins: operation, then collection, then db
  const r = options.readPreference || self.s.readPreference || db.s.readPreference;
  if (!r) {
    return options;
  }

  if (r instanceof ReadPreference) {
    // Inherited instances must be written back, otherwise the collection/db
    // level preference never reaches the command options.
    options.readPreference = r;
  } else if (typeof r === 'string') {
    options.readPreference = new ReadPreference(r);
  } else if (typeof r === 'object') {
    const mode = r.mode || r.preference;
    if (mode && typeof mode === 'string') {
      options.readPreference = new ReadPreference(mode, r.tags, {
        maxStalenessSeconds: r.maxStalenessSeconds
      });
    }
  } else {
    throw new TypeError('Invalid read preference: ' + r);
  }

  return options;
};
|
|
|
|
|
|
module.exports = Collection;
|