Skip to content

Commit 9af0630

Browse files
committed
merge concurrent partitioning into picky nodes, first stage
1 parent e373da0 commit 9af0630

File tree

13 files changed

+839
-94
lines changed

13 files changed

+839
-94
lines changed

hash.sql

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
CREATE OR REPLACE FUNCTION @extschema@.create_hash_partitions(
1515
parent_relid REGCLASS,
1616
attribute TEXT,
17-
partitions_count INTEGER
17+
partitions_count INTEGER,
18+
partition_data BOOLEAN DEFAULT true
1819
) RETURNS INTEGER AS
1920
$$
2021
DECLARE
@@ -23,6 +24,7 @@ DECLARE
2324
v_plain_schema TEXT;
2425
v_plain_relname TEXT;
2526
v_hashfunc TEXT;
27+
v_enable_parent BOOLEAN := NOT partition_data;
2628

2729
BEGIN
2830
PERFORM @extschema@.validate_relname(parent_relid);
@@ -66,7 +68,12 @@ BEGIN
6668
PERFORM @extschema@.on_create_partitions(parent_relid);
6769

6870
/* Copy data */
69-
PERFORM @extschema@.partition_data(parent_relid);
71+
IF partition_data = true THEN
72+
PERFORM @extschema@.disable_parent(parent_relid);
73+
PERFORM @extschema@.partition_data(parent_relid);
74+
ELSE
75+
PERFORM @extschema@.enable_parent(parent_relid);
76+
END IF;
7077

7178
RETURN partitions_count;
7279
END

init.sql

Lines changed: 159 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,37 +24,179 @@ CREATE TABLE IF NOT EXISTS @extschema@.pathman_config (
2424
parttype INTEGER NOT NULL,
2525
range_interval TEXT,
2626

27-
CHECK (parttype >= 1 OR parttype <= 2) /* check for allowed part types */
27+
CHECK (parttype IN (1, 2)) /* check for allowed part types */
2828
);
2929

30+
CREATE TABLE IF NOT EXISTS @extschema@.pathman_config_params (
31+
partrel REGCLASS NOT NULL,
32+
enable_parent BOOLEAN NOT NULL DEFAULT TRUE
33+
);
34+
CREATE UNIQUE INDEX i_pathman_config_params
35+
ON @extschema@.pathman_config_params(partrel);
3036

3137
SELECT pg_catalog.pg_extension_config_dump('@extschema@.pathman_config', '');
38+
SELECT pg_catalog.pg_extension_config_dump('@extschema@.pathman_config_params', '');
39+
40+
41+
CREATE OR REPLACE FUNCTION @extschema@.on_enable_parent(relid OID)
42+
RETURNS OID AS 'pg_pathman' LANGUAGE C STRICT;
43+
44+
CREATE OR REPLACE FUNCTION @extschema@.on_disable_parent(relid OID)
45+
RETURNS OID AS 'pg_pathman' LANGUAGE C STRICT;
46+
47+
/* Include parent relation into query plan's for specified relation */
48+
CREATE OR REPLACE FUNCTION @extschema@.enable_parent(relation REGCLASS)
49+
RETURNS VOID AS
50+
$$
51+
BEGIN
52+
INSERT INTO @extschema@.pathman_config_params values (relation, True)
53+
ON CONFLICT (partrel) DO
54+
UPDATE SET enable_parent = True;
55+
56+
PERFORM @extschema@.on_enable_parent(relation::oid);
57+
END
58+
$$
59+
LANGUAGE plpgsql;
60+
61+
/* Do not include parent relation into query plan's for specified relation */
62+
CREATE OR REPLACE FUNCTION @extschema@.disable_parent(relation REGCLASS)
63+
RETURNS VOID AS
64+
$$
65+
BEGIN
66+
INSERT INTO @extschema@.pathman_config_params values (relation, False)
67+
ON CONFLICT (partrel) DO
68+
UPDATE SET enable_parent = False;
69+
70+
PERFORM @extschema@.on_disable_parent(relation::oid);
71+
END
72+
$$
73+
LANGUAGE plpgsql;
3274

75+
/*
76+
* Partitioning data tools
77+
*/
78+
CREATE OR REPLACE FUNCTION @extschema@.active_workers()
79+
RETURNS TABLE (
80+
pid INT,
81+
dbid INT,
82+
relid INT,
83+
processed INT,
84+
status TEXT
85+
) AS 'pg_pathman' LANGUAGE C STRICT;
86+
87+
CREATE OR REPLACE VIEW @extschema@.pathman_active_workers
88+
AS SELECT * FROM @extschema@.active_workers();
89+
90+
CREATE OR REPLACE FUNCTION @extschema@.partition_data_worker(relation regclass)
91+
RETURNS VOID AS 'pg_pathman' LANGUAGE C STRICT;
92+
93+
CREATE OR REPLACE FUNCTION @extschema@.stop_worker(relation regclass)
94+
RETURNS BOOL AS 'pg_pathman' LANGUAGE C STRICT;
95+
96+
/* PathmanRange type */
97+
CREATE OR REPLACE FUNCTION @extschema@.pathman_range_in(cstring)
98+
RETURNS PathmanRange
99+
AS 'pg_pathman'
100+
LANGUAGE C IMMUTABLE STRICT;
101+
102+
CREATE OR REPLACE FUNCTION @extschema@.pathman_range_out(PathmanRange)
103+
RETURNS cstring
104+
AS 'pg_pathman'
105+
LANGUAGE C IMMUTABLE STRICT;
106+
107+
/*
108+
CREATE OR REPLACE FUNCTION @extschema@.get_whole_range(relid OID)
109+
RETURNS PathmanRange
110+
AS 'pg_pathman'
111+
LANGUAGE C STRICT;
112+
113+
CREATE OR REPLACE FUNCTION @extschema@.range_value_cmp(range PathmanRange, value ANYELEMENT)
114+
RETURNS INTEGER
115+
AS 'pg_pathman'
116+
LANGUAGE C STRICT;
117+
118+
CREATE OR REPLACE FUNCTION @extschema@.range_lower(range PathmanRange, dummy ANYELEMENT)
119+
RETURNS ANYELEMENT
120+
AS 'pg_pathman'
121+
LANGUAGE C;
122+
123+
CREATE OR REPLACE FUNCTION @extschema@.range_upper(range PathmanRange, dummy ANYELEMENT)
124+
RETURNS ANYELEMENT
125+
AS 'pg_pathman'
126+
LANGUAGE C;
127+
128+
CREATE OR REPLACE FUNCTION @extschema@.range_oid(range PathmanRange)
129+
RETURNS OID
130+
AS 'pg_pathman'
131+
LANGUAGE C STRICT;
132+
133+
CREATE OR REPLACE FUNCTION @extschema@.range_partitions_list(parent_relid OID)
134+
RETURNS SETOF PATHMANRANGE AS 'pg_pathman'
135+
LANGUAGE C STRICT;
136+
*/
137+
CREATE TYPE @extschema@.PathmanRange (
138+
internallength = 32,
139+
input = pathman_range_in,
140+
output = pathman_range_out
141+
);
33142

34143
/*
35144
* Copy rows to partitions
36145
*/
37146
CREATE OR REPLACE FUNCTION @extschema@.partition_data(
38-
parent_relid REGCLASS,
39-
OUT p_total BIGINT)
147+
p_relation regclass
148+
, p_min ANYELEMENT DEFAULT NULL::text
149+
, p_max ANYELEMENT DEFAULT NULL::text
150+
, p_limit INT DEFAULT NULL
151+
, OUT p_total BIGINT)
40152
AS
41153
$$
42154
DECLARE
43-
relname TEXT;
44-
rec RECORD;
45-
cnt BIGINT := 0;
46-
155+
v_attr TEXT;
156+
v_limit_clause TEXT := '';
157+
v_where_clause TEXT := '';
47158
BEGIN
48-
p_total := 0;
49-
50-
/* Create partitions and copy rest of the data */
51-
EXECUTE format('WITH part_data AS (DELETE FROM ONLY %1$s RETURNING *)
52-
INSERT INTO %1$s SELECT * FROM part_data',
53-
@extschema@.get_schema_qualified_name(parent_relid));
54-
55-
/* Get number of inserted rows */
56-
GET DIAGNOSTICS p_total = ROW_COUNT;
57-
RETURN;
159+
SELECT attname INTO v_attr
160+
FROM @extschema@.pathman_config WHERE partrel = p_relation;
161+
162+
PERFORM @extschema@.debug_capture();
163+
164+
p_total := 0;
165+
166+
/* Format LIMIT clause if needed */
167+
IF NOT p_limit IS NULL THEN
168+
v_limit_clause := format('LIMIT %s', p_limit);
169+
END IF;
170+
171+
/* Format WHERE clause if needed */
172+
IF NOT p_min IS NULL THEN
173+
v_where_clause := format('%1$s >= $1', v_attr);
174+
END IF;
175+
176+
IF NOT p_max IS NULL THEN
177+
IF NOT p_min IS NULL THEN
178+
v_where_clause := v_where_clause || ' AND ';
179+
END IF;
180+
v_where_clause := v_where_clause || format('%1$s < $2', v_attr);
181+
END IF;
182+
183+
IF v_where_clause != '' THEN
184+
v_where_clause := 'WHERE ' || v_where_clause;
185+
END IF;
186+
187+
/* Lock rows and copy data */
188+
RAISE NOTICE 'Copying data to partitions...';
189+
EXECUTE format('
190+
WITH data AS (
191+
DELETE FROM ONLY %1$s WHERE ctid IN (
192+
SELECT ctid FROM ONLY %1$s %2$s %3$s FOR UPDATE NOWAIT
193+
) RETURNING *)
194+
INSERT INTO %1$s SELECT * FROM data'
195+
, p_relation, v_where_clause, v_limit_clause)
196+
USING p_min, p_max;
197+
198+
GET DIAGNOSTICS p_total = ROW_COUNT;
199+
RETURN;
58200
END
59201
$$
60202
LANGUAGE plpgsql;

sql/pg_pathman.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
2+
--:gdb
3+
14
\set VERBOSITY terse
25

36
CREATE SCHEMA pathman;

src/hooks.c

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,12 @@ pathman_rel_pathlist_hook(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTb
246246
/*
247247
* Expand simple_rte_array and simple_rel_array
248248
*/
249+
len = irange_list_length(ranges);
250+
if (prel->enable_parent)
251+
len++;
249252

250-
if (ranges)
253+
if (len > 0)
251254
{
252-
len = irange_list_length(ranges);
253-
254255
/* Expand simple_rel_array and simple_rte_array */
255256
new_rel_array = (RelOptInfo **)
256257
palloc0((root->simple_rel_array_size + len) * sizeof(RelOptInfo *));
@@ -275,6 +276,10 @@ pathman_rel_pathlist_hook(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTb
275276
root->simple_rte_array = new_rte_array;
276277
}
277278

279+
/* Add parent if needed */
280+
if (prel->enable_parent)
281+
append_child_relation(root, rel, rti, rte, i, rte->relid, NULL);
282+
278283
/*
279284
* Iterate all indexes in rangeset and append corresponding child
280285
* relations.
@@ -448,14 +453,31 @@ pathman_planner_hook(Query *parse, int cursorOptions, ParamListInfo boundParams)
448453
void
449454
pathman_post_parse_analysis_hook(ParseState *pstate, Query *query)
450455
{
456+
elog(DEBUG2, "Called parse hook [%u]", MyProcPid);
457+
451458
/* Invoke original hook if needed */
452459
if (post_parse_analyze_hook_next)
453460
post_parse_analyze_hook_next(pstate, query);
454461

462+
455463
/* Finish delayed invalidation jobs */
456464
if (IsPathmanReady())
457465
finish_delayed_invalidation();
458466

467+
elog(DEBUG2, "post_parse: %d %d %u [%u]",
468+
IsPathmanEnabled(),
469+
initialization_needed,
470+
get_pathman_schema(),
471+
MyProcPid);
472+
473+
/* DEBUG!!!! */
474+
// static int parse_sleep = 10;
475+
// if (IsPathmanEnabled() &&
476+
// initialization_needed &&
477+
// get_pathman_schema() == InvalidOid)
478+
// sleep(parse_sleep);
479+
/* -------------------- */
480+
459481
/* Load config if pg_pathman exists & it's still necessary */
460482
if (IsPathmanEnabled() &&
461483
initialization_needed &&

0 commit comments

Comments
 (0)