Skip to content

Commit eda9922

Browse files
committed
pathman: create partitions on insert trigger
1 parent 035889a commit eda9922

File tree

7 files changed

+178
-23
lines changed

7 files changed

+178
-23
lines changed

contrib/pg_pathman/expected/pg_pathman.out

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -434,13 +434,8 @@ SELECT pathman.drop_range_partitions('test.num_range_rel');
434434
(1 row)
435435

436436
DROP TABLE test.num_range_rel CASCADE;
437-
SELECT pathman.drop_range_partitions('test.range_rel');
438-
drop_range_partitions
439-
-----------------------
440-
441-
(1 row)
442-
443437
DROP TABLE test.range_rel CASCADE;
438+
NOTICE: drop cascades to 6 other objects
444439
SELECT * FROM pathman.pathman_config;
445440
id | relname | attname | parttype
446441
----+---------+---------+----------

contrib/pg_pathman/init.c

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "utils/lsyscache.h"
1212
#include "utils/bytea.h"
1313

14+
#include "utils/snapmgr.h"
1415

1516
HTAB *relations = NULL;
1617
HTAB *range_restrictions = NULL;
@@ -190,6 +191,13 @@ load_check_constraints(Oid parent_oid)
190191
Oid oids[1] = {INT4OID};
191192
bool nulls[1] = {false};
192193
vals[0] = Int32GetDatum(parent_oid);
194+
SPIPlanPtr plan;
195+
196+
// char sql[] = "select pg_constraint.* "
197+
// "from pg_constraint "
198+
// "join pg_inherits on inhrelid = conrelid "
199+
// "where inhparent = %d and contype='c'";
200+
// char *query;
193201

194202
prel = (PartRelationInfo*)
195203
hash_search(relations, (const void *) &parent_oid, HASH_FIND, &found);
@@ -198,11 +206,20 @@ load_check_constraints(Oid parent_oid)
198206
if (prel->children.length > 0)
199207
return;
200208

201-
ret = SPI_execute_with_args("select pg_constraint.* "
202-
"from pg_constraint "
203-
"join pg_inherits on inhrelid = conrelid "
204-
"where inhparent = $1 and contype='c';",
205-
1, oids, vals, nulls, true, 0);
209+
// ret = SPI_execute_with_args("select pg_constraint.* "
210+
// "from pg_constraint "
211+
// "join pg_inherits on inhrelid = conrelid "
212+
// "where inhparent = $1 and contype='c';",
213+
// 1, oids, vals, nulls, true, 0);
214+
215+
plan = SPI_prepare("select pg_constraint.* "
216+
"from pg_constraint "
217+
"join pg_inherits on inhrelid = conrelid "
218+
"where inhparent = $1 and contype='c';",
219+
1, oids);
220+
ret = SPI_execute_snapshot(plan, vals, nulls,
221+
GetCatalogSnapshot(parent_oid), InvalidSnapshot, true, false, 0);
222+
206223
proc = SPI_processed;
207224

208225
if (ret > 0 && SPI_tuptable != NULL)

contrib/pg_pathman/pl_funcs.c

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
#include "utils/typcache.h"
44
#include "utils/array.h"
55
#include "access/nbtree.h"
6+
#include "access/xact.h"
7+
#include "catalog/pg_type.h"
8+
#include "executor/spi.h"
69

710

811
/* declarations */
912
PG_FUNCTION_INFO_V1( on_partitions_created );
1013
PG_FUNCTION_INFO_V1( on_partitions_updated );
1114
PG_FUNCTION_INFO_V1( on_partitions_removed );
1215
PG_FUNCTION_INFO_V1( find_range_partition );
16+
PG_FUNCTION_INFO_V1( find_or_create_range_partition);
1317
PG_FUNCTION_INFO_V1( get_range_by_idx );
1418
PG_FUNCTION_INFO_V1( get_partition_range );
1519

@@ -70,8 +74,52 @@ on_partitions_removed(PG_FUNCTION_ARGS)
7074
/*
7175
* Returns partition oid for specified parent relid and value
7276
*/
77+
// Datum
78+
// find_range_partition(PG_FUNCTION_ARGS)
79+
// {
80+
// int relid = DatumGetInt32(PG_GETARG_DATUM(0));
81+
// Datum value = PG_GETARG_DATUM(1);
82+
// Oid value_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
83+
// int pos;
84+
// bool found;
85+
// RangeRelation *rangerel;
86+
// RangeEntry *ranges;
87+
// TypeCacheEntry *tce;
88+
// PartRelationInfo *prel;
89+
// Oid cmp_proc_oid;
90+
// FmgrInfo cmp_func;
91+
92+
// tce = lookup_type_cache(value_type,
93+
// TYPECACHE_EQ_OPR | TYPECACHE_LT_OPR | TYPECACHE_GT_OPR |
94+
// TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO);
95+
96+
// prel = (PartRelationInfo *)
97+
// hash_search(relations, (const void *) &relid, HASH_FIND, NULL);
98+
99+
// cmp_proc_oid = get_opfamily_proc(tce->btree_opf,
100+
// value_type,
101+
// prel->atttype,
102+
// BTORDER_PROC);
103+
// fmgr_info(cmp_proc_oid, &cmp_func);
104+
105+
// rangerel = (RangeRelation *)
106+
// hash_search(range_restrictions, (const void *) &relid, HASH_FIND, NULL);
107+
108+
// if (!rangerel)
109+
// PG_RETURN_NULL();
110+
111+
// ranges = dsm_array_get_pointer(&rangerel->ranges);
112+
// pos = range_binary_search(rangerel, &cmp_func, value, &found);
113+
114+
// if (found)
115+
// PG_RETURN_OID(ranges[pos].child_oid);
116+
117+
// PG_RETURN_NULL();
118+
// }
119+
73120
Datum
74121
find_range_partition(PG_FUNCTION_ARGS)
122+
// find_or_create_range_partition(PG_FUNCTION_ARGS)
75123
{
76124
int relid = DatumGetInt32(PG_GETARG_DATUM(0));
77125
Datum value = PG_GETARG_DATUM(1);
@@ -101,14 +149,67 @@ find_range_partition(PG_FUNCTION_ARGS)
101149
rangerel = (RangeRelation *)
102150
hash_search(range_restrictions, (const void *) &relid, HASH_FIND, NULL);
103151

104-
if (!rangerel)
152+
if (!rangerel || rangerel->ranges.length == 0)
105153
PG_RETURN_NULL();
106154

107155
ranges = dsm_array_get_pointer(&rangerel->ranges);
108156
pos = range_binary_search(rangerel, &cmp_func, value, &found);
109157

158+
/*
159+
* If found then just return oid. Else create new partitions
160+
*/
110161
if (found)
111162
PG_RETURN_OID(ranges[pos].child_oid);
163+
else
164+
{
165+
int ret;
166+
Datum vals[4];
167+
Oid oids[] = {OIDOID, value_type, value_type, value_type};
168+
bool nulls[] = {false, false, false, false};
169+
RangeEntry *re = &ranges[rangerel->ranges.length-1];
170+
int cmp_upper = FunctionCall2(&cmp_func, value, ranges[rangerel->ranges.length-1].max);
171+
int cmp_lower = FunctionCall2(&cmp_func, value, ranges[0].min);
172+
173+
if (cmp_upper > 0)
174+
re = &ranges[rangerel->ranges.length-1];
175+
else if (cmp_lower < 0)
176+
re = &ranges[0];
177+
178+
vals[0] = ObjectIdGetDatum(relid);
179+
vals[1] = re->min;
180+
vals[2] = re->max;
181+
vals[3] = value;
182+
183+
LWLockAcquire(load_config_lock, LW_EXCLUSIVE);
184+
185+
/* create new partitions */
186+
SPI_connect();
187+
ret = SPI_execute_with_args("select append_partitions_on_demand_internal($1, $2, $3, $4)",
188+
4, oids, vals, nulls, false, 0);
189+
if (ret > 0)
190+
{
191+
free_dsm_array(&rangerel->ranges);
192+
free_dsm_array(&prel->children);
193+
load_check_constraints(relid);
194+
}
195+
else
196+
{
197+
elog(WARNING, "Attempt to create new partitions failed");
198+
}
199+
200+
/* update relation info */
201+
SPI_finish();
202+
203+
LWLockRelease(load_config_lock);
204+
205+
/* repeat binary search */
206+
ranges = dsm_array_get_pointer(&rangerel->ranges);
207+
pos = range_binary_search(rangerel, &cmp_func, value, &found);
208+
if (found)
209+
{
210+
PG_RETURN_OID(ranges[pos].child_oid);
211+
}
212+
}
112213

113214
PG_RETURN_NULL();
114215
}

contrib/pg_pathman/sql/hash.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ BEGIN
1111
relation := @extschema@.validate_relname(relation);
1212

1313
IF EXISTS (SELECT * FROM @extschema@.pathman_config WHERE relname = relation) THEN
14-
RAISE EXCEPTION 'Relation "%s" has already been partitioned', relation;
14+
RAISE EXCEPTION 'Relation "%" has already been partitioned', relation;
1515
END IF;
1616

1717
/* Create partitions and update pg_pathman configuration */

contrib/pg_pathman/sql/init.sql

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,28 @@ BEGIN
134134
END
135135
$$
136136
LANGUAGE plpgsql;
137+
138+
/*
139+
* DDL trigger that deletes entry from pathman_config
140+
*/
141+
CREATE OR REPLACE FUNCTION @extschema@.pathman_ddl_trigger_func()
142+
RETURNS event_trigger AS
143+
$$
144+
DECLARE
145+
obj record;
146+
BEGIN
147+
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects() as events
148+
JOIN @extschema@.pathman_config as cfg ON cfg.relname = events.object_identity
149+
LOOP
150+
IF obj.object_type = 'table' THEN
151+
EXECUTE 'DELETE FROM @extschema@.pathman_config WHERE relname = $1'
152+
USING obj.object_identity;
153+
END IF;
154+
END LOOP;
155+
END
156+
$$
157+
LANGUAGE plpgsql;
158+
159+
CREATE EVENT TRIGGER pathman_ddl_trigger
160+
ON sql_drop
161+
EXECUTE PROCEDURE @extschema@.pathman_ddl_trigger_func();

contrib/pg_pathman/sql/pg_pathman.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ DROP TABLE test.hash_rel CASCADE;
112112
SELECT pathman.drop_range_partitions('test.num_range_rel');
113113
DROP TABLE test.num_range_rel CASCADE;
114114

115-
SELECT pathman.drop_range_partitions('test.range_rel');
116115
DROP TABLE test.range_rel CASCADE;
117116

118117
SELECT * FROM pathman.pathman_config;

contrib/pg_pathman/sql/range.sql

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
/* TODO: function that subdivide specified period with specivied interval */
2+
13
/*
24
* Creates RANGE partitions for specified relation based on datetime attribute
35
*/
@@ -258,6 +260,7 @@ BEGIN
258260
, v_cond);
259261

260262
EXECUTE v_sql;
263+
-- RAISE NOTICE 'partition % created', v_child_relname;
261264
RETURN v_child_relname;
262265
END
263266
$$ LANGUAGE plpgsql;
@@ -708,24 +711,39 @@ $$ LANGUAGE plpgsql;
708711
/*
709712
*
710713
*/
711-
CREATE OR REPLACE FUNCTION create_new_partitions(
714+
CREATE OR REPLACE FUNCTION append_partitions_on_demand_internal(
712715
p_relid OID
713716
, p_min ANYELEMENT
714717
, p_max ANYELEMENT
715718
, p_new_value ANYELEMENT)
716719
RETURNS INTEGER AS
717720
$$
718721
DECLARE
719-
v_cnt INTEGER;
722+
v_cnt INTEGER := 0;
720723
i INTEGER;
724+
v_part TEXT;
721725
BEGIN
722-
v_cnt := (p_new_value - p_max) / (p_max - p_min) + 1;
723-
FOR i IN 0..v_cnt-1
724-
LOOP
725-
PERFORM @extschema@.create_single_range_partition(p_relation
726-
, p_max + (i * (p_max - p_min))
727-
, p_max + ((i+1) * (p_max - p_min)));
728-
END LOOP;
726+
IF p_new_value >= p_max THEN
727+
v_cnt := (p_new_value - p_max) / (p_max - p_min) + 1;
728+
FOR i IN 0..v_cnt-1
729+
LOOP
730+
v_part := create_single_range_partition(get_schema_qualified_name(p_relid::regclass, '.')
731+
, p_max + (i * (p_max - p_min))
732+
, p_max + ((i+1) * (p_max - p_min)));
733+
RAISE NOTICE 'partition % created', v_part;
734+
END LOOP;
735+
ELSIF p_new_value <= p_min THEN
736+
v_cnt := (p_min - p_new_value) / (p_max - p_min) + 1;
737+
FOR i IN 0..v_cnt-1
738+
LOOP
739+
v_part := create_single_range_partition(get_schema_qualified_name(p_relid::regclass, '.')
740+
, p_min - ((i+1) * (p_max - p_min))
741+
, p_min - (i * (p_max - p_min)));
742+
RAISE NOTICE 'partition % created', v_part;
743+
END LOOP;
744+
ELSE
745+
RAISE NOTICE 'Not implemented yet';
746+
END IF;
729747
RETURN v_cnt;
730748
END
731749
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)