DiscofyAPI/node_modules/mariadb/lib/cmd/batch-rewrite.js

371 lines
11 KiB
JavaScript

'use strict';
const CommonText = require('./common-text-cmd');
const Errors = require('../misc/errors');
const Parse = require('../misc/parse');
const RewritePacket = require('../io/rewrite-packet');
const QUOTE = 0x27;
/**
* Protocol COM_QUERY
* see : https://mariadb.com/kb/en/library/com_query/
*/
class BatchRewrite extends CommonText {
constructor(resolve, reject, options, connOpts, sql, values) {
super(resolve, reject, options, connOpts, sql, values);
}
/**
* Send COM_QUERY
*
* @param out output writer
* @param opts connection options
* @param info connection information
*/
start(out, opts, info) {
this.sending = true;
this.info = info;
if (this.opts.timeout) {
const err = Errors.createError(
'Cannot use timeout for Batch statement',
false,
info,
'HY000',
Errors.ER_TIMEOUT_NOT_SUPPORTED
);
this.emit('send_end');
this.throwError(err, info);
return;
}
if (this.initialValues.length === 0) this.initialValues = [[]];
if (this.opts.namedPlaceholders) {
this.parseResults = Parse.splitRewritableNamedParameterQuery(this.sql, this.initialValues);
this.values = this.parseResults.values;
} else {
this.parseResults = Parse.splitRewritableQuery(this.sql);
this.values = this.initialValues;
if (!this.validateParameters(info)) {
this.sending = false;
return;
}
}
out.startPacket(this);
this.packet = new RewritePacket(
this.opts.maxAllowedPacket,
out,
this.parseResults.partList[0],
this.parseResults.partList[this.parseResults.partList.length - 1]
);
this.onPacketReceive = this.readResponsePacket;
this.valueIdx = 0;
this.sendQueries();
}
sendQueries() {
let flushed = false;
while (!flushed && this.sending && this.valueIdx < this.values.length) {
this.valueRow = this.values[this.valueIdx++];
//********************************************
// send params
//********************************************
const len = this.parseResults.partList.length - 3;
for (let i = 0; i < len; i++) {
const value = this.valueRow[i];
flushed = this.packet.writeString(this.parseResults.partList[i + 1]) || flushed;
if (value === null) {
flushed = this.packet.writeStringAscii('NULL') || flushed;
continue;
}
if (
typeof value === 'object' &&
typeof value.pipe === 'function' &&
typeof value.read === 'function'
) {
//********************************************
// param is stream,
// now all params will be written by event
//********************************************
this.registerStreamSendEvent(this.packet, this.info);
this.currentParam = i;
this.packet.writeInt8(QUOTE); //'
value.on(
'data',
function (chunk) {
this.packet.writeBufferEscape(chunk);
}.bind(this)
);
value.on(
'end',
function () {
this.packet.writeInt8(QUOTE); //'
this.currentParam++;
this.paramWritten();
}.bind(this)
);
return;
} else {
//********************************************
// param isn't stream. directly write in buffer
//********************************************
flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
}
}
this.packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
}
if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
//there is still data to send
setImmediate(this.sendQueries.bind(this));
} else {
if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
this.sending = false;
}
}
displaySql() {
if (this.opts && this.initialValues) {
if (this.sql.length > this.opts.debugLen) {
return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...';
}
let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
sqlMsg += '[';
for (let i = 0; i < this.initialValues.length; i++) {
if (i !== 0) sqlMsg += ',';
let param = this.initialValues[i];
sqlMsg = this.logParameters(sqlMsg, param);
if (sqlMsg.length > this.opts.debugLen) {
sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + '...';
break;
}
}
sqlMsg += ']';
return sqlMsg;
}
return 'sql: ' + this.sql + ' - parameters:[]';
}
success(val) {
this.packet.waitingResponseNo--;
if (this.packet.haveErrorResponse) {
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
this._columns = null;
this._rows = null;
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit('end', this.firstError);
}
} else {
if (!this.sending && this.packet.waitingResponseNo === 0) {
if (this.parseResults.reWritable) {
this.packet = null;
let totalAffectedRows = 0;
this._rows.forEach((row) => {
totalAffectedRows += row.affectedRows;
});
const rs = {
affectedRows: totalAffectedRows,
insertId: this._rows[0].insertId,
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
return;
} else {
this.successEnd(this._rows);
}
this._columns = null;
this._rows = null;
return;
}
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}
throwError(err, info) {
this.packet.waitingResponseNo--;
this.sending = false;
if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) {
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
err = Errors.createError(
err.message,
err.fatal,
info,
err.sqlState,
err.errno,
this.stack,
false
);
}
this.firstError = err;
this.packet.endedWithError();
}
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit('end', this.firstError);
} else {
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}
/**
* Validate that parameters exists and are defined.
*
* @param info connection info
* @returns {boolean} return false if any error occur.
*/
validateParameters(info) {
//validate parameter size.
for (let r = 0; r < this.values.length; r++) {
let val = this.values[r];
if (!Array.isArray(val)) {
val = [val];
this.values[r] = val;
}
if (this.parseResults.partList.length - 3 > val.length) {
this.emit('send_end');
this.throwNewError(
'Parameter at position ' +
val.length +
' is not set for values ' +
r +
'\n' +
this.displaySql(),
false,
info,
'HY000',
Errors.ER_MISSING_PARAMETER
);
return false;
}
//validate parameter is defined.
for (let i = 0; i < this.parseResults.partList.length - 3; i++) {
if (val[i] === undefined) {
this.emit('send_end');
this.throwNewError(
'Parameter at position ' +
(i + 1) +
' is undefined for values ' +
r +
'\n' +
this.displaySql(),
false,
info,
'HY000',
Errors.ER_PARAMETER_UNDEFINED
);
return false;
}
}
}
return true;
}
/**
* Define params events.
* Each parameter indicate that he is written to socket,
* emitting event so next parameter can be written.
*/
registerStreamSendEvent(packet, info) {
this.paramWritten = function () {
let flushed = false;
while (!flushed) {
if (this.packet.haveErrorResponse) {
this.sending = false;
this.emit('send_end');
return;
}
if (this.currentParam === this.valueRow.length) {
// all parameters from row are written.
flushed =
packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]) ||
flushed;
flushed =
packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length) ||
flushed;
if (this.valueIdx < this.values.length) {
// still remaining rows
this.valueRow = this.values[this.valueIdx++];
this.currentParam = 0;
} else {
// all rows are written
this.sending = false;
this.emit('send_end');
return;
}
}
flushed = packet.writeString(this.parseResults.partList[this.currentParam + 1]) || flushed;
const value = this.valueRow[this.currentParam];
if (value === null) {
flushed = packet.writeStringAscii('NULL') || flushed;
this.currentParam++;
continue;
}
if (
typeof value === 'object' &&
typeof value.pipe === 'function' &&
typeof value.read === 'function'
) {
//********************************************
// param is stream,
//********************************************
flushed = packet.writeInt8(QUOTE) || flushed;
value.once(
'end',
function () {
packet.writeInt8(QUOTE);
this.currentParam++;
this.paramWritten();
}.bind(this)
);
value.on('data', function (chunk) {
packet.writeBufferEscape(chunk);
});
return;
}
//********************************************
// param isn't stream. directly write in buffer
//********************************************
flushed = this.writeParam(packet, value, this.opts, info) || flushed;
this.currentParam++;
}
if (this.sending) setImmediate(this.paramWritten.bind(this));
}.bind(this);
}
}
module.exports = BatchRewrite;