371 lines
11 KiB
JavaScript
371 lines
11 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
const CommonText = require('./common-text-cmd');
|
||
|
const Errors = require('../misc/errors');
|
||
|
const Parse = require('../misc/parse');
|
||
|
const RewritePacket = require('../io/rewrite-packet');
|
||
|
const QUOTE = 0x27;
|
||
|
|
||
|
/**
 * Protocol COM_QUERY
 * see : https://mariadb.com/kb/en/library/com_query/
 *
 * Batch command: the statement is split into parts around its placeholders
 * (`parseResults.partList`) and every row of values is written between those
 * parts into a RewritePacket.
 *
 * Layout of `partList` as used by this class (n = number of placeholders,
 * so `partList.length === n + 3`):
 *   - partList[0]            passed to RewritePacket at construction
 *   - partList[1..n]         text written just before parameter 0..n-1
 *   - partList[n + 1]        text written after the last parameter of a row
 *   - partList[n + 2]        passed to RewritePacket at construction
 */
class BatchRewrite extends CommonText {
  constructor(resolve, reject, options, connOpts, sql, values) {
    super(resolve, reject, options, connOpts, sql, values);
  }

  /**
   * Send COM_QUERY
   *
   * @param out   output writer
   * @param opts  connection options
   * @param info  connection information
   */
  start(out, opts, info) {
    this.sending = true;
    this.info = info;

    if (this.opts.timeout) {
      const err = Errors.createError(
        'Cannot use timeout for Batch statement',
        false,
        info,
        'HY000',
        Errors.ER_TIMEOUT_NOT_SUPPORTED
      );
      this.emit('send_end');
      // no packet exists yet at this point: throwError must cope with that
      this.throwError(err, info);
      return;
    }

    // a batch without values is handled as a single row without parameters
    if (this.initialValues.length === 0) this.initialValues = [[]];

    if (this.opts.namedPlaceholders) {
      this.parseResults = Parse.splitRewritableNamedParameterQuery(this.sql, this.initialValues);
      this.values = this.parseResults.values;
    } else {
      this.parseResults = Parse.splitRewritableQuery(this.sql);
      this.values = this.initialValues;
      if (!this.validateParameters(info)) {
        this.sending = false;
        return;
      }
    }

    out.startPacket(this);
    const partList = this.parseResults.partList;
    this.packet = new RewritePacket(
      this.opts.maxAllowedPacket,
      out,
      partList[0],
      partList[partList.length - 1]
    );

    this.onPacketReceive = this.readResponsePacket;
    this.valueIdx = 0;
    this.sendQueries();
  }

  /**
   * Write rows of values to the packet until a write reports a flush, all
   * rows are written, sending is stopped, or a stream parameter is met.
   *
   * When a stream parameter is met, the remaining parameters and rows are
   * written from the stream events (see registerStreamSendEvent) and this
   * method returns immediately.
   * Otherwise, remaining rows are scheduled with setImmediate.
   */
  sendQueries() {
    if (!this.packet) {
      // throwError() may have released the packet while this call was queued
      // through setImmediate: nothing is left to send.
      this.sending = false;
      return;
    }

    const partList = this.parseResults.partList;
    const paramCount = partList.length - 3;
    let flushed = false;

    while (!flushed && this.sending && this.valueIdx < this.values.length) {
      this.valueRow = this.values[this.valueIdx++];

      //********************************************
      // send params
      //********************************************
      for (let i = 0; i < paramCount; i++) {
        const value = this.valueRow[i];
        flushed = this.packet.writeString(partList[i + 1]) || flushed;

        if (value === null) {
          flushed = this.packet.writeStringAscii('NULL') || flushed;
          continue;
        }

        if (
          typeof value === 'object' &&
          typeof value.pipe === 'function' &&
          typeof value.read === 'function'
        ) {
          //********************************************
          // param is stream,
          // now all params will be written by event
          //********************************************
          this.registerStreamSendEvent(this.packet, this.info);
          this.currentParam = i;
          this.packet.writeInt8(QUOTE); //'

          value.on('data', (chunk) => {
            this.packet.writeBufferEscape(chunk);
          });

          value.on('end', () => {
            this.packet.writeInt8(QUOTE); //'
            this.currentParam++;
            this.paramWritten();
          });

          return;
        }

        //********************************************
        // param isn't stream. directly write in buffer
        //********************************************
        flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
      }

      // text following the last parameter, then mark the end of this row;
      // the flag is true when the statement is not rewritable or this was the last row
      this.packet.writeString(partList[partList.length - 2]);
      this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
    }

    if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
      //there is still data to send
      setImmediate(this.sendQueries.bind(this));
    } else {
      if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
      this.sending = false;
    }
  }

  /**
   * Build a description of the statement and its parameters for messages.
   * The result is cut according to `opts.debugLen`.
   *
   * @returns {string} description starting with 'sql: '
   */
  displaySql() {
    if (!(this.opts && this.initialValues)) {
      return 'sql: ' + this.sql + ' - parameters:[]';
    }

    const maxLen = this.opts.debugLen;
    if (this.sql.length > maxLen) {
      return 'sql: ' + this.sql.substring(0, maxLen) + '...';
    }

    let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
    sqlMsg += '[';
    for (let i = 0; i < this.initialValues.length; i++) {
      if (i !== 0) sqlMsg += ',';
      sqlMsg = this.logParameters(sqlMsg, this.initialValues[i]);
      if (sqlMsg.length > maxLen) {
        // substring(0, n) is used instead of the deprecated substr(0, n)
        sqlMsg = sqlMsg.substring(0, maxLen) + '...';
        break;
      }
    }
    sqlMsg += ']';
    return sqlMsg;
  }

  /**
   * Handle one successful response.
   *
   * The command ends only when sending is finished and no response is
   * awaited anymore; then it is either rejected with the first recorded error
   * or resolved (aggregated result when the statement is rewritable, raw
   * rows otherwise).
   *
   * @param val  not read by this implementation
   */
  success(val) {
    this.packet.waitingResponseNo--;
    const isLastResponse = !this.sending && this.packet.waitingResponseNo === 0;

    if (this.packet.haveErrorResponse) {
      // a previous response was an error: wait for the last response, then reject
      if (isLastResponse) {
        this.packet = null;
        this.onPacketReceive = null;
        this.resolve = null;
        this._columns = null;
        this._rows = null;
        process.nextTick(this.reject, this.firstError);
        this.reject = null;
        this.emit('end', this.firstError);
      }
      return;
    }

    if (!isLastResponse) {
      // more responses expected
      this._responseIndex++;
      this.onPacketReceive = this.readResponsePacket;
      return;
    }

    if (this.parseResults.reWritable) {
      this.packet = null;
      const rows = this._rows;
      const rs = {
        affectedRows: rows.reduce((total, row) => total + row.affectedRows, 0),
        insertId: rows[0].insertId,
        warningStatus: rows[rows.length - 1].warningStatus
      };
      this.successEnd(rs);
    } else {
      this.successEnd(this._rows);
    }

    // release result state on both paths (the rewritable path previously skipped this)
    this._columns = null;
    this._rows = null;
  }

  /**
   * Handle an error, either received as a response or raised before anything
   * was sent.
   *
   * Only the first error is recorded in `firstError`. The command is rejected
   * once no response is awaited anymore, or immediately when no packet was
   * created yet (error raised from start() before sending).
   *
   * @param err   error
   * @param info  connection information
   */
  throwError(err, info) {
    const packet = this.packet;
    // packet does not exist when the error is raised before sending started
    if (packet) packet.waitingResponseNo--;
    this.sending = false;

    if (!packet || !packet.haveErrorResponse) {
      if (packet && err.fatal) {
        packet.waitingResponseNo = 0;
      }
      if (this.stack) {
        err = Errors.createError(
          err.message,
          err.fatal,
          info,
          err.sqlState,
          err.errno,
          this.stack,
          false
        );
      }
      this.firstError = err;
      if (packet) packet.endedWithError();
    }

    if (!packet || packet.waitingResponseNo === 0) {
      this.packet = null;
      this.onPacketReceive = null;
      this.resolve = null;
      process.nextTick(this.reject, this.firstError);
      this.reject = null;
      this.emit('end', this.firstError);
    } else {
      // other responses are still expected before ending the command
      this._responseIndex++;
      this.onPacketReceive = this.readResponsePacket;
    }
  }

  /**
   * Validate that parameters exists and are defined.
   *
   * A row that is not an array is replaced in place by a one-element array.
   * On the first invalid row, 'send_end' is emitted and the error is reported
   * through throwNewError.
   *
   * @param info connection info
   * @returns {boolean} return false if any error occur.
   */
  validateParameters(info) {
    const paramCount = this.parseResults.partList.length - 3;

    for (let r = 0; r < this.values.length; r++) {
      let val = this.values[r];
      if (!Array.isArray(val)) {
        val = [val];
        this.values[r] = val;
      }

      //validate parameter size.
      if (paramCount > val.length) {
        this.emit('send_end');
        this.throwNewError(
          'Parameter at position ' +
            val.length +
            ' is not set for values ' +
            r +
            '\n' +
            this.displaySql(),
          false,
          info,
          'HY000',
          Errors.ER_MISSING_PARAMETER
        );
        return false;
      }

      //validate parameter is defined.
      for (let i = 0; i < paramCount; i++) {
        if (val[i] === undefined) {
          this.emit('send_end');
          this.throwNewError(
            'Parameter at position ' +
              (i + 1) +
              ' is undefined for values ' +
              r +
              '\n' +
              this.displaySql(),
            false,
            info,
            'HY000',
            Errors.ER_PARAMETER_UNDEFINED
          );
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Define params events.
   * Each parameter indicate that he is written to socket,
   * emitting event so next parameter can be written.
   *
   * Installs `this.paramWritten`, which resumes writing at
   * `this.currentParam` of `this.valueRow` and goes on with the following
   * rows, until a write reports a flush (then it is rescheduled with
   * setImmediate), another stream parameter is met, or everything is written.
   *
   * @param packet  packet to write to
   * @param info    connection information
   */
  registerStreamSendEvent(packet, info) {
    this.paramWritten = () => {
      const partList = this.parseResults.partList;
      // number of placeholders: a row may hold more values than placeholders,
      // so the row length cannot be used to detect the end of the row
      const paramCount = partList.length - 3;
      let flushed = false;

      while (!flushed) {
        if (this.packet.haveErrorResponse) {
          this.sending = false;
          this.emit('send_end');
          return;
        }

        if (this.currentParam >= paramCount) {
          // all parameters from row are written.
          flushed = packet.writeString(partList[partList.length - 2]) || flushed;
          flushed =
            packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length) ||
            flushed;

          if (this.valueIdx < this.values.length) {
            // still remaining rows
            this.valueRow = this.values[this.valueIdx++];
            this.currentParam = 0;
          } else {
            // all rows are written
            this.sending = false;
            this.emit('send_end');
            return;
          }
        }

        flushed = packet.writeString(partList[this.currentParam + 1]) || flushed;
        const value = this.valueRow[this.currentParam];

        if (value === null) {
          flushed = packet.writeStringAscii('NULL') || flushed;
          this.currentParam++;
          continue;
        }

        if (
          typeof value === 'object' &&
          typeof value.pipe === 'function' &&
          typeof value.read === 'function'
        ) {
          //********************************************
          // param is stream,
          //********************************************
          flushed = packet.writeInt8(QUOTE) || flushed;

          value.once('end', () => {
            packet.writeInt8(QUOTE);
            this.currentParam++;
            this.paramWritten();
          });

          value.on('data', (chunk) => {
            packet.writeBufferEscape(chunk);
          });
          return;
        }

        //********************************************
        // param isn't stream. directly write in buffer
        //********************************************
        flushed = this.writeParam(packet, value, this.opts, info) || flushed;
        this.currentParam++;
      }

      // a flush occurred: continue on a later turn of the event loop
      if (this.sending) setImmediate(this.paramWritten);
    };
  }
}
|
||
|
|
||
|
module.exports = BatchRewrite;
|