Skip to content

Commit fcc7ead

Browse files
committed
fixed some bugs and tests
1 parent 55321b2 commit fcc7ead

File tree

7 files changed

+56
-100
lines changed

7 files changed

+56
-100
lines changed

expected/pathman_domains.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ SELECT create_hash_partitions('domains.dom_table', 'val', 5);
115115
(1 row)
116116

117117
SELECT * FROM pathman_partition_list
118-
ORDER BY partition::TEXT;
118+
ORDER BY "partition"::TEXT;
119119
parent | partition | parttype | partattr | range_min | range_max
120120
-------------------+---------------------+----------+----------+-----------+-----------
121121
domains.dom_table | domains.dom_table_0 | 1 | val | |

expected/pathman_permissions.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ SELECT prepend_range_partition('permissions.user1_table');
7878
(1 row)
7979

8080
SELECT attname, attacl from pg_attribute
81-
WHERE attrelid = (SELECT partition FROM pathman_partition_list
81+
WHERE attrelid = (SELECT "partition" FROM pathman_partition_list
8282
WHERE parent = 'permissions.user1_table'::REGCLASS
8383
ORDER BY range_min::int ASC /* prepend */
8484
LIMIT 1)
@@ -104,7 +104,7 @@ INSERT INTO permissions.user1_table (id, a) VALUES (35, 0) RETURNING *;
104104
(1 row)
105105

106106
SELECT relname, relacl FROM pg_class
107-
WHERE oid = ANY (SELECT partition FROM pathman_partition_list
107+
WHERE oid = ANY (SELECT "partition" FROM pathman_partition_list
108108
WHERE parent = 'permissions.user1_table'::REGCLASS
109109
ORDER BY range_max::int DESC /* append */
110110
LIMIT 3)

hash.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ BEGIN
3838
INSERT INTO @extschema@.pathman_config (partrel, attname, parttype)
3939
VALUES (parent_relid, attribute, 1);
4040

41-
IF array_length(relnames) != partitions_count THEN
41+
IF array_length(relnames, 1) != partitions_count THEN
4242
RAISE EXCEPTION 'Partition names array size must be equal the partitions count';
4343
END IF;
4444

45-
IF array_length(tablespaces) != partitions_count THEN
45+
IF array_length(tablespaces, 1) != partitions_count THEN
4646
RAISE EXCEPTION 'Partition tablespaces array size must be equal the partitions count';
4747
END IF;
4848

range.sql

Lines changed: 14 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -596,98 +596,28 @@ LANGUAGE plpgsql;
596596

597597

598598
/*
599-
* Merge two partitions. All data will be copied to the first one. Second
600-
* partition will be destroyed.
601-
*
602-
* NOTE: dummy field is used to pass the element type to the function
603-
* (it is necessary because of pseudo-types used in function).
599+
* Merge multiple partitions. All data will be copied to the first one. The rest
600+
* of partitions will be dropped
604601
*/
605-
CREATE OR REPLACE FUNCTION @extschema@.merge_range_partitions_internal(
606-
parent_relid REGCLASS,
602+
CREATE OR REPLACE FUNCTION @extschema@.merge_range_partitions(
603+
partitions REGCLASS[])
604+
RETURNS VOID AS 'pg_pathman', 'merge_range_partitions'
605+
LANGUAGE C STRICT;
606+
607+
/*
608+
* The special case of merging two partitions
609+
*/
610+
CREATE OR REPLACE FUNCTION @extschema@.merge_range_partitions(
607611
partition1 REGCLASS,
608-
partition2 REGCLASS,
609-
dummy ANYELEMENT,
610-
OUT p_range ANYARRAY)
611-
RETURNS ANYARRAY AS
612+
partition2 REGCLASS)
613+
RETURNS VOID AS
612614
$$
613-
DECLARE
614-
v_attname TEXT;
615-
v_atttype REGTYPE;
616-
v_check_name TEXT;
617-
v_lower_bound dummy%TYPE;
618-
v_upper_bound dummy%TYPE;
619-
620615
BEGIN
621-
SELECT attname FROM @extschema@.pathman_config
622-
WHERE partrel = parent_relid
623-
INTO v_attname;
624-
625-
IF v_attname IS NULL THEN
626-
RAISE EXCEPTION 'table "%" is not partitioned', parent_relid::TEXT;
627-
END IF;
628-
629-
v_atttype = @extschema@.get_attribute_type(parent_relid, v_attname);
630-
631-
/* We have to pass fake NULL casted to column's type */
632-
EXECUTE format('SELECT @extschema@.get_part_range($1, NULL::%1$s) ||
633-
@extschema@.get_part_range($2, NULL::%1$s)',
634-
@extschema@.get_base_type(v_atttype)::TEXT)
635-
USING partition1, partition2
636-
INTO p_range;
637-
638-
/* Check if ranges are adjacent */
639-
IF p_range[1] != p_range[4] AND p_range[2] != p_range[3] THEN
640-
RAISE EXCEPTION 'merge failed, partitions must be adjacent';
641-
END IF;
642-
643-
/* Drop constraint on first partition... */
644-
v_check_name := @extschema@.build_check_constraint_name(partition1, v_attname);
645-
EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %s',
646-
partition1::TEXT,
647-
v_check_name);
648-
649-
/* Determine left bound */
650-
IF p_range[1] IS NULL OR p_range[3] IS NULL THEN
651-
v_lower_bound := NULL;
652-
ELSE
653-
v_lower_bound := least(p_range[1], p_range[3]);
654-
END IF;
655-
656-
/* Determine right bound */
657-
IF p_range[2] IS NULL OR p_range[4] IS NULL THEN
658-
v_upper_bound := NULL;
659-
ELSE
660-
v_upper_bound := greatest(p_range[2], p_range[4]);
661-
END IF;
662-
663-
/* and create a new one */
664-
EXECUTE format('ALTER TABLE %s ADD CONSTRAINT %s CHECK (%s)',
665-
partition1::TEXT,
666-
v_check_name,
667-
@extschema@.build_range_condition(partition1,
668-
v_attname,
669-
v_lower_bound,
670-
v_upper_bound));
671-
672-
/* Copy data from second partition to the first one */
673-
EXECUTE format('WITH part_data AS (DELETE FROM %s RETURNING *)
674-
INSERT INTO %s SELECT * FROM part_data',
675-
partition2::TEXT,
676-
partition1::TEXT);
677-
678-
/* Remove second partition */
679-
EXECUTE format('DROP TABLE %s', partition2::TEXT);
616+
PERFORM @extschema@.merge_range_partitions(array[partition1, partition2]::regclass[]);
680617
END
681618
$$ LANGUAGE plpgsql;
682619

683620

684-
CREATE OR REPLACE FUNCTION @extschema@.merge_range_partitions(
685-
parent REGCLASS,
686-
partitions REGCLASS[])
687-
RETURNS VOID AS 'pg_pathman', 'merge_range_partitions'
688-
LANGUAGE C STRICT;
689-
690-
691621
/*
692622
* Append new partition.
693623
*/

sql/pathman_domains.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ SELECT drop_partitions('domains.dom_table');
3737
SELECT create_hash_partitions('domains.dom_table', 'val', 5);
3838

3939
SELECT * FROM pathman_partition_list
40-
ORDER BY partition::TEXT;
40+
ORDER BY "partition"::TEXT;
4141

4242

4343
DROP SCHEMA domains CASCADE;

sql/pathman_permissions.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ GRANT UPDATE(a) ON permissions.user1_table TO user2; /* per-column ACL */
6363
SET ROLE user2;
6464
SELECT prepend_range_partition('permissions.user1_table');
6565
SELECT attname, attacl from pg_attribute
66-
WHERE attrelid = (SELECT partition FROM pathman_partition_list
66+
WHERE attrelid = (SELECT "partition" FROM pathman_partition_list
6767
WHERE parent = 'permissions.user1_table'::REGCLASS
6868
ORDER BY range_min::int ASC /* prepend */
6969
LIMIT 1)
@@ -73,7 +73,7 @@ ORDER BY attname; /* check ACL for each column */
7373
SET ROLE user2;
7474
INSERT INTO permissions.user1_table (id, a) VALUES (35, 0) RETURNING *;
7575
SELECT relname, relacl FROM pg_class
76-
WHERE oid = ANY (SELECT partition FROM pathman_partition_list
76+
WHERE oid = ANY (SELECT "partition" FROM pathman_partition_list
7777
WHERE parent = 'permissions.user1_table'::REGCLASS
7878
ORDER BY range_max::int DESC /* append */
7979
LIMIT 3)

src/pl_range_funcs.c

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ static void recreate_range_constraint(Oid partition,
4343
Oid atttype,
4444
const Bound *lower,
4545
const Bound *upper);
46+
static char *get_qualified_rel_name(Oid relid);
4647

4748
/* Function declarations */
4849

@@ -508,6 +509,7 @@ merge_range_partitions_internal(Oid parent, Oid *partitions, uint32 npart)
508509
List *plist = NIL;
509510
RangeEntry *first, *last;
510511
const PartRelationInfo *prel;
512+
FmgrInfo finfo;
511513

512514
prel = get_pathman_relation_info(parent);
513515
shout_if_prel_is_invalid(parent, prel, PT_RANGE);
@@ -536,10 +538,21 @@ merge_range_partitions_internal(Oid parent, Oid *partitions, uint32 npart)
536538

537539
check_adjacence(prel->cmp_proc, plist);
538540

539-
/* Create a new one */
541+
/* Create a new constraint. To do this first determine the bounds */
540542
first = (RangeEntry *) linitial(plist);
541543
last = (RangeEntry *) llast(plist);
542-
recreate_range_constraint(first->child_oid,
544+
545+
/* If last range is less than first one then swap them */
546+
fmgr_info(prel->cmp_proc, &finfo);
547+
if (cmp_bounds(&finfo, &last->min, &first->min) < 0)
548+
{
549+
RangeEntry *tmp = last;
550+
last = first;
551+
first = tmp;
552+
}
553+
554+
/* Drop old constraint and create a new one */
555+
recreate_range_constraint(partitions[0],
543556
get_relid_attribute_name(prel->key, prel->attnum),
544557
prel->attnum,
545558
prel->atttype,
@@ -557,8 +570,8 @@ merge_range_partitions_internal(Oid parent, Oid *partitions, uint32 npart)
557570
{
558571
char *query = psprintf("WITH part_data AS (DELETE FROM %s RETURNING *) "
559572
"INSERT INTO %s SELECT * FROM part_data",
560-
get_rel_name(partitions[i]),
561-
get_rel_name(partitions[0]));
573+
get_qualified_rel_name(partitions[i]),
574+
get_qualified_rel_name(partitions[0]));
562575

563576
SPI_exec(query, 0);
564577
}
@@ -571,7 +584,7 @@ merge_range_partitions_internal(Oid parent, Oid *partitions, uint32 npart)
571584
for (i = 1; i < npart; i++)
572585
{
573586
char *query = psprintf("DROP TABLE %s",
574-
get_rel_name(partitions[i]));
587+
get_qualified_rel_name(partitions[i]));
575588

576589
SPI_exec(query, 0);
577590
}
@@ -603,10 +616,10 @@ check_adjacence(Oid cmp_proc, List *ranges)
603616
}
604617

605618
/*
606-
* Compare upper bound of previous range entry and lower bound
607-
* of current
619+
* Check that last and current partitions are adjacent
608620
*/
609-
if (cmp_bounds(&finfo, &last->max, &cur->min) != 0)
621+
if ((cmp_bounds(&finfo, &last->max, &cur->min) != 0)
622+
&& (cmp_bounds(&finfo, &cur->max, &last->min) != 0))
610623
elog(ERROR,
611624
"Partitions '%s' and '%s' aren't adjacent",
612625
get_rel_name(last->child_oid), get_rel_name(cur->child_oid));
@@ -650,3 +663,16 @@ recreate_range_constraint(Oid partition,
650663

651664
pfree(attname_nonconst);
652665
}
666+
667+
/*
668+
* Return palloced fully qualified relation name as a cstring
669+
*/
670+
static char *
671+
get_qualified_rel_name(Oid relid)
672+
{
673+
Oid namespace = get_rel_namespace(relid);
674+
675+
return psprintf("%s.%s",
676+
quote_identifier(get_namespace_name(namespace)),
677+
quote_identifier(get_rel_name(relid)));
678+
}

0 commit comments

Comments
 (0)