1
- var DB = require ( ' sharedb' ) . DB ;
2
- var pg = require ( 'pg' ) ;
1
+ var DB = require ( " sharedb" ) . DB ;
2
+ var pg = require ( "pg" ) ;
3
3
4
4
// Postgres-backed ShareDB database
5
5
@@ -10,27 +10,34 @@ function PostgresDB(options) {
10
10
this . closed = false ;
11
11
12
12
this . pool = new pg . Pool ( options ) ;
13
- } ;
13
+ }
14
14
module . exports = PostgresDB ;
15
15
16
16
PostgresDB . prototype = Object . create ( DB . prototype ) ;
17
17
18
- PostgresDB . prototype . close = function ( callback ) {
18
+ PostgresDB . prototype . close = function ( callback ) {
19
19
this . closed = true ;
20
20
this . pool . end ( ) ;
21
-
21
+
22
22
if ( callback ) callback ( ) ;
23
23
} ;
24
24
25
25
function rollback ( client , done ) {
26
- client . query ( ' ROLLBACK' , function ( err ) {
26
+ client . query ( " ROLLBACK" , function ( err ) {
27
27
return done ( err ) ;
28
- } )
28
+ } ) ;
29
29
}
30
30
31
31
// Persists an op and snapshot if it is for the next version. Calls back with
32
32
// callback(err, succeeded)
33
- PostgresDB . prototype . commit = function ( collection , id , op , snapshot , options , callback ) {
33
+ PostgresDB . prototype . commit = function (
34
+ collection ,
35
+ id ,
36
+ op ,
37
+ snapshot ,
38
+ options ,
39
+ callback ,
40
+ ) {
34
41
/*
35
42
* op: CreateOp {
36
43
* src: '24545654654646',
@@ -41,37 +48,36 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
41
48
* }
42
49
* snapshot: PostgresSnapshot
43
50
*/
44
- this . pool . connect ( function ( err , client , done ) {
51
+ this . pool . connect ( function ( err , client , done ) {
45
52
if ( err ) {
46
53
done ( client ) ;
47
54
callback ( err ) ;
48
55
return ;
49
56
}
50
57
function commit ( ) {
51
- client . query ( ' COMMIT' , function ( err ) {
58
+ client . query ( " COMMIT" , function ( err ) {
52
59
done ( err ) ;
53
60
if ( err ) {
54
61
callback ( err ) ;
55
62
} else {
56
63
callback ( null , true ) ;
57
64
}
58
- } )
65
+ } ) ;
59
66
}
60
67
client . query (
61
- ' SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2' ,
68
+ " SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2" ,
62
69
[ collection , id ] ,
63
- function ( err , res ) {
70
+ function ( err , res ) {
64
71
var max_version = res . rows [ 0 ] . max_version ;
65
- if ( max_version == null )
66
- max_version = 0 ;
72
+ if ( max_version == null ) max_version = 0 ;
67
73
if ( snapshot . v !== max_version + 1 ) {
68
74
return callback ( null , false ) ;
69
75
}
70
- client . query ( ' BEGIN' , function ( err ) {
76
+ client . query ( " BEGIN" , function ( err ) {
71
77
client . query (
72
- ' INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)' ,
78
+ " INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)" ,
73
79
[ collection , id , snapshot . v , op ] ,
74
- function ( err , res ) {
80
+ function ( err , res ) {
75
81
if ( err ) {
76
82
// TODO: if err is "constraint violation", callback(null, false) instead
77
83
rollback ( client , done ) ;
@@ -80,9 +86,9 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
80
86
}
81
87
if ( snapshot . v === 1 ) {
82
88
client . query (
83
- ' INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)' ,
89
+ " INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)" ,
84
90
[ collection , id , snapshot . type , snapshot . v , snapshot . data ] ,
85
- function ( err , res ) {
91
+ function ( err , res ) {
86
92
// TODO:
87
93
// if the insert was successful and did insert, callback(null, true)
88
94
// if the insert was successful and did not insert, callback(null, false)
@@ -93,13 +99,13 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
93
99
return ;
94
100
}
95
101
commit ( ) ;
96
- }
97
- )
102
+ } ,
103
+ ) ;
98
104
} else {
99
105
client . query (
100
- ' UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)' ,
106
+ " UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)" ,
101
107
[ collection , id , snapshot . type , snapshot . v , snapshot . data ] ,
102
- function ( err , res ) {
108
+ function ( err , res ) {
103
109
// TODO:
104
110
// if any rows were updated, success
105
111
// if 0 rows were updated, rollback and not success
@@ -110,59 +116,65 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
110
116
return ;
111
117
}
112
118
commit ( ) ;
113
- }
114
- )
119
+ } ,
120
+ ) ;
115
121
}
116
- }
117
- )
118
- } )
119
- }
120
- )
121
- } )
122
+ } ,
123
+ ) ;
124
+ } ) ;
125
+ } ,
126
+ ) ;
127
+ } ) ;
122
128
} ;
123
129
124
130
// Get the named document from the database. The callback is called with (err,
125
131
// snapshot). A snapshot with a version of zero is returned if the docuemnt
126
132
// has never been created in the database.
127
- PostgresDB . prototype . getSnapshot = function ( collection , id , fields , options , callback ) {
128
- this . pool . connect ( function ( err , client , done ) {
133
+ PostgresDB . prototype . getSnapshot = function (
134
+ collection ,
135
+ id ,
136
+ fields ,
137
+ options ,
138
+ callback ,
139
+ ) {
140
+ this . pool . connect ( function ( err , client , done ) {
129
141
if ( err ) {
130
142
done ( client ) ;
131
143
callback ( err ) ;
132
144
return ;
133
145
}
134
146
client . query (
135
- ' SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
147
+ " SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1" ,
136
148
[ collection , id ] ,
137
- function ( err , res ) {
149
+ function ( err , res ) {
138
150
done ( ) ;
139
151
if ( err ) {
140
152
callback ( err ) ;
141
153
return ;
142
154
}
143
155
if ( res . rows . length ) {
144
- var row = res . rows [ 0 ]
156
+ var row = res . rows [ 0 ] ;
145
157
var snapshot = new PostgresSnapshot (
146
158
id ,
147
159
row . version ,
148
160
row . doc_type ,
149
161
row . data ,
150
- undefined // TODO: metadata
151
- )
162
+ undefined , // TODO: metadata
163
+ ) ;
152
164
callback ( null , snapshot ) ;
153
165
} else {
154
166
var snapshot = new PostgresSnapshot (
155
167
id ,
156
168
0 ,
157
169
null ,
158
170
undefined ,
159
- undefined
160
- )
171
+ undefined ,
172
+ ) ;
161
173
callback ( null , snapshot ) ;
162
174
}
163
- }
164
- )
165
- } )
175
+ } ,
176
+ ) ;
177
+ } ) ;
166
178
} ;
167
179
168
180
// Get operations between [from, to) noninclusively. (Ie, the range should
@@ -174,28 +186,86 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
174
186
// The version will be inferred from the parameters if it is missing.
175
187
//
176
188
// Callback should be called as callback(error, [list of ops]);
177
- PostgresDB . prototype . getOps = function ( collection , id , from , to , options , callback ) {
178
- this . pool . connect ( function ( err , client , done ) {
189
+ PostgresDB . prototype . getOps = function (
190
+ collection ,
191
+ id ,
192
+ from ,
193
+ to ,
194
+ options ,
195
+ callback ,
196
+ ) {
197
+ this . pool . connect ( function ( err , client , done ) {
179
198
if ( err ) {
180
199
done ( client ) ;
181
200
callback ( err ) ;
182
201
return ;
183
202
}
184
203
client . query (
185
- ' SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4' ,
204
+ " SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4" ,
186
205
[ collection , id , from , to ] ,
187
- function ( err , res ) {
206
+ function ( err , res ) {
207
+ done ( ) ;
208
+ if ( err ) {
209
+ callback ( err ) ;
210
+ return ;
211
+ }
212
+ callback (
213
+ null ,
214
+ res . rows . map ( function ( row ) {
215
+ return row . operation ;
216
+ } ) ,
217
+ ) ;
218
+ } ,
219
+ ) ;
220
+ } ) ;
221
+ } ;
222
+
223
+ PostgresDB . prototype . query = function (
224
+ collectionName ,
225
+ inputQuery ,
226
+ fields ,
227
+ options ,
228
+ callback ,
229
+ ) {
230
+ this . pool . connect ( function ( err , client , done ) {
231
+ if ( err ) {
232
+ done ( client ) ;
233
+ callback ( err ) ;
234
+ return ;
235
+ }
236
+ // TODO: more consistent parse
237
+ const limit = inputQuery [ "$limit" ] ;
238
+ const sort = inputQuery [ "$sort" ] ;
239
+ const sortKey = Object . keys ( sort ) [ 0 ] ;
240
+ const sortDirection = sort [ sortKey ] === - 1 ? "DESC" : "ASC" ;
241
+
242
+ client . query (
243
+ `SELECT version, data, doc_type FROM snapshots WHERE collection = $1 ORDER BY data->>$3 ${ sortDirection } LIMIT $2` ,
244
+ [ collectionName , limit , sortKey ] ,
245
+ function ( err , res ) {
188
246
done ( ) ;
189
247
if ( err ) {
190
248
callback ( err ) ;
191
249
return ;
192
250
}
193
- callback ( null , res . rows . map ( function ( row ) {
194
- return row . operation ;
195
- } ) ) ;
196
- }
197
- )
198
- } )
251
+ if ( res . rows . length ) {
252
+ var snapshots = [ ] ;
253
+ for ( var i = 0 ; i < res . rows . length ; i ++ ) {
254
+ const row = res . rows [ i ] ;
255
+ var snapshot = new PostgresSnapshot (
256
+ row . data . documentId ,
257
+ row . version ,
258
+ row . doc_type ,
259
+ row . data ,
260
+ undefined , // TODO: metadata
261
+ ) ;
262
+ snapshots . push ( snapshot ) ;
263
+ }
264
+ callback ( null , snapshots ) ;
265
+ }
266
+ } ,
267
+ ) ;
268
+ } ) ;
199
269
} ;
200
270
201
271
function PostgresSnapshot ( id , version , type , data , meta ) {
0 commit comments