You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
4.4 KiB

'use strict';
const retrieveBSON = require('./connection/utils').retrieveBSON,
EventEmitter = require('events'),
BSON = retrieveBSON(),
Binary = BSON.Binary,
uuidV4 = require('./utils').uuidV4;
/**
*
*/
class ClientSession extends EventEmitter {
constructor(topology, sessionPool, options) {
super();
if (topology == null) {
throw new Error('ClientSession requires a topology');
}
if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
throw new Error('ClientSession requires a ServerSessionPool');
}
options = options || {};
this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.serverSession = sessionPool.acquire();
this.supports = {
causalConsistency: !!options.causalConsistency
};
options = options || {};
if (typeof options.initialClusterTime !== 'undefined') {
this.clusterTime = options.initialClusterTime;
} else {
this.clusterTime = null;
}
this.operationTime = null;
}
/**
*
*/
endSession(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
if (this.hasEnded) {
if (typeof callback === 'function') callback(null, null);
return;
}
if (!options.skipCommand) {
// send the `endSessions` command
this.topology.endSessions(this.id);
}
// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);
// release the server session back to the pool
this.sessionPool.release(this.serverSession);
// spec indicates that we should ignore all errors for `endSessions`
if (typeof callback === 'function') callback(null, null);
}
/**
* Advances the operationTime for a ClientSession.
*
* @param {object} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
*/
advanceOperationTime(operationTime) {
if (this.operationTime == null) {
this.operationTime = operationTime;
return;
}
if (operationTime.greaterThan(this.operationTime)) {
this.operationTime = operationTime;
}
}
/**
* Used to determine if this session equals another
*/
equals(session) {
if (!(session instanceof ClientSession)) {
return false;
}
return this.id.id.buffer.equals(session.id.id.buffer);
}
}
Object.defineProperty(ClientSession.prototype, 'id', {
get: function() {
return this.serverSession.id;
}
});
/**
*
*/
class ServerSession {
constructor() {
this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
this.lastUse = Date.now();
this.txnNumber = 0;
}
/**
*
* @param {*} sessionTimeoutMinutes
*/
hasTimedOut(sessionTimeoutMinutes) {
// Take the difference of the lastUse timestamp and now, which will result in a value in
// milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
const idleTimeMinutes = Math.round(
(((Date.now() - this.lastUse) % 86400000) % 3600000) / 60000
);
return idleTimeMinutes > sessionTimeoutMinutes - 1;
}
}
/**
*
*/
class ServerSessionPool {
constructor(topology) {
if (topology == null) {
throw new Error('ServerSessionPool requires a topology');
}
this.topology = topology;
this.sessions = [];
}
endAllPooledSessions() {
if (this.sessions.length) {
this.topology.endSessions(this.sessions.map(session => session.id));
this.sessions = [];
}
}
/**
* @returns {ServerSession}
*/
acquire() {
const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
while (this.sessions.length) {
const session = this.sessions.shift();
if (!session.hasTimedOut(sessionTimeoutMinutes)) {
return session;
}
}
return new ServerSession();
}
/**
*
* @param {*} session
*/
release(session) {
const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
while (this.sessions.length) {
const session = this.sessions[this.sessions.length - 1];
if (session.hasTimedOut(sessionTimeoutMinutes)) {
this.sessions.pop();
} else {
break;
}
}
if (!session.hasTimedOut(sessionTimeoutMinutes)) {
this.sessions.unshift(session);
}
}
}
module.exports = {
ClientSession: ClientSession,
ServerSession: ServerSession,
ServerSessionPool: ServerSessionPool
};