You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
805 lines
25 KiB
805 lines
25 KiB
'use strict';
|
|
|
|
var inherits = require('util').inherits,
|
|
EventEmitter = require('events').EventEmitter,
|
|
net = require('net'),
|
|
tls = require('tls'),
|
|
crypto = require('crypto'),
|
|
f = require('util').format,
|
|
debugOptions = require('./utils').debugOptions,
|
|
parseHeader = require('../wireprotocol/shared').parseHeader,
|
|
decompress = require('../wireprotocol/compression').decompress,
|
|
Response = require('./commands').Response,
|
|
MongoNetworkError = require('../error').MongoNetworkError,
|
|
Logger = require('./logger'),
|
|
OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED,
|
|
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
|
|
|
|
var _id = 0;
|
|
var debugFields = [
|
|
'host',
|
|
'port',
|
|
'size',
|
|
'keepAlive',
|
|
'keepAliveInitialDelay',
|
|
'noDelay',
|
|
'connectionTimeout',
|
|
'socketTimeout',
|
|
'singleBufferSerializtion',
|
|
'ssl',
|
|
'ca',
|
|
'crl',
|
|
'cert',
|
|
'rejectUnauthorized',
|
|
'promoteLongs',
|
|
'promoteValues',
|
|
'promoteBuffers',
|
|
'checkServerIdentity'
|
|
];
|
|
|
|
var connectionAccountingSpy = undefined;
|
|
var connectionAccounting = false;
|
|
var connections = {};
|
|
|
|
/**
|
|
* Creates a new Connection instance
|
|
* @class
|
|
* @param {string} options.host The server host
|
|
* @param {number} options.port The server port
|
|
* @param {number} [options.family=null] IP version for DNS lookup, passed down to Node's [`dns.lookup()` function](https://nodejs.org/api/dns.html#dns_dns_lookup_hostname_options_callback). If set to `6`, will only look for ipv6 addresses.
|
|
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
|
|
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
|
|
* @param {boolean} [options.noDelay=true] TCP Connection no delay
|
|
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
|
|
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
|
|
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
|
|
* @param {boolean} [options.ssl=false] Use SSL for connection
|
|
* @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
|
|
* @param {Buffer} [options.ca] SSL Certificate store binary buffer
|
|
* @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
|
|
* @param {Buffer} [options.cert] SSL Certificate binary buffer
|
|
* @param {Buffer} [options.key] SSL Key file binary buffer
|
|
* @param {string} [options.passphrase] SSL Certificate pass phrase
|
|
* @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
|
|
* @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
|
|
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
|
|
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
|
|
* @fires Connection#connect
|
|
* @fires Connection#close
|
|
* @fires Connection#error
|
|
* @fires Connection#timeout
|
|
* @fires Connection#parseError
|
|
* @return {Connection} A cursor instance
|
|
*/
|
|
var Connection = function(messageHandler, options) {
|
|
// Add event listener
|
|
EventEmitter.call(this);
|
|
// Set empty if no options passed
|
|
this.options = options || {};
|
|
// Identification information
|
|
this.id = _id++;
|
|
// Logger instance
|
|
this.logger = Logger('Connection', options);
|
|
// No bson parser passed in
|
|
if (!options.bson) throw new Error('must pass in valid bson parser');
|
|
// Get bson parser
|
|
this.bson = options.bson;
|
|
// Grouping tag used for debugging purposes
|
|
this.tag = options.tag;
|
|
// Message handler
|
|
this.messageHandler = messageHandler;
|
|
|
|
// Max BSON message size
|
|
this.maxBsonMessageSize = options.maxBsonMessageSize || 1024 * 1024 * 16 * 4;
|
|
// Debug information
|
|
if (this.logger.isDebug())
|
|
this.logger.debug(
|
|
f(
|
|
'creating connection %s with options [%s]',
|
|
this.id,
|
|
JSON.stringify(debugOptions(debugFields, options))
|
|
)
|
|
);
|
|
|
|
// Default options
|
|
this.port = options.port || 27017;
|
|
this.host = options.host || 'localhost';
|
|
this.family = typeof options.family === 'number' ? options.family : void 0;
|
|
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
|
|
this.keepAliveInitialDelay =
|
|
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
|
|
this.noDelay = typeof options.noDelay === 'boolean' ? options.noDelay : true;
|
|
this.connectionTimeout =
|
|
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;
|
|
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
|
|
|
|
// Is the keepAliveInitialDelay > socketTimeout set it to half of socketTimeout
|
|
if (this.keepAliveInitialDelay > this.socketTimeout) {
|
|
this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2);
|
|
}
|
|
|
|
// If connection was destroyed
|
|
this.destroyed = false;
|
|
|
|
// Check if we have a domain socket
|
|
this.domainSocket = this.host.indexOf('/') !== -1;
|
|
|
|
// Serialize commands using function
|
|
this.singleBufferSerializtion =
|
|
typeof options.singleBufferSerializtion === 'boolean' ? options.singleBufferSerializtion : true;
|
|
this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
|
|
|
|
// SSL options
|
|
this.ca = options.ca || null;
|
|
this.crl = options.crl || null;
|
|
this.cert = options.cert || null;
|
|
this.key = options.key || null;
|
|
this.passphrase = options.passphrase || null;
|
|
this.ciphers = options.ciphers || null;
|
|
this.ecdhCurve = options.ecdhCurve || null;
|
|
this.ssl = typeof options.ssl === 'boolean' ? options.ssl : false;
|
|
this.rejectUnauthorized =
|
|
typeof options.rejectUnauthorized === 'boolean' ? options.rejectUnauthorized : true;
|
|
this.checkServerIdentity =
|
|
typeof options.checkServerIdentity === 'boolean' ||
|
|
typeof options.checkServerIdentity === 'function'
|
|
? options.checkServerIdentity
|
|
: true;
|
|
|
|
// If ssl not enabled
|
|
if (!this.ssl) this.rejectUnauthorized = false;
|
|
|
|
// Response options
|
|
this.responseOptions = {
|
|
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
|
|
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
|
|
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false
|
|
};
|
|
|
|
// Flushing
|
|
this.flushing = false;
|
|
this.queue = [];
|
|
|
|
// Internal state
|
|
this.connection = null;
|
|
this.writeStream = null;
|
|
|
|
// Create hash method
|
|
var hash = crypto.createHash('sha1');
|
|
hash.update(f('%s:%s', this.host, this.port));
|
|
|
|
// Create a hash name
|
|
this.hashedName = hash.digest('hex');
|
|
|
|
// All operations in flight on the connection
|
|
this.workItems = [];
|
|
};
|
|
|
|
inherits(Connection, EventEmitter);
|
|
|
|
Connection.prototype.setSocketTimeout = function(value) {
|
|
if (this.connection) {
|
|
this.connection.setTimeout(value);
|
|
}
|
|
};
|
|
|
|
Connection.prototype.resetSocketTimeout = function() {
|
|
if (this.connection) {
|
|
this.connection.setTimeout(this.socketTimeout);
|
|
}
|
|
};
|
|
|
|
Connection.enableConnectionAccounting = function(spy) {
|
|
if (spy) {
|
|
connectionAccountingSpy = spy;
|
|
}
|
|
|
|
connectionAccounting = true;
|
|
connections = {};
|
|
};
|
|
|
|
Connection.disableConnectionAccounting = function() {
|
|
connectionAccounting = false;
|
|
connectionAccountingSpy = undefined;
|
|
};
|
|
|
|
Connection.connections = function() {
|
|
return connections;
|
|
};
|
|
|
|
function deleteConnection(id) {
|
|
// console.log("=== deleted connection " + id + " :: " + (connections[id] ? connections[id].port : ''))
|
|
delete connections[id];
|
|
|
|
if (connectionAccountingSpy) {
|
|
connectionAccountingSpy.deleteConnection(id);
|
|
}
|
|
}
|
|
|
|
function addConnection(id, connection) {
|
|
// console.log("=== added connection " + id + " :: " + connection.port)
|
|
connections[id] = connection;
|
|
|
|
if (connectionAccountingSpy) {
|
|
connectionAccountingSpy.addConnection(id, connection);
|
|
}
|
|
}
|
|
|
|
//
|
|
// Connection handlers
|
|
var errorHandler = function(self) {
|
|
return function(err) {
|
|
if (connectionAccounting) deleteConnection(self.id);
|
|
// Debug information
|
|
if (self.logger.isDebug())
|
|
self.logger.debug(
|
|
f(
|
|
'connection %s for [%s:%s] errored out with [%s]',
|
|
self.id,
|
|
self.host,
|
|
self.port,
|
|
JSON.stringify(err)
|
|
)
|
|
);
|
|
// Emit the error
|
|
if (self.listeners('error').length > 0) self.emit('error', new MongoNetworkError(err), self);
|
|
};
|
|
};
|
|
|
|
var timeoutHandler = function(self) {
|
|
return function() {
|
|
if (connectionAccounting) deleteConnection(self.id);
|
|
// Debug information
|
|
if (self.logger.isDebug())
|
|
self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
|
|
// Emit timeout error
|
|
self.emit(
|
|
'timeout',
|
|
new MongoNetworkError(f('connection %s to %s:%s timed out', self.id, self.host, self.port)),
|
|
self
|
|
);
|
|
};
|
|
};
|
|
|
|
var closeHandler = function(self) {
|
|
return function(hadError) {
|
|
if (connectionAccounting) deleteConnection(self.id);
|
|
// Debug information
|
|
if (self.logger.isDebug())
|
|
self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
|
|
|
|
// Emit close event
|
|
if (!hadError) {
|
|
self.emit(
|
|
'close',
|
|
new MongoNetworkError(f('connection %s to %s:%s closed', self.id, self.host, self.port)),
|
|
self
|
|
);
|
|
}
|
|
};
|
|
};
|
|
|
|
// Handle a message once it is recieved
|
|
var emitMessageHandler = function(self, message) {
|
|
var msgHeader = parseHeader(message);
|
|
if (msgHeader.opCode === OP_COMPRESSED) {
|
|
msgHeader.fromCompressed = true;
|
|
var index = MESSAGE_HEADER_SIZE;
|
|
msgHeader.opCode = message.readInt32LE(index);
|
|
index += 4;
|
|
msgHeader.length = message.readInt32LE(index);
|
|
index += 4;
|
|
var compressorID = message[index];
|
|
index++;
|
|
decompress(compressorID, message.slice(index), function(err, decompressedMsgBody) {
|
|
if (err) {
|
|
throw err;
|
|
}
|
|
if (decompressedMsgBody.length !== msgHeader.length) {
|
|
throw new Error(
|
|
'Decompressing a compressed message from the server failed. The message is corrupt.'
|
|
);
|
|
}
|
|
self.messageHandler(
|
|
new Response(self.bson, message, msgHeader, decompressedMsgBody, self.responseOptions),
|
|
self
|
|
);
|
|
});
|
|
} else {
|
|
self.messageHandler(
|
|
new Response(
|
|
self.bson,
|
|
message,
|
|
msgHeader,
|
|
message.slice(MESSAGE_HEADER_SIZE),
|
|
self.responseOptions
|
|
),
|
|
self
|
|
);
|
|
}
|
|
};
|
|
|
|
var dataHandler = function(self) {
|
|
return function(data) {
|
|
// Parse until we are done with the data
|
|
while (data.length > 0) {
|
|
// If we still have bytes to read on the current message
|
|
if (self.bytesRead > 0 && self.sizeOfMessage > 0) {
|
|
// Calculate the amount of remaining bytes
|
|
var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
|
|
// Check if the current chunk contains the rest of the message
|
|
if (remainingBytesToRead > data.length) {
|
|
// Copy the new data into the exiting buffer (should have been allocated when we know the message size)
|
|
data.copy(self.buffer, self.bytesRead);
|
|
// Adjust the number of bytes read so it point to the correct index in the buffer
|
|
self.bytesRead = self.bytesRead + data.length;
|
|
|
|
// Reset state of buffer
|
|
data = new Buffer(0);
|
|
} else {
|
|
// Copy the missing part of the data into our current buffer
|
|
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
|
|
// Slice the overflow into a new buffer that we will then re-parse
|
|
data = data.slice(remainingBytesToRead);
|
|
|
|
// Emit current complete message
|
|
try {
|
|
var emitBuffer = self.buffer;
|
|
// Reset state of buffer
|
|
self.buffer = null;
|
|
self.sizeOfMessage = 0;
|
|
self.bytesRead = 0;
|
|
self.stubBuffer = null;
|
|
|
|
emitMessageHandler(self, emitBuffer);
|
|
} catch (err) {
|
|
var errorObject = {
|
|
err: 'socketHandler',
|
|
trace: err,
|
|
bin: self.buffer,
|
|
parseState: {
|
|
sizeOfMessage: self.sizeOfMessage,
|
|
bytesRead: self.bytesRead,
|
|
stubBuffer: self.stubBuffer
|
|
}
|
|
};
|
|
// We got a parse Error fire it off then keep going
|
|
self.emit('parseError', errorObject, self);
|
|
}
|
|
}
|
|
} else {
|
|
// Stub buffer is kept in case we don't get enough bytes to determine the
|
|
// size of the message (< 4 bytes)
|
|
if (self.stubBuffer != null && self.stubBuffer.length > 0) {
|
|
// If we have enough bytes to determine the message size let's do it
|
|
if (self.stubBuffer.length + data.length > 4) {
|
|
// Prepad the data
|
|
var newData = new Buffer(self.stubBuffer.length + data.length);
|
|
self.stubBuffer.copy(newData, 0);
|
|
data.copy(newData, self.stubBuffer.length);
|
|
// Reassign for parsing
|
|
data = newData;
|
|
|
|
// Reset state of buffer
|
|
self.buffer = null;
|
|
self.sizeOfMessage = 0;
|
|
self.bytesRead = 0;
|
|
self.stubBuffer = null;
|
|
} else {
|
|
// Add the the bytes to the stub buffer
|
|
var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
|
|
// Copy existing stub buffer
|
|
self.stubBuffer.copy(newStubBuffer, 0);
|
|
// Copy missing part of the data
|
|
data.copy(newStubBuffer, self.stubBuffer.length);
|
|
// Exit parsing loop
|
|
data = new Buffer(0);
|
|
}
|
|
} else {
|
|
if (data.length > 4) {
|
|
// Retrieve the message size
|
|
// var sizeOfMessage = data.readUInt32LE(0);
|
|
var sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24);
|
|
// If we have a negative sizeOfMessage emit error and return
|
|
if (sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
|
|
errorObject = {
|
|
err: 'socketHandler',
|
|
trace: '',
|
|
bin: self.buffer,
|
|
parseState: {
|
|
sizeOfMessage: sizeOfMessage,
|
|
bytesRead: self.bytesRead,
|
|
stubBuffer: self.stubBuffer
|
|
}
|
|
};
|
|
// We got a parse Error fire it off then keep going
|
|
self.emit('parseError', errorObject, self);
|
|
return;
|
|
}
|
|
|
|
// Ensure that the size of message is larger than 0 and less than the max allowed
|
|
if (
|
|
sizeOfMessage > 4 &&
|
|
sizeOfMessage < self.maxBsonMessageSize &&
|
|
sizeOfMessage > data.length
|
|
) {
|
|
self.buffer = new Buffer(sizeOfMessage);
|
|
// Copy all the data into the buffer
|
|
data.copy(self.buffer, 0);
|
|
// Update bytes read
|
|
self.bytesRead = data.length;
|
|
// Update sizeOfMessage
|
|
self.sizeOfMessage = sizeOfMessage;
|
|
// Ensure stub buffer is null
|
|
self.stubBuffer = null;
|
|
// Exit parsing loop
|
|
data = new Buffer(0);
|
|
} else if (
|
|
sizeOfMessage > 4 &&
|
|
sizeOfMessage < self.maxBsonMessageSize &&
|
|
sizeOfMessage === data.length
|
|
) {
|
|
try {
|
|
emitBuffer = data;
|
|
// Reset state of buffer
|
|
self.buffer = null;
|
|
self.sizeOfMessage = 0;
|
|
self.bytesRead = 0;
|
|
self.stubBuffer = null;
|
|
// Exit parsing loop
|
|
data = new Buffer(0);
|
|
// Emit the message
|
|
emitMessageHandler(self, emitBuffer);
|
|
} catch (err) {
|
|
self.emit('parseError', err, self);
|
|
}
|
|
} else if (sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
|
|
errorObject = {
|
|
err: 'socketHandler',
|
|
trace: null,
|
|
bin: data,
|
|
parseState: {
|
|
sizeOfMessage: sizeOfMessage,
|
|
bytesRead: 0,
|
|
buffer: null,
|
|
stubBuffer: null
|
|
}
|
|
};
|
|
// We got a parse Error fire it off then keep going
|
|
self.emit('parseError', errorObject, self);
|
|
|
|
// Clear out the state of the parser
|
|
self.buffer = null;
|
|
self.sizeOfMessage = 0;
|
|
self.bytesRead = 0;
|
|
self.stubBuffer = null;
|
|
// Exit parsing loop
|
|
data = new Buffer(0);
|
|
} else {
|
|
emitBuffer = data.slice(0, sizeOfMessage);
|
|
// Reset state of buffer
|
|
self.buffer = null;
|
|
self.sizeOfMessage = 0;
|
|
self.bytesRead = 0;
|
|
self.stubBuffer = null;
|
|
// Copy rest of message
|
|
data = data.slice(sizeOfMessage);
|
|
// Emit the message
|
|
emitMessageHandler(self, emitBuffer);
|
|
}
|
|
} else {
|
|
// Create a buffer that contains the space for the non-complete message
|
|
self.stubBuffer = new Buffer(data.length);
|
|
// Copy the data to the stub buffer
|
|
data.copy(self.stubBuffer, 0);
|
|
// Exit parsing loop
|
|
data = new Buffer(0);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
// List of socket level valid ssl options
|
|
var legalSslSocketOptions = [
|
|
'pfx',
|
|
'key',
|
|
'passphrase',
|
|
'cert',
|
|
'ca',
|
|
'ciphers',
|
|
'NPNProtocols',
|
|
'ALPNProtocols',
|
|
'servername',
|
|
'ecdhCurve',
|
|
'secureProtocol',
|
|
'secureContext',
|
|
'session',
|
|
'minDHSize'
|
|
];
|
|
|
|
function merge(options1, options2) {
|
|
// Merge in any allowed ssl options
|
|
for (var name in options2) {
|
|
if (options2[name] != null && legalSslSocketOptions.indexOf(name) !== -1) {
|
|
options1[name] = options2[name];
|
|
}
|
|
}
|
|
}
|
|
|
|
function makeSSLConnection(self, _options) {
|
|
let sslOptions = {
|
|
socket: self.connection,
|
|
rejectUnauthorized: self.rejectUnauthorized
|
|
};
|
|
|
|
// Merge in options
|
|
merge(sslOptions, self.options);
|
|
merge(sslOptions, _options);
|
|
|
|
// Set options for ssl
|
|
if (self.ca) sslOptions.ca = self.ca;
|
|
if (self.crl) sslOptions.crl = self.crl;
|
|
if (self.cert) sslOptions.cert = self.cert;
|
|
if (self.key) sslOptions.key = self.key;
|
|
if (self.passphrase) sslOptions.passphrase = self.passphrase;
|
|
|
|
// Override checkServerIdentity behavior
|
|
if (self.checkServerIdentity === false) {
|
|
// Skip the identiy check by retuning undefined as per node documents
|
|
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
|
|
sslOptions.checkServerIdentity = function() {
|
|
return undefined;
|
|
};
|
|
} else if (typeof self.checkServerIdentity === 'function') {
|
|
sslOptions.checkServerIdentity = self.checkServerIdentity;
|
|
}
|
|
|
|
// Set default sni servername to be the same as host
|
|
if (sslOptions.servername == null) {
|
|
sslOptions.servername = self.host;
|
|
}
|
|
|
|
// Attempt SSL connection
|
|
const connection = tls.connect(self.port, self.host, sslOptions, function() {
|
|
// Error on auth or skip
|
|
if (connection.authorizationError && self.rejectUnauthorized) {
|
|
return self.emit('error', connection.authorizationError, self, { ssl: true });
|
|
}
|
|
|
|
// Set socket timeout instead of connection timeout
|
|
connection.setTimeout(self.socketTimeout);
|
|
// We are done emit connect
|
|
self.emit('connect', self);
|
|
});
|
|
|
|
// Set the options for the connection
|
|
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
|
|
connection.setTimeout(self.connectionTimeout);
|
|
connection.setNoDelay(self.noDelay);
|
|
|
|
return connection;
|
|
}
|
|
|
|
function makeUnsecureConnection(self, family) {
|
|
// Create new connection instance
|
|
let connection_options;
|
|
if (self.domainSocket) {
|
|
connection_options = { path: self.host };
|
|
} else {
|
|
connection_options = { port: self.port, host: self.host };
|
|
connection_options.family = family;
|
|
}
|
|
|
|
const connection = net.createConnection(connection_options);
|
|
|
|
// Set the options for the connection
|
|
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
|
|
connection.setTimeout(self.connectionTimeout);
|
|
connection.setNoDelay(self.noDelay);
|
|
|
|
connection.once('connect', function() {
|
|
// Set socket timeout instead of connection timeout
|
|
connection.setTimeout(self.socketTimeout);
|
|
// Emit connect event
|
|
self.emit('connect', self);
|
|
});
|
|
|
|
return connection;
|
|
}
|
|
|
|
function doConnect(self, family, _options, _errorHandler) {
|
|
self.connection = self.ssl
|
|
? makeSSLConnection(self, _options)
|
|
: makeUnsecureConnection(self, family);
|
|
|
|
// Add handlers for events
|
|
self.connection.once('error', _errorHandler);
|
|
self.connection.once('timeout', timeoutHandler(self));
|
|
self.connection.once('close', closeHandler(self));
|
|
self.connection.on('data', dataHandler(self));
|
|
}
|
|
|
|
/**
|
|
* Connect
|
|
* @method
|
|
*/
|
|
Connection.prototype.connect = function(_options) {
|
|
_options = _options || {};
|
|
// Set the connections
|
|
if (connectionAccounting) addConnection(this.id, this);
|
|
// Check if we are overriding the promoteLongs
|
|
if (typeof _options.promoteLongs === 'boolean') {
|
|
this.responseOptions.promoteLongs = _options.promoteLongs;
|
|
this.responseOptions.promoteValues = _options.promoteValues;
|
|
this.responseOptions.promoteBuffers = _options.promoteBuffers;
|
|
}
|
|
|
|
const _errorHandler = errorHandler(this);
|
|
|
|
if (this.family !== void 0) {
|
|
return doConnect(this, this.family, _options, _errorHandler);
|
|
}
|
|
|
|
return doConnect(this, 6, _options, err => {
|
|
if (this.logger.isDebug()) {
|
|
this.logger.debug(
|
|
f(
|
|
'connection %s for [%s:%s] errored out with [%s]',
|
|
this.id,
|
|
this.host,
|
|
this.port,
|
|
JSON.stringify(err)
|
|
)
|
|
);
|
|
}
|
|
|
|
// clean up existing event handlers
|
|
this.connection.removeAllListeners('error');
|
|
this.connection.removeAllListeners('timeout');
|
|
this.connection.removeAllListeners('close');
|
|
this.connection.removeAllListeners('data');
|
|
this.connection = undefined;
|
|
|
|
return doConnect(this, 4, _options, _errorHandler);
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Unref this connection
|
|
* @method
|
|
* @return {boolean}
|
|
*/
|
|
Connection.prototype.unref = function() {
|
|
if (this.connection) this.connection.unref();
|
|
else {
|
|
var self = this;
|
|
this.once('connect', function() {
|
|
self.connection.unref();
|
|
});
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Destroy connection
|
|
* @method
|
|
*/
|
|
Connection.prototype.destroy = function() {
|
|
// Set the connections
|
|
if (connectionAccounting) deleteConnection(this.id);
|
|
if (this.connection) {
|
|
// Catch posssible exception thrown by node 0.10.x
|
|
try {
|
|
this.connection.end();
|
|
} catch (err) {} // eslint-disable-line
|
|
// Destroy connection
|
|
this.connection.destroy();
|
|
}
|
|
|
|
this.destroyed = true;
|
|
};
|
|
|
|
/**
|
|
* Write to connection
|
|
* @method
|
|
* @param {Command} command Command to write out need to implement toBin and toBinUnified
|
|
*/
|
|
Connection.prototype.write = function(buffer) {
|
|
var i;
|
|
// Debug Log
|
|
if (this.logger.isDebug()) {
|
|
if (!Array.isArray(buffer)) {
|
|
this.logger.debug(
|
|
f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port)
|
|
);
|
|
} else {
|
|
for (i = 0; i < buffer.length; i++)
|
|
this.logger.debug(
|
|
f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port)
|
|
);
|
|
}
|
|
}
|
|
|
|
// Double check that the connection is not destroyed
|
|
if (this.connection.destroyed === false) {
|
|
// Write out the command
|
|
if (!Array.isArray(buffer)) {
|
|
this.connection.write(buffer, 'binary');
|
|
return true;
|
|
}
|
|
|
|
// Iterate over all buffers and write them in order to the socket
|
|
for (i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
|
|
return true;
|
|
}
|
|
|
|
// Connection is destroyed return write failed
|
|
return false;
|
|
};
|
|
|
|
/**
|
|
* Return id of connection as a string
|
|
* @method
|
|
* @return {string}
|
|
*/
|
|
Connection.prototype.toString = function() {
|
|
return '' + this.id;
|
|
};
|
|
|
|
/**
|
|
* Return json object of connection
|
|
* @method
|
|
* @return {object}
|
|
*/
|
|
Connection.prototype.toJSON = function() {
|
|
return { id: this.id, host: this.host, port: this.port };
|
|
};
|
|
|
|
/**
|
|
* Is the connection connected
|
|
* @method
|
|
* @return {boolean}
|
|
*/
|
|
Connection.prototype.isConnected = function() {
|
|
if (this.destroyed) return false;
|
|
return !this.connection.destroyed && this.connection.writable;
|
|
};
|
|
|
|
/**
|
|
* A server connect event, used to verify that the connection is up and running
|
|
*
|
|
* @event Connection#connect
|
|
* @type {Connection}
|
|
*/
|
|
|
|
/**
|
|
* The server connection closed, all pool connections closed
|
|
*
|
|
* @event Connection#close
|
|
* @type {Connection}
|
|
*/
|
|
|
|
/**
|
|
* The server connection caused an error, all pool connections closed
|
|
*
|
|
* @event Connection#error
|
|
* @type {Connection}
|
|
*/
|
|
|
|
/**
|
|
* The server connection timed out, all pool connections closed
|
|
*
|
|
* @event Connection#timeout
|
|
* @type {Connection}
|
|
*/
|
|
|
|
/**
|
|
* The driver experienced an invalid message, all pool connections closed
|
|
*
|
|
* @event Connection#parseError
|
|
* @type {Connection}
|
|
*/
|
|
|
|
module.exports = Connection;
|