Skip to content

Commit cc84799

Browse files
committed
integrate new pool into existing codebase
1 parent bb448fe commit cc84799

File tree

8 files changed

+127
-202
lines changed

8 files changed

+127
-202
lines changed

lib/index.js

+20-56
Original file line numberDiff line numberDiff line change
@@ -2,78 +2,43 @@ var EventEmitter = require('events').EventEmitter;
22
var util = require('util');
33
var Client = require(__dirname+'/client');
44
var defaults = require(__dirname + '/defaults');
5-
6-
//external genericPool module
7-
var genericPool = require('generic-pool');
8-
9-
//cache of existing client pools
10-
var pools = {};
5+
var pool = require(__dirname + '/pool');
6+
var types = require(__dirname + '/types');
7+
var Connection = require(__dirname + '/connection');
118

129
var PG = function(clientConstructor) {
1310
EventEmitter.call(this);
14-
this.Client = clientConstructor;
15-
this.Connection = require(__dirname + '/connection');
16-
this.Query = clientConstructor.Query;
1711
this.defaults = defaults;
12+
this.Client = pool.Client = clientConstructor;
13+
this.Query = this.Client.Query;
14+
this.pools = pool;
15+
this.types = types;
16+
this.Connection = Connection;
1817
};
1918

2019
util.inherits(PG, EventEmitter);
2120

2221
PG.prototype.end = function() {
23-
Object.keys(pools).forEach(function(name) {
24-
var pool = pools[name];
22+
var self = this;
23+
Object.keys(self.pools.all).forEach(function(key) {
24+
var pool = self.pools.all[key];
2525
pool.drain(function() {
2626
pool.destroyAllNow();
2727
});
2828
});
2929
};
3030

3131
PG.prototype.connect = function(config, callback) {
32-
var self = this;
33-
var c = config;
34-
var cb = callback;
35-
//allow for no config to be passed
36-
if(typeof c === 'function') {
37-
cb = c;
38-
c = defaults;
32+
if(typeof config == "function") {
33+
callback = config;
34+
config = null;
35+
}
36+
var pool = this.pools.getOrCreate(config);
37+
pool.connect(callback);
38+
if(!pool.listeners('error').length) {
39+
//propagate errors up to pg object
40+
pool.on('error', this.emit.bind(this, 'error'));
3941
}
40-
41-
//get unique pool name even if object was used as config
42-
var poolName = typeof(c) === 'string' ? c : c.user+c.host+c.port+c.database;
43-
var pool = pools[poolName];
44-
45-
if(pool) { return pool.acquire(cb); }
46-
47-
pool = pools[poolName] = genericPool.Pool({
48-
name: poolName,
49-
create: function(callback) {
50-
var client = new self.Client(c);
51-
client.connect(function(err) {
52-
if(err) { return callback(err); }
53-
54-
//handle connected client background errors by emitting event
55-
//via the pg object and then removing errored client from the pool
56-
client.on('error', function(e) {
57-
self.emit('error', e, client);
58-
pool.destroy(client);
59-
});
60-
61-
callback(null, client);
62-
});
63-
64-
client.on('drain', function() {
65-
pool.release(client);
66-
});
67-
},
68-
destroy: function(client) {
69-
client.end();
70-
},
71-
max: defaults.poolSize,
72-
idleTimeoutMillis: defaults.poolIdleTimeout,
73-
reapIntervalMillis: defaults.reapIntervalMillis,
74-
log: defaults.poolLog
75-
});
76-
return pool.acquire(cb);
7742
};
7843

7944
// cancel the query runned by the given client
@@ -96,4 +61,3 @@ module.exports.__defineGetter__("native", function() {
9661
return module.exports.native;
9762
});
9863

99-
module.exports.types = require('./types');

lib/pool.js

+61-59
Original file line numberDiff line numberDiff line change
@@ -3,56 +3,61 @@ var EventEmitter = require('events').EventEmitter;
33
var defaults = require(__dirname + '/defaults');
44
var genericPool = require('generic-pool');
55

6-
//takes the same config structure as client
7-
var createPool = function(clientConfig) {
8-
clientConfig = clientConfig || {};
9-
var name = JSON.stringify(clientConfig);
10-
var pool = createPool.all[name];
11-
if(pool) {
12-
return pool;
13-
}
14-
pool = genericPool.Pool({
15-
name: name,
16-
max: defaults.poolSize,
17-
idleTimeoutMillis: defaults.poolIdleTimeout,
18-
reapIntervalMillis: defaults.reapIntervalMillis,
19-
log: defaults.poolLog,
20-
create: function(cb) {
21-
var client = new createPool.Client(clientConfig);
22-
client.connect(function(err) {
23-
if(err) return cb(err, null);
6+
var pools = {
7+
//dictionary of all key:pool pairs
8+
all: {},
9+
//reference to the client constructor - can override in tests or for require('pg').native
10+
Client: require(__dirname + '/client'),
11+
getOrCreate: function(clientConfig) {
12+
clientConfig = clientConfig || {};
13+
var name = JSON.stringify(clientConfig);
14+
var pool = pools.all[name];
15+
if(pool) {
16+
return pool;
17+
}
18+
pool = genericPool.Pool({
19+
name: name,
20+
max: defaults.poolSize,
21+
idleTimeoutMillis: defaults.poolIdleTimeout,
22+
reapIntervalMillis: defaults.reapIntervalMillis,
23+
log: defaults.poolLog,
24+
create: function(cb) {
25+
var client = new pools.Client(clientConfig);
26+
client.connect(function(err) {
27+
if(err) return cb(err, null);
2428

25-
//handle connected client background errors by emitting event
26-
//via the pg object and then removing errored client from the pool
27-
client.on('error', function(e) {
28-
pool.emit('error', e, client);
29-
pool.destroy(client);
30-
});
29+
//handle connected client background errors by emitting event
30+
//via the pg object and then removing errored client from the pool
31+
client.on('error', function(e) {
32+
pool.emit('error', e, client);
33+
pool.destroy(client);
34+
});
3135

32-
return cb(null, client);
33-
});
34-
},
35-
destroy: function(client) {
36-
client.end();
37-
}
38-
});
39-
createPool.all[name] = pool;
40-
//mixin EventEmitter to pool
41-
EventEmitter.call(pool);
42-
for(var key in EventEmitter.prototype) {
43-
if(EventEmitter.prototype.hasOwnProperty(key)) {
44-
pool[key] = EventEmitter.prototype[key];
36+
return cb(null, client);
37+
});
38+
},
39+
destroy: function(client) {
40+
client.end();
41+
}
42+
});
43+
pools.all[name] = pool;
44+
//mixin EventEmitter to pool
45+
EventEmitter.call(pool);
46+
for(var key in EventEmitter.prototype) {
47+
if(EventEmitter.prototype.hasOwnProperty(key)) {
48+
pool[key] = EventEmitter.prototype[key];
49+
}
4550
}
51+
//monkey-patch with connect method
52+
pool.connect = function(cb) {
53+
pool.acquire(function(err, client) {
54+
if(err) return cb(err, null, function() {/*NOOP*/});
55+
//support both 2 (old) and 3 arguments
56+
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
57+
});
58+
};
59+
return pool;
4660
}
47-
//monkey-patch with connect method
48-
pool.connect = function(cb) {
49-
pool.acquire(function(err, client) {
50-
if(err) return cb(err, null, function() {/*NOOP*/});
51-
//support both 2 (old) and 3 arguments
52-
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
53-
});
54-
};
55-
return pool;
5661
};
5762

5863
//the old connect method of the pool
@@ -62,12 +67,15 @@ var createPool = function(clientConfig) {
6267
//a bunch of problems, but for backwards compatibility
6368
//we're leaving it in
6469
var alarmDuration = 5000;
65-
var errorMessage = ['A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
66-
'You might have a leak!',
67-
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
68-
' //do something',
69-
' done(); //call done() to signal you are finished with the client',
70-
'}'].join(require('os').EOL);
70+
var errorMessage = [
71+
'A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
72+
'You might have a leak!',
73+
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
74+
' //do something',
75+
' done(); //call done() to signal you are finished with the client',
76+
'}'
77+
].join(require('os').EOL);
78+
7179
var oldConnect = function(pool, client, cb) {
7280
var tid = setTimeout(function() {
7381
console.error(errorMessage);
@@ -90,10 +98,4 @@ var newConnect = function(pool, client, cb) {
9098
});
9199
};
92100

93-
//list of all created pools
94-
createPool.all = {};
95-
96-
//reference to client constructor
97-
createPool.Client = require(__dirname + '/client');
98-
99-
module.exports = createPool;
101+
module.exports = pools;

test/integration/connection-pool/error-tests.js

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ helper.pg.connect(helper.config, assert.success(function(client) {
1717
assert.ok(error);
1818
assert.ok(brokenClient);
1919
assert.equal(client.id, brokenClient.id);
20+
client.emit('drain');
2021
helper.pg.end();
2122
});
2223
//kill the connection from client

test/integration/connection-pool/optional-config-tests.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,11 @@ helper.pg.defaults.poolSize = 1;
1010

1111
helper.pg.connect(assert.calls(function(err, client) {
1212
assert.isNull(err);
13-
client.end();
13+
client.query('SELECT NOW()');
14+
client.once('drain', function() {
15+
setTimeout(function() {
16+
helper.pg.end();
17+
18+
}, 10);
19+
});
1420
}));

test/integration/connection-pool/test-helper.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ helper.testPoolSize = function(max) {
99
for(var i = 0; i < max; i++) {
1010
helper.pg.poolSize = 10;
1111
test("connection #" + i + " executes", function() {
12-
helper.pg.connect(helper.config, function(err, client) {
12+
helper.pg.connect(helper.config, function(err, client, done) {
1313
assert.isNull(err);
1414
client.query("select * from person", function(err, result) {
1515
assert.lengthIs(result.rows, 26)
@@ -19,7 +19,8 @@ helper.testPoolSize = function(max) {
1919
})
2020
var query = client.query("SELECT * FROM NOW()")
2121
query.on('end',function() {
22-
sink.add()
22+
sink.add();
23+
done();
2324
})
2425
})
2526
})

test/integration/connection-pool/unique-name-tests.js

-63
This file was deleted.

0 commit comments

Comments
 (0)