33
33
import fcntl
34
34
import httplib
35
35
import httplib2
36
+ import logging
36
37
import optparse
37
38
import os
39
+ import re
38
40
import sys
39
41
import time
40
42
41
43
from couchdb import __version__ as VERSION
42
44
from couchdb import json
43
45
46
+ log = logging .getLogger ('couchdb.tools.replication_helper' )
47
+
44
48
45
49
class ReplicationHelper (object ):
46
50
"""Listener daemon for CouchDB database notifications"""
47
51
48
- def __init__ (self , args ):
52
+ def __init__ (self , source , targets , options ):
49
53
super (ReplicationHelper , self ).__init__ ()
50
- self .args = args
54
+ self .source = source
55
+ self .targets = targets
56
+ self .options = options
51
57
self .http = httplib2 .Http ()
52
58
self .databases = []
53
59
@@ -58,40 +64,73 @@ def concat_uri(self, server, path):
58
64
else :
59
65
return server + path
60
66
61
- def trigger_replication (self , database ):
62
- """Triggers replication between --source- and --target-servers"""
63
-
64
- body = {'source' : self .concat_uri (self .args .source_server , database )}
65
-
66
- # send replication request to target server
67
- for target_server in self .args .target_servers :
68
- body ['target' ] = self .concat_uri (target_server , database )
69
- self .http .request (
70
- self .concat_uri (self .args .source_server , '_replicate' ),
71
- 'POST' ,
72
- body = json .encode (body ))
73
-
74
67
def trigger_creation (self , database ):
75
- """Creates database in all --target-servers"""
68
+ """Creates database in all target servers."""
69
+ log .debug ('Create database %r' , database )
76
70
77
71
# send creation request to target server
78
- for target_server in self .args .target_servers :
79
- self .http .request (
80
- self .concat_uri (target_server , database ),
81
- 'PUT' ,)
72
+ for target in self .targets :
73
+ if target ['username' ] is not None :
74
+ self .http .add_credentials (target ['username' ],
75
+ target ['password' ])
76
+ log .debug ('Requesting creation of %r from %s' , database ,
77
+ target ['scheme' ] + target ['host' ])
78
+ resp , data = self .http .request (
79
+ self .concat_uri (target ['scheme' ] + target ['host' ], database ),
80
+ 'PUT' )
81
+ if resp .status != 201 :
82
+ log .error ('Unexpected HTTP response: %s %s (%s)' , resp .status ,
83
+ resp .reason , data )
84
+ self .http .clear_credentials ()
82
85
83
86
def trigger_deletion (self , database ):
84
- """Deletes database in all --target-servers"""
87
+ """Deletes database in all target servers."""
88
+ log .debug ('Delete database %r' , database )
89
+
85
90
# send deletion request to target server
86
- for target_server in self .args .target_servers :
87
- self .http .request (
88
- self .concat_uri (target_server , database ),
89
- 'DELETE' ,)
91
+ for target in self .targets :
92
+ if target ['username' ] is not None :
93
+ self .http .add_credentials (target ['username' ],
94
+ target ['password' ])
95
+ log .debug ('Requesting deletion of %r from %s' , database ,
96
+ target ['scheme' ] + target ['host' ])
97
+ resp , data = self .http .request (
98
+ self .concat_uri (target ['scheme' ] + target ['host' ], database ),
99
+ 'DELETE' )
100
+ if resp .status != 200 :
101
+ log .error ('Unexpected HTTP response: %s %s (%s)' , resp .status ,
102
+ resp .reason , data )
103
+ self .http .clear_credentials ()
90
104
91
- def sync_databases (self ):
92
- """Sync self.databases to all target servers"""
105
+ def trigger_replication (self , database ):
106
+ """Triggers replication between source and target servers."""
107
+ log .debug ('Replicate database %r' , database )
108
+
109
+ body = {'source' : self .concat_uri (self .source , database )}
110
+
111
+ # send replication request to target server
112
+ for target in self .targets :
113
+ body ['target' ] = database
114
+ if target ['username' ] is not None :
115
+ self .http .add_credentials (target ['username' ],
116
+ target ['password' ])
117
+ log .debug ('Request replication %r from %s' , body ,
118
+ target ['scheme' ] + target ['host' ])
119
+ resp , data = self .http .request (
120
+ self .concat_uri (target ['scheme' ] + target ['host' ],
121
+ '_replicate' ),
122
+ 'POST' ,
123
+ body = json .encode (body ))
124
+ if resp .status != 200 :
125
+ log .error ('Unexpected HTTP response: %s %s (%s)' , resp .status ,
126
+ resp .reason , data )
127
+ self .http .clear_credentials ()
93
128
129
+ def sync_databases (self ):
130
+ """Sync self.databases to all target servers."""
94
131
if len (self .databases ) > 0 :
132
+ log .debug ('Syncing databases after %d change(s)' ,
133
+ len (self .databases ))
95
134
for operation , database in self .databases :
96
135
try :
97
136
# not elegant, but we just don't care for problems
@@ -102,14 +141,15 @@ def sync_databases(self):
102
141
self .trigger_deletion (database )
103
142
elif operation == 'created' :
104
143
self .trigger_creation (database )
105
- except httplib .HTTPException :
144
+ except httplib .HTTPException , e :
145
+ log .error ('HTTP error: %s' , e , exc_info = True )
106
146
sys .exit (0 )
107
147
self .databases = []
108
148
109
149
def __call__ (self ):
110
150
"""Reads notifications from stdin and triggers replication"""
111
151
112
- args = self .args
152
+ options = self .options
113
153
wait_counter = time .time ()
114
154
115
155
while True :
@@ -124,75 +164,111 @@ def __call__(self):
124
164
sys .exit (0 )
125
165
note = json .decode (line )
126
166
167
+ log .debug ('Received %r' , note )
168
+
127
169
# we don't care for deletes
128
- if note ['type' ] == 'delete' and not args .ignore_deletes :
170
+ if note ['type' ] == 'delete' and not options .ignore_deletes :
129
171
continue
130
172
131
173
self .databases .append ((note ['type' ], note ['db' ]))
132
174
133
175
# if there are more docs that we want to batch, flush
134
- if len (self .databases ) >= int (args .batch_threshold ):
176
+ if len (self .databases ) >= int (options .batch_threshold ):
135
177
self .sync_databases ()
136
178
continue
137
179
138
180
except IOError :
139
181
# if we waited longer that we want to wait, flush
140
- if (time .time () - wait_counter ) > int (args .wait_threshold ):
182
+ if (time .time () - wait_counter ) > int (options .wait_threshold ):
141
183
self .sync_databases ()
142
184
wait_counter = time .time ()
143
185
144
- time .sleep (float (args .wait_threshold ))
186
+ time .sleep (float (options .wait_threshold ))
145
187
# implicit continue
146
188
147
189
190
+ URLSPLIT_RE = re .compile (
191
+ r'(?P<scheme>https?://)' # http:// or https://
192
+ r'((?P<username>.*):(?P<password>.*)@)?' # optional user:pass combo
193
+ r'(?P<host>.*)' , # hostname:port'''
194
+ re .VERBOSE
195
+ )
196
+
197
+
148
198
def main ():
149
- usage = '%prog [options] --source-server=http://server:port/ \
150
- --target-servers=http://server2:port2/[,http://server3:port3/, ...]'
199
+ usage = '%prog [options] SOURCE_URL TARGET_URL1 [TARGET_URL2 ...]'
151
200
152
201
parser = optparse .OptionParser (usage = usage , version = VERSION )
153
202
154
- parser .add_option ('--source-server' ,
155
- action = 'store' ,
156
- dest = 'source_server' ,
157
- help = 'the name of the database to replicate from' )
158
- parser .add_option ('--target-servers' ,
159
- action = 'store' ,
160
- dest = 'target_servers' ,
161
- help = 'comma separated list of databases to replicate to' )
162
203
parser .add_option ('--batch-threshold' ,
163
204
action = 'store' ,
164
205
dest = 'batch_threshold' ,
165
206
default = 0 ,
207
+ metavar = 'NUM' ,
166
208
help = 'number of changes that are to be replicated' )
167
209
parser .add_option ('--wait-threshold' ,
168
210
action = 'store' ,
169
211
dest = 'wait_threshold' ,
170
- default = 0 ,
212
+ default = 0.01 ,
213
+ metavar = 'SECS' ,
171
214
help = 'number of seconds to wait before triggering replication' )
172
215
parser .add_option ('--ignore-deletes' ,
173
- action = 'store ' ,
216
+ action = 'store_true ' ,
174
217
dest = 'ignore_deletes' ,
175
- help = 'whether to ignore "delete" notifications' ,
176
- default = True )
218
+ help = 'whether to ignore "delete" notifications' )
219
+ parser .add_option ('--debug' ,
220
+ action = 'store_true' ,
221
+ dest = 'debug' ,
222
+ help = 'enable debug logging; requires --log-file to be specified' )
223
+ parser .add_option ('--log-file' ,
224
+ action = 'store' ,
225
+ dest = 'log_file' ,
226
+ metavar = 'FILE' ,
227
+ help = 'name of the file to write log messages to, or "-" to enable '
228
+ 'logging to the standard error stream' )
177
229
parser .add_option ('--json-module' ,
178
230
action = 'store' ,
179
231
dest = 'json_module' ,
180
232
metavar = 'NAME' ,
181
233
help = 'the JSON module to use ("simplejson", "cjson", or "json" are '
182
234
'supported)' )
183
235
184
- options , arg = parser .parse_args ()
185
-
186
- if not options .target_servers or not options .source_server :
187
- parser .error ("Need at least --source-server and --target-servers" )
236
+ options , args = parser .parse_args ()
237
+ if len (args ) < 2 :
238
+ parser .error ("need at least one source and target server" )
188
239
sys .exit (1 )
189
240
190
- options .target_servers = options .target_servers .split (',' )
241
+ src_url = args [0 ]
242
+ targets = [
243
+ URLSPLIT_RE .match (url ).groupdict ()
244
+ for url in args [1 :]
245
+ ]
246
+
247
+ if options .debug :
248
+ log .setLevel (logging .DEBUG )
249
+
250
+ if options .log_file :
251
+ if options .log_file == '-' :
252
+ handler = logging .StreamHandler (sys .stderr )
253
+ handler .setFormatter (logging .Formatter (
254
+ ' -> [%(levelname)s] %(message)s'
255
+ ))
256
+ else :
257
+ handler = logging .FileHandler (options .log_file )
258
+ handler .setFormatter (logging .Formatter (
259
+ '[%(asctime)s] [%(levelname)s] %(message)s'
260
+ ))
261
+ log .addHandler (handler )
191
262
192
263
if options .json_module :
193
264
json .use (options .json_module )
194
265
195
- ReplicationHelper (options )()
266
+ log .debug ('Syncing changes from %r to %r' , src_url , targets )
267
+ try :
268
+ ReplicationHelper (src_url , targets , options )()
269
+ except Exception , e :
270
+ log .exception (e )
271
+
196
272
197
273
if __name__ == '__main__' :
198
274
main ()
0 commit comments