1
1
'use strict'
2
- const Result = require ( './pg' ) . Result
3
- const prepare = require ( './pg ' ) . prepareValue
2
+ const Result = require ( 'pg/lib/result.js' )
3
+ const prepare = require ( 'pg/lib/utils.js ' ) . prepareValue
4
4
const EventEmitter = require ( 'events' ) . EventEmitter
5
5
const util = require ( 'util' )
6
6
7
- function Cursor ( text , values , config ) {
7
+ let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
8
+
9
+ function Cursor ( text , values , config ) {
8
10
EventEmitter . call ( this )
9
11
10
- this . _conf = config || { }
12
+ this . _conf = config || { }
11
13
this . text = text
12
14
this . values = values ? values . map ( prepare ) : null
13
15
this . connection = null
14
16
this . _queue = [ ]
15
17
this . state = 'initialized'
16
- this . _result = new Result ( this . _conf . rowMode )
18
+ this . _result = new Result ( this . _conf . rowMode , this . _conf . types )
17
19
this . _cb = null
18
20
this . _rows = null
21
+ this . _portal = null
19
22
}
20
23
21
24
util . inherits ( Cursor , EventEmitter )
22
25
23
- Cursor . prototype . submit = function ( connection ) {
26
+ Cursor . prototype . submit = function ( connection ) {
24
27
this . connection = connection
28
+ this . _portal = 'C_' + nextUniqueID ++
25
29
26
30
const con = connection
27
31
28
- con . parse ( {
29
- text : this . text
30
- } , true )
31
-
32
- con . bind ( {
33
- values : this . values
34
- } , true )
35
-
36
- con . describe ( {
37
- type : 'P' ,
38
- name : '' // use unamed portal
39
- } , true )
32
+ con . parse (
33
+ {
34
+ text : this . text ,
35
+ } ,
36
+ true
37
+ )
38
+
39
+ con . bind (
40
+ {
41
+ portal : this . _portal ,
42
+ values : this . values ,
43
+ } ,
44
+ true
45
+ )
46
+
47
+ con . describe (
48
+ {
49
+ type : 'P' ,
50
+ name : this . _portal , // AWS Redshift requires a portal name
51
+ } ,
52
+ true
53
+ )
40
54
41
55
con . flush ( )
42
56
@@ -55,25 +69,25 @@ Cursor.prototype.submit = function (connection) {
55
69
} )
56
70
}
57
71
58
- Cursor . prototype . _shiftQueue = function ( ) {
72
+ Cursor . prototype . _shiftQueue = function ( ) {
59
73
if ( this . _queue . length ) {
60
74
this . _getRows . apply ( this , this . _queue . shift ( ) )
61
75
}
62
76
}
63
77
64
- Cursor . prototype . handleRowDescription = function ( msg ) {
78
+ Cursor . prototype . handleRowDescription = function ( msg ) {
65
79
this . _result . addFields ( msg . fields )
66
80
this . state = 'idle'
67
81
this . _shiftQueue ( )
68
82
}
69
83
70
- Cursor . prototype . handleDataRow = function ( msg ) {
84
+ Cursor . prototype . handleDataRow = function ( msg ) {
71
85
const row = this . _result . parseRow ( msg . fields )
72
86
this . emit ( 'row' , row , this . _result )
73
87
this . _rows . push ( row )
74
88
}
75
89
76
- Cursor . prototype . _sendRows = function ( ) {
90
+ Cursor . prototype . _sendRows = function ( ) {
77
91
this . state = 'idle'
78
92
setImmediate ( ( ) => {
79
93
const cb = this . _cb
@@ -89,33 +103,34 @@ Cursor.prototype._sendRows = function () {
89
103
} )
90
104
}
91
105
92
- Cursor . prototype . handleCommandComplete = function ( ) {
106
+ Cursor . prototype . handleCommandComplete = function ( msg ) {
107
+ this . _result . addCommandComplete ( msg )
93
108
this . connection . sync ( )
94
109
}
95
110
96
- Cursor . prototype . handlePortalSuspended = function ( ) {
111
+ Cursor . prototype . handlePortalSuspended = function ( ) {
97
112
this . _sendRows ( )
98
113
}
99
114
100
- Cursor . prototype . handleReadyForQuery = function ( ) {
115
+ Cursor . prototype . handleReadyForQuery = function ( ) {
101
116
this . _sendRows ( )
102
117
this . emit ( 'end' , this . _result )
103
118
this . state = 'done'
104
119
}
105
120
106
- Cursor . prototype . handleEmptyQuery = function ( ) {
121
+ Cursor . prototype . handleEmptyQuery = function ( ) {
107
122
this . connection . sync ( )
108
123
}
109
124
110
- Cursor . prototype . handleError = function ( msg ) {
125
+ Cursor . prototype . handleError = function ( msg ) {
111
126
this . state = 'error'
112
127
this . _error = msg
113
128
// satisfy any waiting callback
114
129
if ( this . _cb ) {
115
130
this . _cb ( msg )
116
131
}
117
132
// dispatch error to all waiting callbacks
118
- for ( var i = 0 ; i < this . _queue . length ; i ++ ) {
133
+ for ( let i = 0 ; i < this . _queue . length ; i ++ ) {
119
134
this . _queue . pop ( ) [ 1 ] ( msg )
120
135
}
121
136
@@ -127,45 +142,45 @@ Cursor.prototype.handleError = function (msg) {
127
142
this . connection . sync ( )
128
143
}
129
144
130
- Cursor . prototype . _getRows = function ( rows , cb ) {
145
+ Cursor . prototype . _getRows = function ( rows , cb ) {
131
146
this . state = 'busy'
132
147
this . _cb = cb
133
148
this . _rows = [ ]
134
149
const msg = {
135
- portal : '' ,
136
- rows : rows
150
+ portal : this . _portal ,
151
+ rows : rows ,
137
152
}
138
153
this . connection . execute ( msg , true )
139
154
this . connection . flush ( )
140
155
}
141
156
142
- Cursor . prototype . end = function ( cb ) {
157
+ Cursor . prototype . end = function ( cb ) {
143
158
if ( this . state !== 'initialized' ) {
144
159
this . connection . sync ( )
145
160
}
146
- this . connection . stream . once ( 'end' , cb )
161
+ this . connection . once ( 'end' , cb )
147
162
this . connection . end ( )
148
163
}
149
164
150
- Cursor . prototype . close = function ( cb ) {
165
+ Cursor . prototype . close = function ( cb ) {
151
166
if ( this . state === 'done' ) {
152
167
if ( cb ) {
153
168
return setImmediate ( cb )
154
169
} else {
155
170
return
156
171
}
157
172
}
158
- this . connection . close ( { type : 'P' } )
173
+ this . connection . close ( { type : 'P' } )
159
174
this . connection . sync ( )
160
175
this . state = 'done'
161
176
if ( cb ) {
162
- this . connection . once ( 'closeComplete' , function ( ) {
177
+ this . connection . once ( 'closeComplete' , function ( ) {
163
178
cb ( )
164
179
} )
165
180
}
166
181
}
167
182
168
- Cursor . prototype . read = function ( rows , cb ) {
183
+ Cursor . prototype . read = function ( rows , cb ) {
169
184
if ( this . state === 'idle' ) {
170
185
return this . _getRows ( rows , cb )
171
186
}
0 commit comments