'use strict'; const CommonBinary = require('./common-binary-cmd'); const Errors = require('../misc/errors'); const Parse = require('../misc/parse'); const BulkPacket = require('../io/bulk-packet'); /** * Protocol COM_STMT_BULK_EXECUTE * see : https://mariadb.com/kb/en/library/com_stmt_bulk_execute/ */ class BatchBulk extends CommonBinary { constructor(resolve, reject, options, connOpts, sql, values) { super(resolve, reject, options, connOpts, sql, values); this.onPacketReceive = this.readPrepareResultPacket; } /** * Send COM_STMT_BULK_EXECUTE * * @param out output writer * @param opts connection options * @param info connection information */ start(out, opts, info) { this.sending = true; this.info = info; this.values = this.initialValues; if (this.opts.timeout) { const err = Errors.createError( 'Cannot use timeout for Batch statement', false, info, 'HY000', Errors.ER_TIMEOUT_NOT_SUPPORTED ); this.emit('send_end'); this.throwError(err, info); return; } let questionMarkSql = this.sql; if (this.opts.namedPlaceholders) { const res = Parse.searchPlaceholder( this.sql, info, this.initialValues, this.displaySql.bind(this) ); questionMarkSql = res.sql; this.values = res.values; } if (!this.validateParameters(info)) { this.sending = false; return; } //send COM_STMT_PREPARE command this.out = out; this.packet = new BulkPacket(this.opts, out, this.values[0]); out.startPacket(this); out.writeInt8(0x16); out.writeString(questionMarkSql); out.flushBuffer(true); if (this.opts.pipelining) { out.startPacket(this); this.valueIdx = 0; this.sendQueries(); } else { this.out = out; } } sendQueries() { let flushed = false; while (!flushed && this.sending && this.valueIdx < this.values.length) { this.valueRow = this.values[this.valueIdx++]; //******************************************** // send params //******************************************** const len = this.valueRow.length; for (let i = 0; i < len; i++) { const value = this.valueRow[i]; if (value === null) { flushed = this.packet.writeInt8(0x01) || flushed; continue; } //******************************************** // param has no stream. directly write in buffer //******************************************** flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed; } const last = this.valueIdx === this.values.length; flushed = this.packet.mark(last, last ? null : this.values[this.valueIdx]) || flushed; } if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) { //there is still data to send setImmediate(this.sendQueries.bind(this)); } else { if (this.sending && this.valueIdx === this.values.length) this.emit('send_end'); this.sending = false; } } displaySql() { if (this.opts && this.initialValues) { if (this.sql.length > this.opts.debugLen) { return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...'; } let sqlMsg = 'sql: ' + this.sql + ' - parameters:'; sqlMsg += '['; for (let i = 0; i < this.initialValues.length; i++) { if (i !== 0) sqlMsg += ','; let param = this.initialValues[i]; sqlMsg = this.logParameters(sqlMsg, param); if (sqlMsg.length > this.opts.debugLen) { sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + '...'; break; } } sqlMsg += ']'; return sqlMsg; } return 'sql: ' + this.sql + ' - parameters:[]'; } success(val) { this.packet.waitingResponseNo--; if (!this.opts.pipelining && this.packet.statementId === -1) { this.packet.statementId = this.statementId; this.out.startPacket(this); this.valueIdx = 0; this.sendQueries(); this._responseIndex++; this.onPacketReceive = this.readResponsePacket; return; } if (!this.sending && this.packet.waitingResponseNo === 0) { //send COM_STMT_CLOSE packet if (!this.firstError || !this.firstError.fatal) { this.sequenceNo = -1; this.compressSequenceNo = -1; this.out.startPacket(this); this.out.writeInt8(0x19); this.out.writeInt32(this.statementId); this.out.flushBuffer(true); } this.sending = false; this.emit('send_end'); if (this.packet.haveErrorResponse) { this.packet = null; this.resolve = null; this.onPacketReceive = null; this._columns = null; this._rows = null; process.nextTick(this.reject, this.firstError); this.reject = null; this.emit('end', this.firstError); } else { this.packet = null; let totalAffectedRows = 0; this._rows.forEach((row) => { totalAffectedRows += row.affectedRows; }); const rs = { affectedRows: totalAffectedRows, insertId: this._rows[0].insertId, warningStatus: this._rows[this._rows.length - 1].warningStatus }; this.successEnd(rs); this._columns = null; this._rows = null; } return; } if (!this.packet.haveErrorResponse) { this._responseIndex++; this.onPacketReceive = this.readResponsePacket; } } throwError(err, info) { this.packet.waitingResponseNo--; this.sending = false; if (this.packet && !this.packet.haveErrorResponse) { if (err.fatal) { this.packet.waitingResponseNo = 0; } if (this.stack) { err = Errors.createError( err.message, err.fatal, info, err.sqlState, err.errno, this.stack, false ); } this.firstError = err; this.packet.endedWithError(); } if (!this.sending && this.packet.waitingResponseNo === 0) { this.resolve = null; //send COM_STMT_CLOSE packet if (!err.fatal && this.statementId) { this.sequenceNo = -1; this.compressSequenceNo = -1; this.out.startPacket(this); this.out.writeInt8(0x19); this.out.writeInt32(this.statementId); this.out.flushBuffer(true); } this.emit('send_end'); process.nextTick(this.reject, this.firstError); this.reject = null; this.onPacketReceive = null; this.emit('end', this.firstError); } else { this._responseIndex++; this.onPacketReceive = this.readResponsePacket; } } /** * Validate that parameters exists and are defined. * * @param info connection info * @returns {boolean} return false if any error occur. */ validateParameters(info) { //validate parameter size. for (let r = 0; r < this.values.length; r++) { if (!Array.isArray(this.values[r])) this.values[r] = [this.values[r]]; //validate parameter is defined. for (let i = 0; i < this.values[r].length; i++) { if (this.values[r][i] === undefined) { this.emit('send_end'); this.throwNewError( 'Parameter at position ' + (i + 1) + ' is undefined for values ' + r + '\n' + this.displaySql(), false, info, 'HY000', Errors.ER_PARAMETER_UNDEFINED ); return false; } } } return true; } } module.exports = BatchBulk;