Skip to content

Commit 79243de

Browse files
author
Amit Kapila
committed
Restart the apply worker if the privileges have been revoked.
Restart the apply worker if the subscription owner's superuser privileges have been revoked. This is required so that the subscription connection string gets revalidated and use the password option to connect to the publisher for non-superusers, if required. Author: Vignesh C Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/CALDaNm2Dxmhq08nr4P6G+24QvdBo_GAVyZ_Q1TcGYK+8NHs9xw@mail.gmail.com
1 parent 2f04720 commit 79243de

File tree

6 files changed

+60
-11
lines changed

6 files changed

+60
-11
lines changed

src/backend/catalog/pg_subscription.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
108108
Anum_pg_subscription_suborigin);
109109
sub->origin = TextDatumGetCString(datum);
110110

111+
/* Is the subscription owner a superuser? */
112+
sub->ownersuperuser = superuser_arg(sub->owner);
113+
111114
ReleaseSysCache(tup);
112115

113116
return sub;

src/backend/commands/subscriptioncmds.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
869869
load_file("libpqwalreceiver", false);
870870

871871
/* Try to connect to the publisher. */
872-
must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
872+
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
873873
wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
874874
sub->name, &err);
875875
if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
12491249
load_file("libpqwalreceiver", false);
12501250
/* Check the connection info string. */
12511251
walrcv_check_conninfo(stmt->conninfo,
1252-
sub->passwordrequired && !superuser_arg(sub->owner));
1252+
sub->passwordrequired && !sub->ownersuperuser);
12531253

12541254
values[Anum_pg_subscription_subconninfo - 1] =
12551255
CStringGetTextDatum(stmt->conninfo);

src/backend/replication/logical/tablesync.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,13 +1275,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
12751275
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
12761276
MyLogicalRepWorker->relid,
12771277
&relstate_lsn);
1278+
CommitTransactionCommand();
12781279

12791280
/* Is the use of a password mandatory? */
12801281
must_use_password = MySubscription->passwordrequired &&
1281-
!superuser_arg(MySubscription->owner);
1282-
1283-
/* Note that the superuser_arg call can access the DB */
1284-
CommitTransactionCommand();
1282+
!MySubscription->ownersuperuser;
12851283

12861284
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
12871285
MyLogicalRepWorker->relstate = relstate;

src/backend/replication/logical/worker.c

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
39663966
apply_worker_exit();
39673967
}
39683968

3969+
/*
3970+
* Exit if the subscription owner's superuser privileges have been
3971+
* revoked.
3972+
*/
3973+
if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3974+
{
3975+
if (am_parallel_apply_worker())
3976+
ereport(LOG,
3977+
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3978+
MySubscription->name));
3979+
else
3980+
ereport(LOG,
3981+
errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3982+
MySubscription->name));
3983+
3984+
apply_worker_exit();
3985+
}
3986+
39693987
/* Check for other changes that should never happen too. */
39703988
if (newsub->dbid != MySubscription->dbid)
39713989
{
@@ -4492,13 +4510,11 @@ run_apply_worker()
44924510
replorigin_session_setup(originid, 0);
44934511
replorigin_session_origin = originid;
44944512
origin_startpos = replorigin_session_get_progress(false);
4513+
CommitTransactionCommand();
44954514

44964515
/* Is the use of a password mandatory? */
44974516
must_use_password = MySubscription->passwordrequired &&
4498-
!superuser_arg(MySubscription->owner);
4499-
4500-
/* Note that the superuser_arg call can access the DB */
4501-
CommitTransactionCommand();
4517+
!MySubscription->ownersuperuser;
45024518

45034519
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
45044520
must_use_password,
@@ -4621,11 +4637,18 @@ InitializeLogRepWorker(void)
46214637
SetConfigOption("synchronous_commit", MySubscription->synccommit,
46224638
PGC_BACKEND, PGC_S_OVERRIDE);
46234639

4624-
/* Keep us informed about subscription changes. */
4640+
/*
4641+
* Keep us informed about subscription or role changes. Note that the
4642+
* role's superuser privilege can be revoked.
4643+
*/
46254644
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
46264645
subscription_change_cb,
46274646
(Datum) 0);
46284647

4648+
CacheRegisterSyscacheCallback(AUTHOID,
4649+
subscription_change_cb,
4650+
(Datum) 0);
4651+
46294652
if (am_tablesync_worker())
46304653
ereport(LOG,
46314654
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",

src/include/catalog/pg_subscription.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ typedef struct Subscription
127127
* skipped */
128128
char *name; /* Name of the subscription */
129129
Oid owner; /* Oid of the subscription owner */
130+
bool ownersuperuser; /* Is the subscription owner a superuser? */
130131
bool enabled; /* Indicates if the subscription is enabled */
131132
bool binary; /* Indicates if the subscription wants data in
132133
* binary format */

src/test/subscription/t/027_nosuperuser.pl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ sub grant_superuser
104104
CREATE ROLE regress_admin SUPERUSER LOGIN;
105105
CREATE ROLE regress_alice NOSUPERUSER LOGIN;
106106
GRANT CREATE ON DATABASE postgres TO regress_alice;
107+
GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
107108
SET SESSION AUTHORIZATION regress_alice;
108109
CREATE SCHEMA alice;
109110
GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ sub grant_superuser
303304
expect_replication("alice.unpartitioned", 3, 17, 21,
304305
"restoring SELECT permission permits replication to continue");
305306

307+
# The apply worker should get restarted after the superuser privileges are
308+
# revoked for subscription owner alice.
309+
grant_superuser("regress_alice");
310+
$node_subscriber->safe_psql(
311+
'postgres', qq(
312+
SET SESSION AUTHORIZATION regress_alice;
313+
CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
314+
));
315+
316+
# Wait for initial sync to finish
317+
$node_subscriber->wait_for_subscription_sync($node_publisher,
318+
'regression_sub');
319+
320+
# Check the subscriber log from now on.
321+
$offset = -s $node_subscriber->logfile;
322+
323+
revoke_superuser("regress_alice");
324+
325+
# After the user becomes non-superuser the apply worker should be restarted.
326+
$node_subscriber->wait_for_log(
327+
qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
328+
$offset);
329+
306330
done_testing();

0 commit comments

Comments
 (0)