Skip to content

Commit ff9618e

Browse files
committed
Fix MAINTAIN privileges for toast tables and partitions.
Commit 60684dd left loose ends when it came to maintaining toast tables or partitions. For toast tables, simply skip the privilege check if the toast table is an indirect target of the maintenance command, because the main table privileges have already been checked. For partitions, allow the maintenance command if the user has the MAINTAIN privilege on the partition or any parent. Also make CLUSTER emit "skipping" messages when the user doesn't have privileges, similar to VACUUM. Author: Nathan Bossart Reported-by: Pavel Luzanov Reviewed-by: Pavel Luzanov, Ted Yu Discussion: https://postgr.es/m/20230113231339.GA2422750@nathanxps13
1 parent ff23b59 commit ff9618e

File tree

16 files changed

+126
-64
lines changed

16 files changed

+126
-64
lines changed

doc/src/sgml/ref/analyze.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ ANALYZE [ VERBOSE ] [ <replaceable class="parameter">table_and_columns</replacea
156156
analyze all tables in their databases, except shared catalogs.
157157
(The restriction for shared catalogs means that a true database-wide
158158
<command>ANALYZE</command> can only be performed by superusers and roles
159-
with privileges of <literal>pg_maintain</literal>.)
159+
with privileges of <literal>pg_maintain</literal>.) If a role has
160+
permission to <command>ANALYZE</command> a partitioned table, it is also
161+
permitted to <command>ANALYZE</command> each of its partitions, regardless
162+
of whether the role has the aforementioned privileges on the partition.
160163
<command>ANALYZE</command> will skip over any tables that the calling user
161164
does not have permission to analyze.
162165
</para>

doc/src/sgml/ref/cluster.sgml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,7 @@ CLUSTER [VERBOSE]
6969
<para>
7070
<command>CLUSTER</command> without any parameter reclusters all the
7171
previously-clustered tables in the current database that the calling user
72-
owns or has the <literal>MAINTAIN</literal> privilege for, or all such tables
73-
if called by a superuser or a role with privileges of the
74-
<link linkend="predefined-roles-table"><literal>pg_maintain</literal></link>
75-
role. This form of <command>CLUSTER</command> cannot be
72+
has privileges for. This form of <command>CLUSTER</command> cannot be
7673
executed inside a transaction block.
7774
</para>
7875

@@ -134,6 +131,18 @@ CLUSTER [VERBOSE]
134131
<refsect1>
135132
<title>Notes</title>
136133

134+
<para>
135+
To cluster a table, one must have the <literal>MAINTAIN</literal> privilege
136+
on the table or be the table's owner, a superuser, or a role with
137+
privileges of the
138+
<link linkend="predefined-roles-table"><literal>pg_maintain</literal></link>
139+
role. If a role has permission to <command>CLUSTER</command> a partitioned
140+
table, it is also permitted to <command>CLUSTER</command> each of its
141+
partitions, regardless of whether the role has the aforementioned
142+
privileges on the partition. <command>CLUSTER</command> will skip over any
143+
tables that the calling user does not have permission to cluster.
144+
</para>
145+
137146
<para>
138147
In cases where you are accessing single rows randomly
139148
within a table, the actual order of the data in the

doc/src/sgml/ref/lock.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,10 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
177177
MODE</literal> (or a less-conflicting mode as described in <xref
178178
linkend="explicit-locking"/>) is permitted. If a user has
179179
<literal>SELECT</literal> privileges on the table, <literal>ACCESS SHARE
180-
MODE</literal> is permitted.
180+
MODE</literal> is permitted. If a role has permission to lock a
181+
partitioned table, it is also permitted to lock each of its partitions,
182+
regardless of whether the role has the aforementioned privileges on the
183+
partition.
181184
</para>
182185

183186
<para>

doc/src/sgml/ref/reindex.sgml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,11 @@ REINDEX [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] { DA
306306
indexes on shared catalogs will be skipped unless the user owns the
307307
catalog (which typically won't be the case), has privileges of the
308308
<literal>pg_maintain</literal> role, or has the <literal>MAINTAIN</literal>
309-
privilege on the catalog. Of course, superusers can always reindex anything.
309+
privilege on the catalog. If a role has permission to
310+
<command>REINDEX</command> a partitioned table, it is also permitted to
311+
<command>REINDEX</command> each of its partitions, regardless of whether the
312+
role has the aforementioned privileges on the partition. Of course,
313+
superusers can always reindex anything.
310314
</para>
311315

312316
<para>

doc/src/sgml/ref/vacuum.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,10 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
401401
vacuum all tables in their databases, except shared catalogs.
402402
(The restriction for shared catalogs means that a true database-wide
403403
<command>VACUUM</command> can only be performed by superusers and roles
404-
with privileges of <literal>pg_maintain</literal>.)
404+
with privileges of <literal>pg_maintain</literal>.) If a role has
405+
permission to <command>VACUUM</command> a partitioned table, it is also
406+
permitted to <command>VACUUM</command> each of its partitions, regardless
407+
of whether the role has the aforementioned privileges on the partition.
405408
<command>VACUUM</command> will skip over any tables that the calling user
406409
does not have permission to vacuum.
407410
</para>

src/backend/commands/cluster.c

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
8080
static List *get_tables_to_cluster(MemoryContext cluster_context);
8181
static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
8282
Oid indexOid);
83+
static bool cluster_is_permitted_for_relation(Oid relid, Oid userid);
8384

8485

8586
/*---------------------------------------------------------------------------
@@ -366,8 +367,7 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
366367
if (recheck)
367368
{
368369
/* Check that the user still has privileges for the relation */
369-
if (!object_ownercheck(RelationRelationId, tableOid, save_userid) &&
370-
pg_class_aclcheck(tableOid, save_userid, ACL_MAINTAIN) != ACLCHECK_OK)
370+
if (!cluster_is_permitted_for_relation(tableOid, save_userid))
371371
{
372372
relation_close(OldHeap, AccessExclusiveLock);
373373
goto out;
@@ -1645,8 +1645,7 @@ get_tables_to_cluster(MemoryContext cluster_context)
16451645

16461646
index = (Form_pg_index) GETSTRUCT(indexTuple);
16471647

1648-
if (!object_ownercheck(RelationRelationId, index->indrelid, GetUserId()) &&
1649-
pg_class_aclcheck(index->indrelid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
1648+
if (!cluster_is_permitted_for_relation(index->indrelid, GetUserId()))
16501649
continue;
16511650

16521651
/* Use a permanent memory context for the result list */
@@ -1694,12 +1693,11 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
16941693
if (get_rel_relkind(indexrelid) != RELKIND_INDEX)
16951694
continue;
16961695

1697-
/* Silently skip partitions which the user has no access to. */
1698-
if (!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
1699-
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
1700-
(!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) ||
1701-
IsSharedRelation(relid)))
1702-
continue;
1696+
/*
1697+
* We already checked that the user has privileges to CLUSTER the
1698+
* partitioned table when we locked it earlier, so there's no need to
1699+
* check the privileges again here.
1700+
*/
17031701

17041702
/* Use a permanent memory context for the result list */
17051703
old_context = MemoryContextSwitchTo(cluster_context);
@@ -1714,3 +1712,20 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
17141712

17151713
return rtcs;
17161714
}
1715+
1716+
/*
1717+
* Return whether userid has privileges to CLUSTER relid. If not, this
1718+
* function emits a WARNING.
1719+
*/
1720+
static bool
1721+
cluster_is_permitted_for_relation(Oid relid, Oid userid)
1722+
{
1723+
if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK ||
1724+
has_partition_ancestor_privs(relid, userid, ACL_MAINTAIN))
1725+
return true;
1726+
1727+
ereport(WARNING,
1728+
(errmsg("permission denied to cluster \"%s\", skipping it",
1729+
get_rel_name(relid))));
1730+
return false;
1731+
}

src/backend/commands/indexcmds.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2796,9 +2796,9 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation,
27962796

27972797
/* Check permissions */
27982798
table_oid = IndexGetRelation(relId, true);
2799-
if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
2800-
OidIsValid(table_oid) &&
2801-
pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
2799+
if (OidIsValid(table_oid) &&
2800+
pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
2801+
!has_partition_ancestor_privs(table_oid, GetUserId(), ACL_MAINTAIN))
28022802
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
28032803
relation->relname);
28042804

@@ -3008,16 +3008,16 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
30083008

30093009
/*
30103010
* The table can be reindexed if the user has been granted MAINTAIN on
3011-
* the table or the user is a superuser, the table owner, or the
3012-
* database/schema owner (but in the latter case, only if it's not a
3013-
* shared relation). object_ownercheck includes the superuser case,
3014-
* and depending on objectKind we already know that the user has
3015-
* permission to run REINDEX on this database or schema per the
3016-
* permission checks at the beginning of this routine.
3011+
* the table or one of its partition ancestors or the user is a
3012+
* superuser, the table owner, or the database/schema owner (but in the
3013+
* latter case, only if it's not a shared relation). pg_class_aclcheck
3014+
* includes the superuser case, and depending on objectKind we already
3015+
* know that the user has permission to run REINDEX on this database or
3016+
* schema per the permission checks at the beginning of this routine.
30173017
*/
30183018
if (classtuple->relisshared &&
3019-
!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
3020-
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
3019+
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
3020+
!has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
30213021
continue;
30223022

30233023
/*

src/backend/commands/lockcmds.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "catalog/namespace.h"
2020
#include "catalog/pg_inherits.h"
2121
#include "commands/lockcmds.h"
22+
#include "commands/tablecmds.h"
2223
#include "miscadmin.h"
2324
#include "nodes/nodeFuncs.h"
2425
#include "parser/parse_clause.h"
@@ -305,5 +306,12 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
305306

306307
aclresult = pg_class_aclcheck(reloid, userid, aclmask);
307308

309+
/*
310+
* If this is a partition, check permissions of its ancestors if needed.
311+
*/
312+
if (aclresult != ACLCHECK_OK &&
313+
has_partition_ancestor_privs(reloid, userid, ACL_MAINTAIN))
314+
aclresult = ACLCHECK_OK;
315+
308316
return aclresult;
309317
}

src/backend/commands/tablecmds.c

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16886,12 +16886,38 @@ RangeVarCallbackMaintainsTable(const RangeVar *relation,
1688616886
errmsg("\"%s\" is not a table or materialized view", relation->relname)));
1688716887

1688816888
/* Check permissions */
16889-
if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
16890-
pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
16889+
if (pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
16890+
!has_partition_ancestor_privs(relId, GetUserId(), ACL_MAINTAIN))
1689116891
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLE,
1689216892
relation->relname);
1689316893
}
1689416894

16895+
/*
16896+
* If relid is a partition, returns whether userid has any of the privileges
16897+
* specified in acl on any of its ancestors. Otherwise, returns false.
16898+
*/
16899+
bool
16900+
has_partition_ancestor_privs(Oid relid, Oid userid, AclMode acl)
16901+
{
16902+
List *ancestors;
16903+
ListCell *lc;
16904+
16905+
if (!get_rel_relispartition(relid))
16906+
return false;
16907+
16908+
ancestors = get_partition_ancestors(relid);
16909+
foreach(lc, ancestors)
16910+
{
16911+
Oid ancestor = lfirst_oid(lc);
16912+
16913+
if (OidIsValid(ancestor) &&
16914+
pg_class_aclcheck(ancestor, userid, acl) == ACLCHECK_OK)
16915+
return true;
16916+
}
16917+
16918+
return false;
16919+
}
16920+
1689516921
/*
1689616922
* Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
1689716923
*/

src/backend/commands/vacuum.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "catalog/pg_namespace.h"
4242
#include "commands/cluster.h"
4343
#include "commands/defrem.h"
44+
#include "commands/tablecmds.h"
4445
#include "commands/vacuum.h"
4546
#include "miscadmin.h"
4647
#include "nodes/makefuncs.h"
@@ -91,7 +92,8 @@ static void vac_truncate_clog(TransactionId frozenXID,
9192
MultiXactId minMulti,
9293
TransactionId lastSaneFrozenXid,
9394
MultiXactId lastSaneMinMulti);
94-
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
95+
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
96+
bool skip_privs);
9597
static double compute_parallel_delay(void);
9698
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
9799
static bool vac_tid_reaped(ItemPointer itemptr, void *state);
@@ -501,7 +503,7 @@ vacuum(List *relations, VacuumParams *params,
501503

502504
if (params->options & VACOPT_VACUUM)
503505
{
504-
if (!vacuum_rel(vrel->oid, vrel->relation, params))
506+
if (!vacuum_rel(vrel->oid, vrel->relation, params, false))
505507
continue;
506508
}
507509

@@ -598,11 +600,13 @@ vacuum_is_permitted_for_relation(Oid relid, Form_pg_class reltuple,
598600
* - the role owns the relation
599601
* - the role owns the current database and the relation is not shared
600602
* - the role has been granted the MAINTAIN privilege on the relation
603+
* - the role has privileges to vacuum/analyze any of the relation's
604+
* partition ancestors
601605
*----------
602606
*/
603-
if (object_ownercheck(RelationRelationId, relid, GetUserId()) ||
604-
(object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
605-
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK)
607+
if ((object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
608+
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK ||
609+
has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
606610
return true;
607611

608612
relname = NameStr(reltuple->relname);
@@ -1828,7 +1832,7 @@ vac_truncate_clog(TransactionId frozenXID,
18281832
* At entry and exit, we are not inside a transaction.
18291833
*/
18301834
static bool
1831-
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
1835+
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, bool skip_privs)
18321836
{
18331837
LOCKMODE lmode;
18341838
Relation rel;
@@ -1915,7 +1919,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
19151919
* happen across multiple transactions where privileges could have changed
19161920
* in-between. Make sure to only generate logs for VACUUM in this case.
19171921
*/
1918-
if (!vacuum_is_permitted_for_relation(RelationGetRelid(rel),
1922+
if (!skip_privs &&
1923+
!vacuum_is_permitted_for_relation(RelationGetRelid(rel),
19191924
rel->rd_rel,
19201925
params->options & VACOPT_VACUUM))
19211926
{
@@ -2089,7 +2094,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
20892094
* totally unimportant for toast relations.
20902095
*/
20912096
if (toast_relid != InvalidOid)
2092-
vacuum_rel(toast_relid, NULL, params);
2097+
vacuum_rel(toast_relid, NULL, params, true);
20932098

20942099
/*
20952100
* Now release the session-level lock on the main table.

src/include/commands/tablecmds.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit,
9898
extern void RangeVarCallbackMaintainsTable(const RangeVar *relation,
9999
Oid relId, Oid oldRelId,
100100
void *arg);
101+
extern bool has_partition_ancestor_privs(Oid relid, Oid userid, AclMode acl);
101102

102103
extern void RangeVarCallbackOwnsRelation(const RangeVar *relation,
103104
Oid relId, Oid oldRelId, void *arg);

src/test/isolation/expected/cluster-conflict-partition.out

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ starting permutation: s1_begin s1_lock_child s2_auth s2_cluster s1_commit s2_res
2222
step s1_begin: BEGIN;
2323
step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
2424
step s2_auth: SET ROLE regress_cluster_part;
25-
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
25+
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
2626
step s1_commit: COMMIT;
27+
step s2_cluster: <... completed>
2728
step s2_reset: RESET ROLE;
2829

2930
starting permutation: s1_begin s2_auth s1_lock_child s2_cluster s1_commit s2_reset
3031
step s1_begin: BEGIN;
3132
step s2_auth: SET ROLE regress_cluster_part;
3233
step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
33-
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
34+
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
3435
step s1_commit: COMMIT;
36+
step s2_cluster: <... completed>
3537
step s2_reset: RESET ROLE;

src/test/isolation/specs/cluster-conflict-partition.spec

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,8 @@ step s2_auth { SET ROLE regress_cluster_part; }
2727
step s2_cluster { CLUSTER cluster_part_tab USING cluster_part_ind; }
2828
step s2_reset { RESET ROLE; }
2929

30-
# CLUSTER on the parent waits if locked, passes for all cases.
30+
# CLUSTER waits if locked, passes for all cases.
3131
permutation s1_begin s1_lock_parent s2_auth s2_cluster s1_commit s2_reset
3232
permutation s1_begin s2_auth s1_lock_parent s2_cluster s1_commit s2_reset
33-
34-
# When taking a lock on a partition leaf, CLUSTER on the parent skips
35-
# the leaf, passes for all cases.
3633
permutation s1_begin s1_lock_child s2_auth s2_cluster s1_commit s2_reset
3734
permutation s1_begin s2_auth s1_lock_child s2_cluster s1_commit s2_reset

src/test/regress/expected/cluster.out

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,9 @@ INSERT INTO clstr_3 VALUES (1);
352352
-- this user can only cluster clstr_1 and clstr_3, but the latter
353353
-- has not been clustered
354354
SET SESSION AUTHORIZATION regress_clstr_user;
355+
SET client_min_messages = ERROR; -- order of "skipping" warnings may vary
355356
CLUSTER;
357+
RESET client_min_messages;
356358
SELECT * FROM clstr_1 UNION ALL
357359
SELECT * FROM clstr_2 UNION ALL
358360
SELECT * FROM clstr_3;
@@ -513,7 +515,7 @@ SELECT a.relname, a.relfilenode=b.relfilenode FROM pg_class a
513515
-----------+----------
514516
ptnowner | t
515517
ptnowner1 | f
516-
ptnowner2 | t
518+
ptnowner2 | f
517519
(3 rows)
518520

519521
DROP TABLE ptnowner;

0 commit comments

Comments
 (0)