From 2eca71614f08db3649ac73e27595e96ff81ab275 Mon Sep 17 00:00:00 2001 From: Kevin Jing Qiu Date: Sat, 18 Mar 2017 17:25:37 -0400 Subject: [PATCH 1/5] issue #313: Fix multiprocessing bug --- couchdb/http.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/couchdb/http.py b/couchdb/http.py index 4a6750e9..9f928a63 100644 --- a/couchdb/http.py +++ b/couchdb/http.py @@ -13,6 +13,7 @@ from base64 import b64encode from datetime import datetime +import os import errno import socket import time @@ -482,17 +483,20 @@ class ConnectionPool(object): def __init__(self, timeout, disable_ssl_verification=False): self.timeout = timeout self.disable_ssl_verification = disable_ssl_verification - self.conns = {} # HTTP connections keyed by (scheme, host) + self.conns = {} # HTTP connections keyed by (current_process_pid, scheme, host) self.lock = Lock() - def get(self, url): + @property + def _current_process_id(self): + return os.getpid() + def get(self, url): scheme, host = util.urlsplit(url, 'http', False)[:2] # Try to reuse an existing connection. self.lock.acquire() try: - conns = self.conns.setdefault((scheme, host), []) + conns = self.conns.setdefault((self._current_process_id, scheme, host), []) if conns: conn = conns.pop(-1) else: @@ -520,7 +524,7 @@ def release(self, url, conn): scheme, host = util.urlsplit(url, 'http', False)[:2] self.lock.acquire() try: - self.conns.setdefault((scheme, host), []).append(conn) + self.conns.setdefault((self._current_process_id, scheme, host), []).append(conn) finally: self.lock.release() From 24045969a07d3fce145def5abbe0bd04a424ae22 Mon Sep 17 00:00:00 2001 From: Kevin Jing Qiu Date: Sat, 18 Mar 2017 17:27:00 -0400 Subject: [PATCH 2/5] issue #313: Add tests --- couchdb/tests/client.py | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/couchdb/tests/client.py b/couchdb/tests/client.py index 5aec3939..ad83d7d4 100644 --- a/couchdb/tests/client.py +++ b/couchdb/tests/client.py @@ -7,6 +7,8 @@ # you should have received as part of this distribution. from datetime import datetime +import functools +import multiprocessing import os import os.path import shutil @@ -19,6 +21,10 @@ from couchdb.tests import testutil +def _current_pid(): + return os.getpid() + + class ServerTestCase(testutil.TempDatabaseMixin, unittest.TestCase): def test_init_with_resource(self): @@ -481,7 +487,9 @@ def test_changes_releases_conn(self): # that the HTTP connection made it to the pool. list(self.db.changes(feed='continuous', timeout=0)) scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2] - self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)]) + current_pid = _current_pid() + key = (current_pid, scheme, netloc) + self.assertTrue(self.db.resource.session.connection_pool.conns[key]) def test_changes_releases_conn_when_lastseq(self): # Consume a changes feed, stopping at the 'last_seq' item, i.e. don't @@ -490,8 +498,10 @@ def test_changes_releases_conn_when_lastseq(self): for obj in self.db.changes(feed='continuous', timeout=0): if 'last_seq' in obj: break + current_pid = _current_pid() scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2] - self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)]) + key = (current_pid, scheme, netloc) + self.assertTrue(self.db.resource.session.connection_pool.conns[key]) def test_changes_conn_usable(self): # Consume a changes feed to get a used connection in the pool. @@ -838,8 +848,33 @@ def test_startkey(self): def test_nullkeys(self): self.assertEqual(len(list(self.db.iterview('test/nulls', 10))), self.num_docs) + +def _get_by_id(db, result, id): + result.append(db[id]) + + +class TestConcurrent(testutil.TempDatabaseMixin, unittest.TestCase): + def test_concurrent_get(self): + self.db.save({'_id': 'foo', 'value': 'hello'}) + self.db.save({'_id': 'bar', 'value': 'world'}) + processes = [] + result = multiprocessing.Manager().list() + for id in ('foo', 'bar'): + process = multiprocessing.Process(target=functools.partial(_get_by_id, self.db, result), + args=(id,)) + processes.append(process) + process.start() + + for process in processes: + process.join() + + self.assertEqual(len(result), 2) + self.assertEqual(set(['hello', 'world']), set([r['value'] for r in result])) + + def suite(): suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestConcurrent, 'test')) suite.addTest(unittest.makeSuite(ServerTestCase, 'test')) suite.addTest(unittest.makeSuite(DatabaseTestCase, 'test')) suite.addTest(unittest.makeSuite(ViewTestCase, 'test')) From c461b6b87c70105f5b531eb6d2c66a97f5660dce Mon Sep 17 00:00:00 2001 From: Kevin Jing Qiu Date: Sun, 19 Mar 2017 11:32:13 -0400 Subject: [PATCH 3/5] issue #313: Use `os.getpid()` directly without another indirection --- couchdb/http.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/couchdb/http.py b/couchdb/http.py index 9f928a63..9e21fbd1 100644 --- a/couchdb/http.py +++ b/couchdb/http.py @@ -483,20 +483,16 @@ class ConnectionPool(object): def __init__(self, timeout, disable_ssl_verification=False): self.timeout = timeout self.disable_ssl_verification = disable_ssl_verification - self.conns = {} # HTTP connections keyed by (current_process_pid, scheme, host) + self.conns = {} # HTTP connections keyed by (os.getpid(), scheme, host) self.lock = Lock() - @property - def _current_process_id(self): - return os.getpid() - def get(self, url): scheme, host = util.urlsplit(url, 'http', False)[:2] # Try to reuse an existing connection. self.lock.acquire() try: - conns = self.conns.setdefault((self._current_process_id, scheme, host), []) + conns = self.conns.setdefault((os.getpid(), scheme, host), []) if conns: conn = conns.pop(-1) else: @@ -524,7 +520,7 @@ def release(self, url, conn): scheme, host = util.urlsplit(url, 'http', False)[:2] self.lock.acquire() try: - self.conns.setdefault((self._current_process_id, scheme, host), []).append(conn) + self.conns.setdefault((os.getpid(), scheme, host), []).append(conn) finally: self.lock.release() From d1c054695a29746917937f39f8ed9df14222a1a1 Mon Sep 17 00:00:00 2001 From: Kevin Jing Qiu Date: Sun, 19 Mar 2017 11:33:42 -0400 Subject: [PATCH 4/5] issue #313: Get rid of the indirection in test code as well --- couchdb/tests/client.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/couchdb/tests/client.py b/couchdb/tests/client.py index ad83d7d4..dc354d50 100644 --- a/couchdb/tests/client.py +++ b/couchdb/tests/client.py @@ -21,10 +21,6 @@ from couchdb.tests import testutil -def _current_pid(): - return os.getpid() - - class ServerTestCase(testutil.TempDatabaseMixin, unittest.TestCase): def test_init_with_resource(self): @@ -487,7 +483,7 @@ def test_changes_releases_conn(self): # that the HTTP connection made it to the pool. list(self.db.changes(feed='continuous', timeout=0)) scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2] - current_pid = _current_pid() + current_pid = os.getpid() key = (current_pid, scheme, netloc) self.assertTrue(self.db.resource.session.connection_pool.conns[key]) @@ -498,7 +494,7 @@ def test_changes_releases_conn_when_lastseq(self): for obj in self.db.changes(feed='continuous', timeout=0): if 'last_seq' in obj: break - current_pid = _current_pid() + current_pid = os.getpid() scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2] key = (current_pid, scheme, netloc) self.assertTrue(self.db.resource.session.connection_pool.conns[key]) From f234ce849660acc0b81cda3a178a3c1328891f2d Mon Sep 17 00:00:00 2001 From: Kevin Jing Qiu Date: Sun, 19 Mar 2017 17:31:55 -0400 Subject: [PATCH 5/5] issue #313: trigger build