diff --git a/lib/adapter.js b/lib/adapter.js index 8690191a..abe1a2f0 100644 --- a/lib/adapter.js +++ b/lib/adapter.js @@ -907,20 +907,31 @@ module.exports = (function() { var schema = connectionObject.schema; var _query; - var sequel = new Sequel(schema, sqlOptions); + var query; + var data; + if (stream.customQuery) { + query = stream.customQuery.query; + data = stream.customQuery.params; + } else { + var sequel = new Sequel(schema, sqlOptions); - // Build a query for the specific query strategy - try { - _query = sequel.find(collectionName, options); - } catch(e) { - return cb(e); + // Build a query for the specific query strategy + try { + _query = sequel.find(collectionName, options); + } catch(e) { + return cb(e); + } + query = _query.query[0]; } - var query = _query.query[0]; + // Run query log('MySQL.stream: ', query); - var dbStream = connection.query(query); + var dbStream = (function() { + if (data) return connection.query(query, data); + else return connection.query(query); + })(); // Handle error, an 'end' event will be emitted after this as well dbStream.on('error', function(err) { @@ -931,15 +942,24 @@ module.exports = (function() { // the field packets for the rows to follow dbStream.on('fields', function(fields) {}); + var manualResume = stream.resumeConnectionManually === true; + // Pausing the connnection is useful if your processing involves I/O - dbStream.on('result', function(row) { - connection.pause(); - stream.write(row, function() { - setImmediate(function() { - connection.resume(); + dbStream.on('result', !manualResume + ? function dbStreamResultAutoResume(row) { + connection.pause(); + stream.write(row, function () { + setImmediate(function () { + connection.resume(); + }); + }) + } + : function dbStreamResultManualResume(row) { + connection.pause(); + stream.write(row, function () { }); - }); - }); + } + ); var dbStreamFirstEnd = false; // all rows have been received @@ -950,13 +970,17 @@ module.exports = (function() { }); stream.on('end', function() { - console.log('end emitted from stream') + //console.log('end emitted from stream') if(!dbStreamFirstEnd) { connection.destroy(); dbStream.end(); } }) + stream.resumeConnection = function(){ + connection.resume(); + } + //stream.on('unpipe', function() { console.log('unpipe emitted from readStream') }) //stream.on('close', function() { console.log('close emitted from readStream') }) //stream.on('destroy', function() { console.log('destroy emitted from readStream') }) diff --git a/package.json b/package.json index 632df688..e6b90250 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "sails-mysql", - "version": "0.11.1c", + "version": "0.11.1e", "description": "MySQL adapter for Sails.js", "main": "lib/adapter.js", "scripts": { @@ -8,7 +8,7 @@ }, "repository": { "type": "git", - "url": "git://github.com/balderdashy/sails-mysql.git" + "url": "git://github.com/github1337/sails-mysql.git" }, "keywords": [ "mysql",