DiscofyAPI/node_modules/mariadb/lib/cmd/batch-bulk.js

277 lines
7.6 KiB
JavaScript

'use strict';
const CommonBinary = require('./common-binary-cmd');
const Errors = require('../misc/errors');
const Parse = require('../misc/parse');
const BulkPacket = require('../io/bulk-packet');
/**
* Protocol COM_STMT_BULK_EXECUTE
* see : https://mariadb.com/kb/en/library/com_stmt_bulk_execute/
*/
class BatchBulk extends CommonBinary {
constructor(resolve, reject, options, connOpts, sql, values) {
super(resolve, reject, options, connOpts, sql, values);
this.onPacketReceive = this.readPrepareResultPacket;
}
/**
* Send COM_STMT_BULK_EXECUTE
*
* @param out output writer
* @param opts connection options
* @param info connection information
*/
start(out, opts, info) {
this.sending = true;
this.info = info;
this.values = this.initialValues;
if (this.opts.timeout) {
const err = Errors.createError(
'Cannot use timeout for Batch statement',
false,
info,
'HY000',
Errors.ER_TIMEOUT_NOT_SUPPORTED
);
this.emit('send_end');
this.throwError(err, info);
return;
}
let questionMarkSql = this.sql;
if (this.opts.namedPlaceholders) {
const res = Parse.searchPlaceholder(
this.sql,
info,
this.initialValues,
this.displaySql.bind(this)
);
questionMarkSql = res.sql;
this.values = res.values;
}
if (!this.validateParameters(info)) {
this.sending = false;
return;
}
//send COM_STMT_PREPARE command
this.out = out;
this.packet = new BulkPacket(this.opts, out, this.values[0]);
out.startPacket(this);
out.writeInt8(0x16);
out.writeString(questionMarkSql);
out.flushBuffer(true);
if (this.opts.pipelining) {
out.startPacket(this);
this.valueIdx = 0;
this.sendQueries();
} else {
this.out = out;
}
}
sendQueries() {
let flushed = false;
while (!flushed && this.sending && this.valueIdx < this.values.length) {
this.valueRow = this.values[this.valueIdx++];
//********************************************
// send params
//********************************************
const len = this.valueRow.length;
for (let i = 0; i < len; i++) {
const value = this.valueRow[i];
if (value === null) {
flushed = this.packet.writeInt8(0x01) || flushed;
continue;
}
//********************************************
// param has no stream. directly write in buffer
//********************************************
flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
}
const last = this.valueIdx === this.values.length;
flushed = this.packet.mark(last, last ? null : this.values[this.valueIdx]) || flushed;
}
if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
//there is still data to send
setImmediate(this.sendQueries.bind(this));
} else {
if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
this.sending = false;
}
}
displaySql() {
if (this.opts && this.initialValues) {
if (this.sql.length > this.opts.debugLen) {
return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...';
}
let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
sqlMsg += '[';
for (let i = 0; i < this.initialValues.length; i++) {
if (i !== 0) sqlMsg += ',';
let param = this.initialValues[i];
sqlMsg = this.logParameters(sqlMsg, param);
if (sqlMsg.length > this.opts.debugLen) {
sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + '...';
break;
}
}
sqlMsg += ']';
return sqlMsg;
}
return 'sql: ' + this.sql + ' - parameters:[]';
}
success(val) {
this.packet.waitingResponseNo--;
if (!this.opts.pipelining && this.packet.statementId === -1) {
this.packet.statementId = this.statementId;
this.out.startPacket(this);
this.valueIdx = 0;
this.sendQueries();
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
return;
}
if (!this.sending && this.packet.waitingResponseNo === 0) {
//send COM_STMT_CLOSE packet
if (!this.firstError || !this.firstError.fatal) {
this.sequenceNo = -1;
this.compressSequenceNo = -1;
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);
}
this.sending = false;
this.emit('send_end');
if (this.packet.haveErrorResponse) {
this.packet = null;
this.resolve = null;
this.onPacketReceive = null;
this._columns = null;
this._rows = null;
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit('end', this.firstError);
} else {
this.packet = null;
let totalAffectedRows = 0;
this._rows.forEach((row) => {
totalAffectedRows += row.affectedRows;
});
const rs = {
affectedRows: totalAffectedRows,
insertId: this._rows[0].insertId,
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
this._columns = null;
this._rows = null;
}
return;
}
if (!this.packet.haveErrorResponse) {
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}
throwError(err, info) {
this.packet.waitingResponseNo--;
this.sending = false;
if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) {
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
err = Errors.createError(
err.message,
err.fatal,
info,
err.sqlState,
err.errno,
this.stack,
false
);
}
this.firstError = err;
this.packet.endedWithError();
}
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.resolve = null;
//send COM_STMT_CLOSE packet
if (!err.fatal && this.statementId) {
this.sequenceNo = -1;
this.compressSequenceNo = -1;
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);
}
this.emit('send_end');
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.onPacketReceive = null;
this.emit('end', this.firstError);
} else {
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}
/**
* Validate that parameters exists and are defined.
*
* @param info connection info
* @returns {boolean} return false if any error occur.
*/
validateParameters(info) {
//validate parameter size.
for (let r = 0; r < this.values.length; r++) {
if (!Array.isArray(this.values[r])) this.values[r] = [this.values[r]];
//validate parameter is defined.
for (let i = 0; i < this.values[r].length; i++) {
if (this.values[r][i] === undefined) {
this.emit('send_end');
this.throwNewError(
'Parameter at position ' +
(i + 1) +
' is undefined for values ' +
r +
'\n' +
this.displaySql(),
false,
info,
'HY000',
Errors.ER_PARAMETER_UNDEFINED
);
return false;
}
}
}
return true;
}
}
module.exports = BatchBulk;