Skip to content

Commit cc973d8

Browse files
committed
Lots of cleanup of the replication helper script: use pull instead of push replication (closes issue 46), allow auth credentials in the source/target URLs (closes issue 47), use a wait_threshold > 0 by default (issue 48), take the source and target servers as positional arguments instead of command-line options, and add optional logging.
--HG-- extra : convert_revision : svn%3A7a298fb0-333a-0410-83e7-658617cd9cf3/trunk%40179
1 parent 7c0f1a1 commit cc973d8

File tree

2 files changed

+138
-56
lines changed

2 files changed

+138
-56
lines changed

couchdb/tools/replication_helper.py

Lines changed: 129 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -33,21 +33,27 @@
3333
import fcntl
3434
import httplib
3535
import httplib2
36+
import logging
3637
import optparse
3738
import os
39+
import re
3840
import sys
3941
import time
4042

4143
from couchdb import __version__ as VERSION
4244
from couchdb import json
4345

46+
log = logging.getLogger('couchdb.tools.replication_helper')
47+
4448

4549
class ReplicationHelper(object):
4650
"""Listener daemon for CouchDB database notifications"""
4751

48-
def __init__(self, args):
52+
def __init__(self, source, targets, options):
4953
super(ReplicationHelper, self).__init__()
50-
self.args = args
54+
self.source = source
55+
self.targets = targets
56+
self.options = options
5157
self.http = httplib2.Http()
5258
self.databases = []
5359

@@ -58,40 +64,73 @@ def concat_uri(self, server, path):
5864
else:
5965
return server + path
6066

61-
def trigger_replication(self, database):
62-
"""Triggers replication between --source- and --target-servers"""
63-
64-
body = {'source': self.concat_uri(self.args.source_server, database)}
65-
66-
# send replication request to target server
67-
for target_server in self.args.target_servers:
68-
body['target'] = self.concat_uri(target_server, database)
69-
self.http.request(
70-
self.concat_uri(self.args.source_server, '_replicate'),
71-
'POST',
72-
body=json.encode(body))
73-
7467
def trigger_creation(self, database):
75-
"""Creates database in all --target-servers"""
68+
"""Creates database in all target servers."""
69+
log.debug('Create database %r', database)
7670

7771
# send creation request to target server
78-
for target_server in self.args.target_servers:
79-
self.http.request(
80-
self.concat_uri(target_server, database),
81-
'PUT',)
72+
for target in self.targets:
73+
if target['username'] is not None:
74+
self.http.add_credentials(target['username'],
75+
target['password'])
76+
log.debug('Requesting creation of %r from %s', database,
77+
target['scheme'] + target['host'])
78+
resp, data = self.http.request(
79+
self.concat_uri(target['scheme'] + target['host'], database),
80+
'PUT')
81+
if resp.status != 201:
82+
log.error('Unexpected HTTP response: %s %s (%s)', resp.status,
83+
resp.reason, data)
84+
self.http.clear_credentials()
8285

8386
def trigger_deletion(self, database):
84-
"""Deletes database in all --target-servers"""
87+
"""Deletes database in all target servers."""
88+
log.debug('Delete database %r', database)
89+
8590
# send deletion request to target server
86-
for target_server in self.args.target_servers:
87-
self.http.request(
88-
self.concat_uri(target_server, database),
89-
'DELETE',)
91+
for target in self.targets:
92+
if target['username'] is not None:
93+
self.http.add_credentials(target['username'],
94+
target['password'])
95+
log.debug('Requesting deletion of %r from %s', database,
96+
target['scheme'] + target['host'])
97+
resp, data = self.http.request(
98+
self.concat_uri(target['scheme'] + target['host'], database),
99+
'DELETE')
100+
if resp.status != 200:
101+
log.error('Unexpected HTTP response: %s %s (%s)', resp.status,
102+
resp.reason, data)
103+
self.http.clear_credentials()
90104

91-
def sync_databases(self):
92-
"""Sync self.databases to all target servers"""
105+
def trigger_replication(self, database):
106+
"""Triggers replication between source and target servers."""
107+
log.debug('Replicate database %r', database)
108+
109+
body = {'source': self.concat_uri(self.source, database)}
110+
111+
# send replication request to target server
112+
for target in self.targets:
113+
body['target'] = database
114+
if target['username'] is not None:
115+
self.http.add_credentials(target['username'],
116+
target['password'])
117+
log.debug('Request replication %r from %s', body,
118+
target['scheme'] + target['host'])
119+
resp, data = self.http.request(
120+
self.concat_uri(target['scheme'] + target['host'],
121+
'_replicate'),
122+
'POST',
123+
body=json.encode(body))
124+
if resp.status != 200:
125+
log.error('Unexpected HTTP response: %s %s (%s)', resp.status,
126+
resp.reason, data)
127+
self.http.clear_credentials()
93128

129+
def sync_databases(self):
130+
"""Sync self.databases to all target servers."""
94131
if len(self.databases) > 0:
132+
log.debug('Syncing databases after %d change(s)',
133+
len(self.databases))
95134
for operation, database in self.databases:
96135
try:
97136
# not elegant, but we just don't care for problems
@@ -102,14 +141,15 @@ def sync_databases(self):
102141
self.trigger_deletion(database)
103142
elif operation == 'created':
104143
self.trigger_creation(database)
105-
except httplib.HTTPException:
144+
except httplib.HTTPException, e:
145+
log.error('HTTP error: %s', e, exc_info=True)
106146
sys.exit(0)
107147
self.databases = []
108148

109149
def __call__(self):
110150
"""Reads notifications from stdin and triggers replication"""
111151

112-
args = self.args
152+
options = self.options
113153
wait_counter = time.time()
114154

115155
while True:
@@ -124,75 +164,111 @@ def __call__(self):
124164
sys.exit(0)
125165
note = json.decode(line)
126166

167+
log.debug('Received %r', note)
168+
127169
# we don't care for deletes
128-
if note['type'] == 'delete' and not args.ignore_deletes:
170+
if note['type'] == 'delete' and not options.ignore_deletes:
129171
continue
130172

131173
self.databases.append((note['type'], note['db']))
132174

133175
# if there are more docs than we want to batch, flush
134-
if len(self.databases) >= int(args.batch_threshold):
176+
if len(self.databases) >= int(options.batch_threshold):
135177
self.sync_databases()
136178
continue
137179

138180
except IOError:
139181
# if we waited longer than we want to wait, flush
140-
if (time.time() - wait_counter) > int(args.wait_threshold):
182+
if (time.time() - wait_counter) > int(options.wait_threshold):
141183
self.sync_databases()
142184
wait_counter = time.time()
143185

144-
time.sleep(float(args.wait_threshold))
186+
time.sleep(float(options.wait_threshold))
145187
# implicit continue
146188

147189

190+
URLSPLIT_RE = re.compile(
191+
r'(?P<scheme>https?://)' # http:// or https://
192+
r'((?P<username>.*):(?P<password>.*)@)?' # optional user:pass combo
193+
r'(?P<host>.*)', # hostname:port'''
194+
re.VERBOSE
195+
)
196+
197+
148198
def main():
149-
usage = '%prog [options] --source-server=http://server:port/ \
150-
--target-servers=http://server2:port2/[,http://server3:port3/, ...]'
199+
usage = '%prog [options] SOURCE_URL TARGET_URL1 [TARGET_URL2 ...]'
151200

152201
parser = optparse.OptionParser(usage=usage, version=VERSION)
153202

154-
parser.add_option('--source-server',
155-
action='store',
156-
dest='source_server',
157-
help='the name of the database to replicate from')
158-
parser.add_option('--target-servers',
159-
action='store',
160-
dest='target_servers',
161-
help='comma separated list of databases to replicate to')
162203
parser.add_option('--batch-threshold',
163204
action='store',
164205
dest='batch_threshold',
165206
default=0,
207+
metavar='NUM',
166208
help='number of changes that are to be replicated')
167209
parser.add_option('--wait-threshold',
168210
action='store',
169211
dest='wait_threshold',
170-
default=0,
212+
default=0.01,
213+
metavar='SECS',
171214
help='number of seconds to wait before triggering replication')
172215
parser.add_option('--ignore-deletes',
173-
action='store',
216+
action='store_true',
174217
dest='ignore_deletes',
175-
help='whether to ignore "delete" notifications',
176-
default=True)
218+
help='whether to ignore "delete" notifications')
219+
parser.add_option('--debug',
220+
action='store_true',
221+
dest='debug',
222+
help='enable debug logging; requires --log-file to be specified')
223+
parser.add_option('--log-file',
224+
action='store',
225+
dest='log_file',
226+
metavar='FILE',
227+
help='name of the file to write log messages to, or "-" to enable '
228+
'logging to the standard error stream')
177229
parser.add_option('--json-module',
178230
action='store',
179231
dest='json_module',
180232
metavar='NAME',
181233
help='the JSON module to use ("simplejson", "cjson", or "json" are '
182234
'supported)')
183235

184-
options, arg = parser.parse_args()
185-
186-
if not options.target_servers or not options.source_server:
187-
parser.error("Need at least --source-server and --target-servers")
236+
options, args = parser.parse_args()
237+
if len(args) < 2:
238+
parser.error("need at least one source and target server")
188239
sys.exit(1)
189240

190-
options.target_servers = options.target_servers.split(',')
241+
src_url = args[0]
242+
targets = [
243+
URLSPLIT_RE.match(url).groupdict()
244+
for url in args[1:]
245+
]
246+
247+
if options.debug:
248+
log.setLevel(logging.DEBUG)
249+
250+
if options.log_file:
251+
if options.log_file == '-':
252+
handler = logging.StreamHandler(sys.stderr)
253+
handler.setFormatter(logging.Formatter(
254+
' -> [%(levelname)s] %(message)s'
255+
))
256+
else:
257+
handler = logging.FileHandler(options.log_file)
258+
handler.setFormatter(logging.Formatter(
259+
'[%(asctime)s] [%(levelname)s] %(message)s'
260+
))
261+
log.addHandler(handler)
191262

192263
if options.json_module:
193264
json.use(options.json_module)
194265

195-
ReplicationHelper(options)()
266+
log.debug('Syncing changes from %r to %r', src_url, targets)
267+
try:
268+
ReplicationHelper(src_url, targets, options)()
269+
except Exception, e:
270+
log.exception(e)
271+
196272

197273
if __name__ == '__main__':
198274
main()

couchdb/tools/replication_helper_test.py

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,16 +8,20 @@
88
# you should have received as part of this distribution.
99

1010
"""Simple functional test for the replication notification trigger"""
11-
from couchdb import client
11+
1212
import time
1313

14+
from couchdb import client
15+
16+
1417
def set_up_database(server, database):
1518
"""Deletes and creates a `database` on a `server`"""
1619
if database in server:
1720
del server[database]
1821

1922
return server.create(database)
2023

24+
2125
def run_tests():
2226
"""Inserts a doc into database a, waits and tries to read it back from
2327
database b
@@ -37,8 +41,9 @@ def run_tests():
3741
docId = 'testdoc'
3842
# add doc to node a
3943

44+
print 'Inserting document in to database "a"'
4045
db_a[docId] = doc
41-
46+
4247
# wait a bit. Adjust depending on your --wait-threshold setting
4348
time.sleep(5)
4449

@@ -49,11 +54,12 @@ def run_tests():
4954
except client.ResourceNotFound:
5055
print 'FAILURE at reading it back from database "b"'
5156

57+
5258
def main():
5359
print 'Running functional replication test...'
54-
print 'Inserting document in to database "a"'
5560
run_tests()
5661
print 'Done.'
5762

63+
5864
if __name__ == '__main__':
5965
main()

0 commit comments

Comments
 (0)