DiscofyAPI/node_modules/mariadb/lib/pool-callback.js

202 lines
5.9 KiB
JavaScript
Raw Permalink Normal View History

2021-03-25 17:23:36 +01:00
'use strict';
const PoolBase = require('./pool-base');
const ConnectionCallback = require('./connection-callback');
const Errors = require('./misc/errors');
const util = require('util');
function PoolCallback(options) {
const processTaskCallback = function (conn, sql, values, isBatch) {
if (sql) {
return new Promise((resolve, reject) => {
const fct = isBatch ? conn.batch : conn.query;
fct(sql, values, (err, rows, fields) => {
conn.releaseWithoutError();
if (err) {
reject(err);
return;
}
return resolve(rows);
});
});
} else {
return Promise.resolve(conn);
}
};
const pingPromise = function (conn) {
return new Promise((resolve, reject) => {
conn.ping(options.pingTimeout, (err) => {
if (err) {
reject(err);
} else resolve();
});
});
};
const createConnectionPoolCallback = function (pool) {
const conn = new ConnectionCallback(options.connOptions);
return new Promise(function (resolve, reject) {
conn.connect((err) => {
if (err) {
reject(err);
} else {
if (pool.closed) {
//discard connection
conn.end((err) => {});
reject(
Errors.createError(
'Cannot create new connection to pool, pool closed',
true,
null,
'08S01',
Errors.ER_ADD_CONNECTION_CLOSED_POOL,
null
)
);
} else {
const initialEnd = conn.end;
conn.forceEnd = () => {
return new Promise(function (res, rej) {
initialEnd((err) => {
if (err) {
rej(err);
} else {
res();
}
});
});
};
conn.release = function (cb) {
if (pool.closed) {
pool._discardConnection(conn);
if (cb) cb();
return;
}
if (options.noControlAfterUse) {
pool._releaseConnection(conn);
if (cb) cb();
return;
}
//if server permit it, reset the connection, or rollback only if not
// COM_RESET_CONNECTION exist since mysql 5.7.3 and mariadb 10.2.4
// but not possible to use it with mysql waiting for https://bugs.mysql.com/bug.php?id=97633 correction.
// and mariadb only since https://jira.mariadb.org/browse/MDEV-18281
let revertFunction = conn.rollback;
if (
options.resetAfterUse &&
conn.info.isMariaDB() &&
((conn.info.serverVersion.minor === 2 && conn.info.hasMinVersion(10, 2, 22)) ||
conn.info.hasMinVersion(10, 3, 13))
) {
revertFunction = conn.reset;
}
revertFunction((errCall) => {
if (errCall) {
//uncertain connection state.
pool._discardConnection(conn);
if (cb) cb();
return;
} else {
pool._releaseConnection(conn);
}
if (cb) cb();
});
};
conn.end = conn.release;
conn.releaseWithoutError = () => {
conn.end((err) => {});
};
resolve(conn);
}
}
});
});
};
PoolBase.call(this, options, processTaskCallback, createConnectionPoolCallback, pingPromise);
const getConnectionPromise = this.getConnection.bind(this);
const endPromise = this.end.bind(this);
const queryPromise = this.query.bind(this);
const batchPromise = this.batch.bind(this);
const emptyError = (err) => {};
//*****************************************************************
// internal equivalent with callback of promised functions
//*****************************************************************
const _getConnectionCallback = (callback) => {
getConnectionPromise()
.then((conn) => {
if (callback) callback(null, conn);
})
.catch(callback || emptyError);
};
const _endCallback = (callback) => {
endPromise()
.then(() => {
if (callback) callback(null);
})
.catch(callback || emptyError);
};
/**
* Execute query using text protocol with callback emit columns/data/end/error
* events to permit streaming big result-set
*
* @param sql sql parameter Object can be used to supersede default option.
* Object must then have sql property.
* @param values object / array of placeholder values (not mandatory)
* @param cb callback
* @returns {Query} query
*/
const _queryCallback = function (sql, values, cb) {
let _cb = cb,
_values = values;
if (typeof values === 'function') {
_cb = values;
_values = undefined;
}
queryPromise(sql, _values)
.then((rows) => {
if (_cb) _cb(null, rows, rows.meta);
})
.catch(_cb || emptyError);
};
const _batchCallback = function (sql, values, cb) {
let _values = values,
_cb = cb;
if (typeof values === 'function') {
_cb = values;
_values = undefined;
}
batchPromise(sql, _values)
.then((rows) => {
if (_cb) _cb(null, rows, rows.meta);
})
.catch(_cb || emptyError);
};
//*****************************************************************
// replacing public promise function with callback equivalent
//*****************************************************************
this.end = _endCallback;
this.query = _queryCallback;
this.batch = _batchCallback;
this.getConnection = _getConnectionCallback;
}
util.inherits(PoolCallback, PoolBase);
module.exports = PoolCallback;