Skip to content

Commit 966e0c3

Browse files
committed
improve function copy_acl_privileges(): copy ACL for each column of parent
1 parent d124952 commit 966e0c3

File tree

3 files changed

+151
-14
lines changed

3 files changed

+151
-14
lines changed

expected/pathman_permissions.out

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ ERROR: permission denied for parent relation "user1_table"
6868
/* Allow user2 to create partitions */
6969
SET ROLE user1;
7070
GRANT INSERT ON permissions.user1_table TO user2;
71+
GRANT UPDATE(a) ON permissions.user1_table TO user2; /* per-column ACL */
7172
/* Should be able to prepend a partition */
7273
SET ROLE user2;
7374
SELECT prepend_range_partition('permissions.user1_table');
@@ -76,6 +77,24 @@ SELECT prepend_range_partition('permissions.user1_table');
7677
permissions.user1_table_4
7778
(1 row)
7879

80+
SELECT attname, attacl from pg_attribute
81+
WHERE attrelid = (SELECT partition FROM pathman_partition_list
82+
WHERE parent = 'permissions.user1_table'::REGCLASS
83+
ORDER BY range_min::int ASC /* prepend */
84+
LIMIT 1)
85+
ORDER BY attname; /* check ACL for each column */
86+
attname | attacl
87+
----------+-----------------
88+
a | {user2=w/user1}
89+
cmax |
90+
cmin |
91+
ctid |
92+
id |
93+
tableoid |
94+
xmax |
95+
xmin |
96+
(8 rows)
97+
7998
/* Have rights, should be ok (parent's ACL is shared by new children) */
8099
SET ROLE user2;
81100
INSERT INTO permissions.user1_table (id, a) VALUES (35, 0) RETURNING *;
@@ -87,7 +106,7 @@ INSERT INTO permissions.user1_table (id, a) VALUES (35, 0) RETURNING *;
87106
SELECT relname, relacl FROM pg_class
88107
WHERE oid = ANY (SELECT partition FROM pathman_partition_list
89108
WHERE parent = 'permissions.user1_table'::REGCLASS
90-
ORDER BY range_max::int DESC
109+
ORDER BY range_max::int DESC /* append */
91110
LIMIT 3)
92111
ORDER BY relname; /* we also check ACL for "user1_table_2" */
93112
relname | relacl

sql/pathman_permissions.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,25 @@ SELECT prepend_range_partition('permissions.user1_table');
5757
/* Allow user2 to create partitions */
5858
SET ROLE user1;
5959
GRANT INSERT ON permissions.user1_table TO user2;
60+
GRANT UPDATE(a) ON permissions.user1_table TO user2; /* per-column ACL */
6061

6162
/* Should be able to prepend a partition */
6263
SET ROLE user2;
6364
SELECT prepend_range_partition('permissions.user1_table');
65+
SELECT attname, attacl from pg_attribute
66+
WHERE attrelid = (SELECT partition FROM pathman_partition_list
67+
WHERE parent = 'permissions.user1_table'::REGCLASS
68+
ORDER BY range_min::int ASC /* prepend */
69+
LIMIT 1)
70+
ORDER BY attname; /* check ACL for each column */
6471

6572
/* Have rights, should be ok (parent's ACL is shared by new children) */
6673
SET ROLE user2;
6774
INSERT INTO permissions.user1_table (id, a) VALUES (35, 0) RETURNING *;
6875
SELECT relname, relacl FROM pg_class
6976
WHERE oid = ANY (SELECT partition FROM pathman_partition_list
7077
WHERE parent = 'permissions.user1_table'::REGCLASS
71-
ORDER BY range_max::int DESC
78+
ORDER BY range_max::int DESC /* append */
7279
LIMIT 3)
7380
ORDER BY relname; /* we also check ACL for "user1_table_2" */
7481

src/partition_creation.c

Lines changed: 123 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,7 @@ create_single_partition_internal(Oid parent_relid,
669669
List *create_stmts;
670670
ListCell *lc;
671671

672+
/* Current user and security context */
672673
Oid save_userid;
673674
int save_sec_context;
674675
bool need_priv_escalation = !superuser(); /* we might be a SU */
@@ -861,21 +862,29 @@ create_table_using_stmt(CreateStmt *create_stmt, Oid relowner)
861862
static void
862863
copy_acl_privileges(Oid parent_relid, Oid partition_relid)
863864
{
864-
Relation pg_class_rel;
865+
Relation pg_class_rel,
866+
pg_attribute_rel;
865867

866-
TupleDesc pg_class_desc;
868+
TupleDesc pg_class_desc,
869+
pg_attribute_desc;
867870

868871
HeapTuple htup;
869-
870-
ScanKeyData skey;
872+
ScanKeyData skey[2];
871873
SysScanDesc scan;
872874

873875
Datum acl_datum;
874876
bool acl_null;
875877

878+
Snapshot snapshot;
879+
876880
pg_class_rel = heap_open(RelationRelationId, RowExclusiveLock);
881+
pg_attribute_rel = heap_open(AttributeRelationId, RowExclusiveLock);
882+
883+
/* Get most recent snapshot */
884+
snapshot = RegisterSnapshot(GetLatestSnapshot());
877885

878886
pg_class_desc = RelationGetDescr(pg_class_rel);
887+
pg_attribute_desc = RelationGetDescr(pg_attribute_rel);
879888

880889
htup = SearchSysCache1(RELOID, ObjectIdGetDatum(parent_relid));
881890
if (!HeapTupleIsValid(htup))
@@ -886,21 +895,25 @@ copy_acl_privileges(Oid parent_relid, Oid partition_relid)
886895

887896
/* Copy datum if it's not NULL */
888897
if (!acl_null)
889-
acl_datum = datumCopy(acl_datum,
890-
pg_class_desc->attrs[Anum_pg_class_relacl - 1]->attbyval,
891-
pg_class_desc->attrs[Anum_pg_class_relacl - 1]->attlen);
898+
{
899+
Form_pg_attribute acl_column;
900+
901+
acl_column = pg_class_desc->attrs[Anum_pg_class_relacl - 1];
902+
903+
acl_datum = datumCopy(acl_datum, acl_column->attbyval, acl_column->attlen);
904+
}
892905

893906
/* Release 'htup' */
894907
ReleaseSysCache(htup);
895908

896909
/* Search for 'partition_relid' */
897-
ScanKeyInit(&skey,
910+
ScanKeyInit(&skey[0],
898911
ObjectIdAttributeNumber,
899912
BTEqualStrategyNumber, F_OIDEQ,
900913
ObjectIdGetDatum(partition_relid));
901914

902-
scan = systable_beginscan(pg_class_rel, ClassOidIndexId, true,
903-
GetLatestSnapshot(), 1, &skey);
915+
scan = systable_beginscan(pg_class_rel, ClassOidIndexId,
916+
true, snapshot, 1, skey);
904917

905918
/* There should be exactly one tuple (our child) */
906919
if (HeapTupleIsValid(htup = systable_getnext(scan)))
@@ -918,8 +931,7 @@ copy_acl_privileges(Oid parent_relid, Oid partition_relid)
918931
replaces[Anum_pg_class_relacl - 1] = true;
919932

920933
/* Build new tuple with parent's ACL */
921-
htup = heap_modify_tuple(htup, RelationGetDescr(pg_class_rel),
922-
values, nulls, replaces);
934+
htup = heap_modify_tuple(htup, pg_class_desc, values, nulls, replaces);
923935

924936
/* Update child's tuple */
925937
simple_heap_update(pg_class_rel, &iptr, htup);
@@ -930,7 +942,106 @@ copy_acl_privileges(Oid parent_relid, Oid partition_relid)
930942

931943
systable_endscan(scan);
932944

945+
946+
/* Search for 'parent_relid's columns */
947+
ScanKeyInit(&skey[0],
948+
Anum_pg_attribute_attrelid,
949+
BTEqualStrategyNumber, F_OIDEQ,
950+
ObjectIdGetDatum(parent_relid));
951+
952+
/* Consider only user-defined columns (>0) */
953+
ScanKeyInit(&skey[1],
954+
Anum_pg_attribute_attnum,
955+
BTEqualStrategyNumber, F_INT2GT,
956+
Int16GetDatum(InvalidAttrNumber));
957+
958+
scan = systable_beginscan(pg_attribute_rel,
959+
AttributeRelidNumIndexId,
960+
true, snapshot, 2, skey);
961+
962+
/* Go through the list of parent's columns */
963+
while (HeapTupleIsValid(htup = systable_getnext(scan)))
964+
{
965+
ScanKeyData subskey[2];
966+
SysScanDesc subscan;
967+
HeapTuple subhtup;
968+
969+
AttrNumber cur_attnum;
970+
bool cur_attnum_null;
971+
972+
/* Get parent column's ACL */
973+
acl_datum = heap_getattr(htup, Anum_pg_attribute_attacl,
974+
pg_attribute_desc, &acl_null);
975+
976+
/* Copy datum if it's not NULL */
977+
if (!acl_null)
978+
{
979+
Form_pg_attribute acl_column;
980+
981+
acl_column = pg_attribute_desc->attrs[Anum_pg_attribute_attacl - 1];
982+
983+
acl_datum = datumCopy(acl_datum,
984+
acl_column->attbyval,
985+
acl_column->attlen);
986+
}
987+
988+
/* Fetch number of current column */
989+
cur_attnum = DatumGetInt16(heap_getattr(htup, Anum_pg_attribute_attnum,
990+
pg_attribute_desc, &cur_attnum_null));
991+
Assert(cur_attnum_null == false); /* must not be NULL! */
992+
993+
/* Search for 'partition_relid' */
994+
ScanKeyInit(&subskey[0],
995+
Anum_pg_attribute_attrelid,
996+
BTEqualStrategyNumber, F_OIDEQ,
997+
ObjectIdGetDatum(partition_relid));
998+
999+
/* Search for 'partition_relid's columns */
1000+
ScanKeyInit(&subskey[1],
1001+
Anum_pg_attribute_attnum,
1002+
BTEqualStrategyNumber, F_INT2EQ,
1003+
Int16GetDatum(cur_attnum));
1004+
1005+
subscan = systable_beginscan(pg_attribute_rel,
1006+
AttributeRelidNumIndexId,
1007+
true, snapshot, 2, subskey);
1008+
1009+
/* There should be exactly one tuple (our child's column) */
1010+
if (HeapTupleIsValid(subhtup = systable_getnext(subscan)))
1011+
{
1012+
ItemPointerData iptr;
1013+
Datum values[Natts_pg_attribute] = { (Datum) 0 };
1014+
bool nulls[Natts_pg_attribute] = { false };
1015+
bool replaces[Natts_pg_attribute] = { false };
1016+
1017+
/* Copy ItemPointer of this tuple */
1018+
iptr = subhtup->t_self;
1019+
1020+
values[Anum_pg_attribute_attacl - 1] = acl_datum; /* ACL array */
1021+
nulls[Anum_pg_attribute_attacl - 1] = acl_null; /* do we have ACL? */
1022+
replaces[Anum_pg_attribute_attacl - 1] = true;
1023+
1024+
/* Build new tuple with parent's ACL */
1025+
subhtup = heap_modify_tuple(subhtup, pg_attribute_desc,
1026+
values, nulls, replaces);
1027+
1028+
/* Update child's tuple */
1029+
simple_heap_update(pg_attribute_rel, &iptr, subhtup);
1030+
1031+
/* Don't forget to update indexes */
1032+
CatalogUpdateIndexes(pg_attribute_rel, subhtup);
1033+
}
1034+
1035+
systable_endscan(subscan);
1036+
}
1037+
1038+
systable_endscan(scan);
1039+
1040+
/* Don't forget to free snapshot */
1041+
UnregisterSnapshot(snapshot);
1042+
9331043
heap_close(pg_class_rel, RowExclusiveLock);
1044+
heap_close(pg_attribute_rel, RowExclusiveLock);
9341045
}
9351046

9361047
/* Copy foreign keys of parent table */

0 commit comments

Comments
 (0)