Skip to content

Commit ec1c70c

Browse files
committed
ability to pause/resume drain event for long-running async transactions
1 parent 45a5142 commit ec1c70c

File tree

4 files changed

+177
-38
lines changed

4 files changed

+177
-38
lines changed

lib/client.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ p._pulseQueryQueue = function() {
129129
this.activeQuery.submit(this.connection);
130130
} else if(this.hasExecuted) {
131131
this.activeQuery = null;
132-
this.emit('drain')
132+
this._drainPaused > 0 ? this._drainPaused++ : this.emit('drain')
133133
}
134134
}
135135
};
@@ -154,6 +154,19 @@ p.query = function(config, values, callback) {
154154
return query;
155155
};
156156

157+
//prevents client from otherwise emitting 'drain' event until 'resumeDrain' is called
158+
p.pauseDrain = function() {
159+
this._drainPaused = 1;
160+
};
161+
162+
//resume raising 'drain' event
163+
p.resumeDrain = function() {
164+
if(this._drainPaused > 1) {
165+
this.emit('drain');
166+
}
167+
this._drainPaused = 0;
168+
};
169+
157170
p.end = function() {
158171
this.connection.end();
159172
};

lib/native/index.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ p._pulseQueryQueue = function(initialConnection) {
3737
var query = this._queryQueue.shift();
3838
if(!query) {
3939
if(!initialConnection) {
40-
this.emit('drain');
40+
this._drainPaused ? this._drainPaused++ : this.emit('drain');
4141
}
4242
return;
4343
}
@@ -60,6 +60,17 @@ p._pulseQueryQueue = function(initialConnection) {
6060
}
6161
}
6262

63+
p.pauseDrain = function() {
64+
this._drainPaused = 1;
65+
};
66+
67+
p.resumeDrain = function() {
68+
if(this._drainPaused > 1) {
69+
this.emit('drain')
70+
};
71+
this._drainPaused = 0;
72+
};
73+
6374
var clientBuilder = function(config) {
6475
config = config || {};
6576
var connection = new Connection();
+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
var helper = require(__dirname + '/test-helper');
2+
var pg = require(__dirname + '/../../../lib');
3+
4+
if(helper.args.native) {
5+
pg = require(__dirname + '/../../../lib').native;
6+
}
7+
8+
var testDrainOfClientWithPendingQueries = function() {
9+
pg.connect(helper.connectionString(), assert.success(function(client) {
10+
test('when there are pending queries and client is resumed', function() {
11+
var drainCount = 0;
12+
client.on('drain', function() {
13+
drainCount++;
14+
});
15+
client.pauseDrain();
16+
client.query('SELECT NOW()', function() {
17+
client.query('SELECT NOW()', function() {
18+
assert.equal(drainCount, 0);
19+
process.nextTick(function() {
20+
assert.equal(drainCount, 1);
21+
pg.end();
22+
});
23+
});
24+
client.resumeDrain();
25+
assert.equal(drainCount, 0);
26+
});
27+
});
28+
}));
29+
};
30+
31+
pg.connect(helper.connectionString(), assert.success(function(client) {
32+
var drainCount = 0;
33+
client.on('drain', function() {
34+
drainCount++;
35+
});
36+
test('pauseDrain and resumeDrain on simple client', function() {
37+
client.pauseDrain();
38+
client.resumeDrain();
39+
process.nextTick(assert.calls(function() {
40+
assert.equal(drainCount, 0);
41+
test('drain is paused', function() {
42+
client.pauseDrain();
43+
client.query('SELECT NOW()', assert.success(function() {
44+
process.nextTick(function() {
45+
assert.equal(drainCount, 0);
46+
client.resumeDrain();
47+
assert.equal(drainCount, 1);
48+
testDrainOfClientWithPendingQueries();
49+
});
50+
}));
51+
});
52+
}));
53+
});
54+
}));
55+

test/unit/client/query-queue-tests.js

+96-36
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,112 @@
11
var helper = require(__dirname + '/test-helper');
22
var Connection = require(__dirname + '/../../../lib/connection');
3-
var con = new Connection({stream: "NO"});
4-
var client = new Client({connection:con});
53

4+
test('drain', function() {
5+
var con = new Connection({stream: "NO"});
6+
var client = new Client({connection:con});
7+
con.connect = function() {
8+
con.emit('connect');
9+
};
10+
con.query = function() {
11+
};
12+
client.connect();
613

7-
con.connect = function() {
8-
con.emit('connect');
9-
};
10-
con.query = function() {
11-
};
12-
client.connect();
14+
var raisedDrain = false;
15+
client.on('drain', function() {
16+
raisedDrain = true;
17+
});
1318

14-
var raisedDrain = false;
15-
client.on('drain', function() {
16-
raisedDrain = true;
17-
});
19+
client.query("hello");
20+
client.query("sup");
21+
client.query('boom');
1822

19-
client.query("hello");
20-
client.query("sup");
21-
client.query('boom');
23+
test("with pending queries", function() {
24+
test("does not emit drain", function() {
25+
assert.equal(raisedDrain, false);
26+
});
27+
});
2228

23-
test("with pending queries", function() {
24-
test("does not emit drain", function() {
25-
assert.equal(raisedDrain, false);
29+
test("after some queries executed", function() {
30+
con.emit('readyForQuery');
31+
test("does not emit drain", function() {
32+
assert.equal(raisedDrain, false);
33+
});
2634
});
27-
});
2835

29-
test("after some queries executed", function() {
30-
con.emit('readyForQuery');
31-
test("does not emit drain", function() {
32-
assert.equal(raisedDrain, false);
36+
test("when all queries are sent", function() {
37+
con.emit('readyForQuery');
38+
con.emit('readyForQuery');
39+
test("does not emit drain", function() {
40+
assert.equal(raisedDrain, false);
41+
});
3342
});
34-
});
3543

36-
test("when all queries are sent", function() {
37-
con.emit('readyForQuery');
38-
con.emit('readyForQuery');
39-
test("does not emit drain", function() {
40-
assert.equal(raisedDrain, false);
44+
test("after last query finishes", function() {
45+
con.emit('readyForQuery');
46+
test("emits drain", function() {
47+
process.nextTick(function() {
48+
assert.ok(raisedDrain);
49+
})
50+
});
4151
});
4252
});
4353

44-
test("after last query finishes", function() {
45-
con.emit('readyForQuery');
46-
test("emits drain", function() {
47-
process.nextTick(function() {
48-
assert.ok(raisedDrain);
49-
})
54+
test('with drain paused', function() {
55+
//mock out a fake connection
56+
var con = new Connection({stream: "NO"});
57+
con.connect = function() {
58+
con.emit('connect');
59+
};
60+
con.query = function() {
61+
};
62+
63+
var client = new Client({connection:con});
64+
65+
client.connect();
66+
67+
var drainCount = 0;
68+
client.on('drain', function() {
69+
drainCount++;
5070
});
51-
});
5271

72+
test('normally unpaused', function() {
73+
con.emit('readyForQuery');
74+
client.query('boom');
75+
assert.emits(client, 'drain', function() {
76+
assert.equal(drainCount, 1);
77+
});
78+
con.emit('readyForQuery');
79+
});
80+
81+
test('pausing', function() {
82+
test('unpaused with no queries in between', function() {
83+
client.pauseDrain();
84+
client.resumeDrain();
85+
assert.equal(drainCount, 1);
86+
});
87+
88+
test('paused', function() {
89+
test('resumeDrain after empty', function() {
90+
client.pauseDrain();
91+
client.query('asdf');
92+
con.emit('readyForQuery');
93+
assert.equal(drainCount, 1);
94+
client.resumeDrain();
95+
assert.equal(drainCount, 2);
96+
});
97+
98+
test('resumDrain while still pending', function() {
99+
client.pauseDrain();
100+
client.query('asdf');
101+
client.query('asdf1');
102+
con.emit('readyForQuery');
103+
client.resumeDrain();
104+
assert.equal(drainCount, 2);
105+
con.emit('readyForQuery');
106+
assert.equal(drainCount, 3);
107+
});
108+
109+
});
110+
});
111+
112+
});

0 commit comments

Comments
 (0)