DiscofyAPI/node_modules/mariadb/lib/io/compression-input-stream.js

142 lines
4.2 KiB
JavaScript

'use strict';
const ZLib = require('zlib');
const Utils = require('../misc/utils');
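/**
 * Compressed MySQL/MariaDB protocol packet parser: splits the incoming byte
 * stream into compressed packets (7-byte header + payload), inflates the
 * payload when needed and forwards it to the wrapped reader.
 * see: https://mariadb.com/kb/en/library/0-packet/
 */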
class CompressionInputStream {
  /**
   * @param {Object} reader - downstream consumer; its `onData(buffer)` receives every decoded payload
   * @param {Object} receiveQueue - queue of pending commands, accessed through `peek()` and `shift()`
   * @param {Object} opts - connection options (`debugCompress` enables packet logging)
   * @param {Object} info - connection information (`threadId` is used in debug logs)
   */
  constructor(reader, receiveQueue, opts, info) {
    this.reader = reader;
    this.receiveQueue = receiveQueue;
    this.info = info;
    this.opts = opts;
    // 7-byte header: [0..2] compressed length (LE), [3] sequence number,
    // [4..6] uncompressed length (LE, 0 means the payload is not compressed).
    this.header = Buffer.allocUnsafe(7);
    this.headerLen = 0;
    this.compressPacketLen = null;
    this.packetLen = null;
    this.remainingLen = null;
    this.parts = null;
    this.partsTotalLen = 0;
  }

  /**
   * Handle one complete payload: record the sequence number on the current
   * command, then forward the payload (inflated when the header announces an
   * uncompressed length) to the reader.
   *
   * @param {Buffer} chunk - payload of the packet described by `this.header`
   */
  receivePacket(chunk) {
    const cmd = this.currentCmd();

    if (this.opts.debugCompress) {
      let cmdName = 'no command';
      if (cmd) {
        cmdName = cmd.onPacketReceive
          ? cmd.constructor.name + '.' + cmd.onPacketReceive.name
          : cmd.constructor.name;
      }
      console.log(
        '<== conn:%d %s (compress)\n%s',
        this.info.threadId ? this.info.threadId : -1,
        cmdName,
        Utils.log(this.opts, chunk, 0, chunk.length, this.header)
      );
    }

    if (cmd) cmd.compressSequenceNo = this.header[3];

    // An empty payload carries no data: there is nothing to inflate or forward.
    if (chunk.length === 0) return;

    const unCompressLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
    if (unCompressLen === 0) {
      // payload was sent as-is
      this.reader.onData(chunk);
    } else {
      //use synchronous inflating, to ensure FIFO packet order
      this.reader.onData(ZLib.inflateSync(chunk));
    }
  }

  /**
   * Return the first queued command that can receive packets, discarding
   * queued commands that have no `onPacketReceive` handler.
   *
   * @returns {Object|null} current command, or null when the queue is exhausted
   */
  currentCmd() {
    let cmd;
    while ((cmd = this.receiveQueue.peek())) {
      if (cmd.onPacketReceive) return cmd;
      this.receiveQueue.shift();
    }
    return null;
  }

  /**
   * Forget the current header so the next bytes are read as a new header.
   */
  resetHeader() {
    this.remainingLen = null;
    this.headerLen = 0;
  }

  /**
   * Decode the lengths stored in the (complete) 7-byte header.
   *
   * @returns {number} compressed payload length announced by the header
   */
  parseHeader() {
    const header = this.header;
    this.compressPacketLen = header[0] | (header[1] << 8) | (header[2] << 16);
    this.packetLen = header[4] | (header[5] << 8) | (header[6] << 16);
    if (this.packetLen === 0) this.packetLen = this.compressPacketLen;
    return this.compressPacketLen;
  }

  /**
   * Consume a chunk of raw socket data. Headers and payloads may be split
   * across chunks in any way; complete payloads are handed to receivePacket().
   * A payload of exactly 0xffffff bytes is kept aside and merged with the
   * following packet(s) until a shorter (possibly empty) one closes it.
   *
   * @param {Buffer} chunk - raw bytes read from the socket
   */
  onData(chunk) {
    const chunkLen = chunk.length;
    let pos = 0;

    do {
      // Number of payload bytes still expected; null while the header is incomplete.
      let length = null;

      if (this.remainingLen) {
        // continuing a payload started in a previous chunk
        length = this.remainingLen;
      } else {
        while (pos < chunkLen && this.headerLen < 7) {
          this.header[this.headerLen++] = chunk[pos++];
        }
        if (this.headerLen === 7) length = this.parseHeader();
      }

      // Header still incomplete: the chunk is exhausted, wait for more data.
      if (length === null) return;

      const available = chunkLen - pos;
      if (available < length) {
        // Payload continues in the next chunk: stash what we have.
        const fragment = chunk.slice(pos, chunkLen);
        if (this.parts) {
          this.parts.push(fragment);
          this.partsTotalLen += available;
        } else {
          this.parts = [fragment];
          this.partsTotalLen = available;
        }
        this.remainingLen = length - available;
        return;
      }

      // Payload is complete. `length` may legitimately be 0 here (empty packet):
      // it must still terminate the packet, otherwise the header is never reset
      // and every following byte would be misread.
      const payload = chunk.slice(pos, pos + length);
      pos += length;
      const isLastPart = this.compressPacketLen < 0xffffff;

      if (this.parts) {
        this.parts.push(payload);
        this.partsTotalLen += length;
        if (isLastPart) {
          const whole = Buffer.concat(this.parts, this.partsTotalLen);
          this.parts = null;
          this.receivePacket(whole);
        }
      } else if (isLastPart) {
        this.receivePacket(payload);
      } else {
        // maximum-size payload: more packets follow for the same message
        this.parts = [payload];
        this.partsTotalLen = length;
      }
      this.resetHeader();
    } while (pos < chunkLen);
  }
}
module.exports = CompressionInputStream;