diff --git a/lib/byte_queue.js b/lib/byte_queue.js new file mode 100644 index 00000000..d71d9e31 --- /dev/null +++ b/lib/byte_queue.js @@ -0,0 +1,111 @@ +// Efficient scatter-gather byte-stream queue +// Copyright (C) 2020 Tirotech Ltd +// +// Author: Daniel Beer +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +function fill(out, arr, j) { + while (out.length > 0 && j < arr.length) { + let b = arr[j]; + + b.copy(out); + + if (b.length <= out.length) { + out = out.slice(b.length); + j++; + } else { + out = out.slice(out.length); + } + } + + return out; +} + +function ByteQueue() { + this.front = []; + this.back = []; + this.i = 0; + this.len = 0; + + return this; +} + +ByteQueue.prototype = { + discard: function(n) { + while (this.i < this.front.length && + this.front[this.i].length <= n) { + n -= this.front[this.i].length; + this.len -= this.front[this.i].length; + this.i++; + } + + if (this.i >= this.front.length) { + this.front = this.back; + this.back = []; + this.i = 0; + } + + while (this.i < this.front.length && + this.front[this.i].length <= n) { + n -= this.front[this.i].length; + this.len -= this.front[this.i].length; + this.i++; + } + + if (n > 0) { + if (this.i >= this.front.length || + this.front[this.i].length < n) + throw "ByteQueue underrun"; + + this.front[this.i] = this.front[this.i].slice(n); + this.len -= n; + } + }, + + size: function() { + return this.len; + }, + + push: function(x) { + this.back.push(x); + this.len += x.length; + }, + + peekTo: function(buf) { + buf = fill(buf, this.front, this.i); + buf = fill(buf, this.back, 0); + + if (buf.length > 0) + throw "ByteQueue underrun"; + }, + + peek: function(n) { + const buf = Buffer.alloc(n); + this.peekTo(buf); + return buf; + }, + + shiftTo: function(buf) { + this.peekTo(buf); + this.discard(buf.length); + }, + + shift: function(n) { + const buf = this.peek(n) + this.discard(n) + return buf + } +} + +module.exports = { ByteQueue }; diff --git a/lib/connection.js b/lib/connection.js index 0b9c727f..738961cc 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -5,6 +5,7 @@ const Backend = require('./backend.js') const Queue = require('./queue.js') const { END } = require('./types.js') const { errors } = require('./errors.js') +const { ByteQueue } = require('./byte_queue.js'); module.exports = Connection @@ -21,7 +22,7 @@ function Connection(options = {}) { onclose, parsers } = options - let buffer = Buffer.alloc(0) + let buffer = new ByteQueue(); let length = 0 let messages = [] let timer @@ -255,17 +256,16 @@ function Connection(options = {}) { } function data(x) { - buffer = buffer.length === 0 - ? x - : Buffer.concat([buffer, x], buffer.length + x.length) + buffer.push(x); - while (buffer.length > 4) { - length = buffer.readInt32BE(1) - if (length >= buffer.length) - break + while (buffer.size() > 4) { + const header = buffer.peek(5); - backend[buffer[0]](buffer.slice(0, length + 1)) - buffer = buffer.slice(length + 1) + length = header.readInt32BE(1) + if (length >= buffer.size()) + break + + backend[header[0]](buffer.shift(length + 1)) } }