Skip to content

Commit f5ba660

Browse files
committed
python scripts
1 parent a3e9ee5 commit f5ba660

File tree

4 files changed

+145
-80
lines changed

4 files changed

+145
-80
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ regression.diffs
55
regression.out
66
*.o
77
*.so
8+
*.pyc
89
pg_pathman--*.sql

init.sql

Lines changed: 77 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -140,95 +140,95 @@ CREATE TYPE @extschema@.PathmanRange (
140140
output = pathman_range_out
141141
);
142142

143-
/*
144-
* Copy rows to partitions
145-
*/
146-
-- CREATE OR REPLACE FUNCTION @extschema@.partition_data(
147-
-- p_relation regclass
148-
-- , p_min ANYELEMENT DEFAULT NULL::text
149-
-- , p_max ANYELEMENT DEFAULT NULL::text
150-
-- , p_limit INT DEFAULT NULL
151-
-- , OUT p_total BIGINT)
152-
-- AS
153-
-- $$
154-
-- DECLARE
155-
-- v_attr TEXT;
156-
-- v_limit_clause TEXT := '';
157-
-- v_where_clause TEXT := '';
158-
-- BEGIN
159-
-- SELECT attname INTO v_attr
160-
-- FROM @extschema@.pathman_config WHERE partrel = p_relation;
161-
162-
-- PERFORM @extschema@.debug_capture();
163-
164-
-- p_total := 0;
165-
166-
-- /* Format LIMIT clause if needed */
167-
-- IF NOT p_limit IS NULL THEN
168-
-- v_limit_clause := format('LIMIT %s', p_limit);
169-
-- END IF;
170-
171-
-- /* Format WHERE clause if needed */
172-
-- IF NOT p_min IS NULL THEN
173-
-- v_where_clause := format('%1$s >= $1', v_attr);
174-
-- END IF;
175-
176-
-- IF NOT p_max IS NULL THEN
177-
-- IF NOT p_min IS NULL THEN
178-
-- v_where_clause := v_where_clause || ' AND ';
179-
-- END IF;
180-
-- v_where_clause := v_where_clause || format('%1$s < $2', v_attr);
181-
-- END IF;
182-
183-
-- IF v_where_clause != '' THEN
184-
-- v_where_clause := 'WHERE ' || v_where_clause;
185-
-- END IF;
186-
187-
-- /* Lock rows and copy data */
188-
-- RAISE NOTICE 'Copying data to partitions...';
189-
-- EXECUTE format('
190-
-- WITH data AS (
191-
-- DELETE FROM ONLY %1$s WHERE ctid IN (
192-
-- SELECT ctid FROM ONLY %1$s %2$s %3$s FOR UPDATE NOWAIT
193-
-- ) RETURNING *)
194-
-- INSERT INTO %1$s SELECT * FROM data'
195-
-- , p_relation, v_where_clause, v_limit_clause)
196-
-- USING p_min, p_max;
197-
198-
-- GET DIAGNOSTICS p_total = ROW_COUNT;
199-
-- RETURN;
200-
-- END
201-
-- $$
202-
-- LANGUAGE plpgsql;
203-
204143
/*
205144
* Copy rows to partitions
206145
*/
207146
CREATE OR REPLACE FUNCTION @extschema@.partition_data(
208-
parent_relid REGCLASS,
209-
OUT p_total BIGINT)
147+
p_relation regclass
148+
, p_min ANYELEMENT DEFAULT NULL::text
149+
, p_max ANYELEMENT DEFAULT NULL::text
150+
, p_limit INT DEFAULT NULL
151+
, OUT p_total BIGINT)
210152
AS
211153
$$
212154
DECLARE
213-
relname TEXT;
214-
rec RECORD;
215-
cnt BIGINT := 0;
216-
155+
v_attr TEXT;
156+
v_limit_clause TEXT := '';
157+
v_where_clause TEXT := '';
217158
BEGIN
218-
p_total := 0;
219-
220-
/* Create partitions and copy rest of the data */
221-
EXECUTE format('WITH part_data AS (DELETE FROM ONLY %1$s RETURNING *)
222-
INSERT INTO %1$s SELECT * FROM part_data',
223-
@extschema@.get_schema_qualified_name(parent_relid));
224-
225-
/* Get number of inserted rows */
226-
GET DIAGNOSTICS p_total = ROW_COUNT;
227-
RETURN;
159+
SELECT attname INTO v_attr
160+
FROM @extschema@.pathman_config WHERE partrel = p_relation;
161+
162+
PERFORM @extschema@.debug_capture();
163+
164+
p_total := 0;
165+
166+
/* Format LIMIT clause if needed */
167+
IF NOT p_limit IS NULL THEN
168+
v_limit_clause := format('LIMIT %s', p_limit);
169+
END IF;
170+
171+
/* Format WHERE clause if needed */
172+
IF NOT p_min IS NULL THEN
173+
v_where_clause := format('%1$s >= $1', v_attr);
174+
END IF;
175+
176+
IF NOT p_max IS NULL THEN
177+
IF NOT p_min IS NULL THEN
178+
v_where_clause := v_where_clause || ' AND ';
179+
END IF;
180+
v_where_clause := v_where_clause || format('%1$s < $2', v_attr);
181+
END IF;
182+
183+
IF v_where_clause != '' THEN
184+
v_where_clause := 'WHERE ' || v_where_clause;
185+
END IF;
186+
187+
/* Lock rows and copy data */
188+
RAISE NOTICE 'Copying data to partitions...';
189+
EXECUTE format('
190+
WITH data AS (
191+
DELETE FROM ONLY %1$s WHERE ctid IN (
192+
SELECT ctid FROM ONLY %1$s %2$s %3$s FOR UPDATE NOWAIT
193+
) RETURNING *)
194+
INSERT INTO %1$s SELECT * FROM data'
195+
, p_relation, v_where_clause, v_limit_clause)
196+
USING p_min, p_max;
197+
198+
GET DIAGNOSTICS p_total = ROW_COUNT;
199+
RETURN;
228200
END
229201
$$
230202
LANGUAGE plpgsql;
231203

204+
/*
205+
* Copy rows to partitions
206+
*/
207+
-- CREATE OR REPLACE FUNCTION @extschema@.partition_data(
208+
-- parent_relid REGCLASS,
209+
-- OUT p_total BIGINT)
210+
-- AS
211+
-- $$
212+
-- DECLARE
213+
-- relname TEXT;
214+
-- rec RECORD;
215+
-- cnt BIGINT := 0;
216+
217+
-- BEGIN
218+
-- p_total := 0;
219+
220+
-- /* Create partitions and copy rest of the data */
221+
-- EXECUTE format('WITH part_data AS (DELETE FROM ONLY %1$s RETURNING *)
222+
-- INSERT INTO %1$s SELECT * FROM part_data',
223+
-- @extschema@.get_schema_qualified_name(parent_relid));
224+
225+
-- /* Get number of inserted rows */
226+
-- GET DIAGNOSTICS p_total = ROW_COUNT;
227+
-- RETURN;
228+
-- END
229+
-- $$
230+
-- LANGUAGE plpgsql;
231+
232232
/*
233233
* Disable pathman partitioning for specified relation
234234
*/

src/pathman.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828

2929

3030
/* Check PostgreSQL version (9.5.4 contains an important fix for BGW) */
31-
#if PG_VERSION_NUM < 90504
32-
#error "Cannot build pg_pathman with PostgreSQL version lower than 9.5.4"
33-
#endif
31+
// #if PG_VERSION_NUM < 90504
32+
// #error "Cannot build pg_pathman with PostgreSQL version lower than 9.5.4"
33+
// #endif
3434

3535
/* Get CString representation of Datum (simple wrapper) */
3636
#ifdef USE_ASSERT_CHECKING

tests/concurrent_partitioning_test.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#coding: utf-8
2+
"""
3+
concurrent_partitioning_test.py
4+
Tests concurrent partitioning worker with simultaneous update queries
5+
6+
Copyright (c) 2015-2016, Postgres Professional
7+
"""
8+
9+
import unittest
10+
from testgres import get_new_node, clean_all, stop_all
11+
from subprocess import Popen, PIPE
12+
import subprocess
13+
import time
14+
15+
16+
class ConcurrentTest(unittest.TestCase):
17+
18+
def setUp(self):
19+
pass
20+
21+
def tearDown(self):
22+
stop_all()
23+
# clean_all()
24+
25+
def test_concurrent(self):
26+
setup_cmd = [
27+
'create extension pg_pathman',
28+
'create table abc(id serial, t text)',
29+
'insert into abc select generate_series(1, 300000)',
30+
'select create_hash_partitions(\'abc\', \'id\', 3, p_partition_data := false)',
31+
]
32+
33+
node = get_new_node('test')
34+
node.init()
35+
node.append_conf('postgresql.conf', 'shared_preload_libraries=\'pg_pathman\'\n')
36+
node.start()
37+
38+
for cmd in setup_cmd:
39+
node.safe_psql('postgres', cmd)
40+
41+
node.psql('postgres', 'select partition_data_worker(\'abc\')')
42+
43+
while True:
44+
# update some rows to check for deadlocks
45+
node.safe_psql('postgres',
46+
'''update abc set t = 'test'
47+
where id in (select (random() * 300000)::int from generate_series(1, 3000))''')
48+
49+
count = node.execute('postgres', 'select count(*) from pathman_active_workers')
50+
51+
# if there is no active workers then it means work is done
52+
if count[0][0] == 0:
53+
break
54+
time.sleep(1)
55+
56+
data = node.execute('postgres', 'select count(*) from only abc')
57+
self.assertEqual(data[0][0], 0)
58+
data = node.execute('postgres', 'select count(*) from abc')
59+
self.assertEqual(data[0][0], 300000)
60+
61+
node.stop()
62+
63+
if __name__ == "__main__":
64+
unittest.main()

0 commit comments

Comments
 (0)