Skip to content

Commit d540a02

Browse files
author
Amit Kapila
committed
Display the leader apply worker's PID for parallel apply workers.
Add leader_pid to pg_stat_subscription. leader_pid is the process ID of the leader apply worker if this process is a parallel apply worker. If this field is NULL, it indicates that the process is a leader apply worker or a synchronization worker. The new column makes it easier to distinguish parallel apply workers from other kinds of workers and helps to identify the leader for the parallel workers corresponding to a particular subscription.

Additionally, update the leader_pid column in pg_stat_activity as well to display the PID of the leader apply worker for parallel apply workers.

Author: Hou Zhijie
Reviewed-by: Peter Smith, Sawada Masahiko, Amit Kapila, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
1 parent 14bdb3f commit d540a02

File tree

11 files changed

+105
-43
lines changed

11 files changed

+105
-43
lines changed

doc/src/sgml/logical-replication.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1692,7 +1692,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
16921692
subscription. A disabled subscription or a crashed subscription will have
16931693
zero rows in this view. If the initial data synchronization of any
16941694
table is in progress, there will be additional workers for the tables
1695-
being synchronized.
1695+
being synchronized. Moreover, if the streaming transaction is applied in
1696+
parallel, there may be additional parallel apply workers.
16961697
</para>
16971698
</sect1>
16981699

doc/src/sgml/monitoring.sgml

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -743,9 +743,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
743743
<structfield>leader_pid</structfield> <type>integer</type>
744744
</para>
745745
<para>
746-
Process ID of the parallel group leader, if this process is a
747-
parallel query worker. <literal>NULL</literal> if this process is a
748-
parallel group leader or does not participate in parallel query.
746+
Process ID of the parallel group leader if this process is a parallel
747+
query worker, or process ID of the leader apply worker if this process
748+
is a parallel apply worker. <literal>NULL</literal> indicates that this
749+
process is a parallel group leader or leader apply worker, or does not
750+
participate in any parallel operation.
749751
</para></entry>
750752
</row>
751753

@@ -3206,13 +3208,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32063208
</para></entry>
32073209
</row>
32083210

3211+
<row>
3212+
<entry role="catalog_table_entry"><para role="column_definition">
3213+
<structfield>leader_pid</structfield> <type>integer</type>
3214+
</para>
3215+
<para>
3216+
Process ID of the leader apply worker if this process is a parallel
3217+
apply worker; NULL if this process is a leader apply worker or a
3218+
synchronization worker
3219+
</para></entry>
3220+
</row>
3221+
32093222
<row>
32103223
<entry role="catalog_table_entry"><para role="column_definition">
32113224
<structfield>relid</structfield> <type>oid</type>
32123225
</para>
32133226
<para>
3214-
OID of the relation that the worker is synchronizing; null for the
3215-
main apply worker
3227+
OID of the relation that the worker is synchronizing; NULL for the
3228+
leader apply worker and parallel apply workers
32163229
</para></entry>
32173230
</row>
32183231

@@ -3222,7 +3235,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32223235
</para>
32233236
<para>
32243237
Last write-ahead log location received, the initial value of
3225-
this field being 0
3238+
this field being 0; NULL for parallel apply workers
32263239
</para></entry>
32273240
</row>
32283241

@@ -3231,7 +3244,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32313244
<structfield>last_msg_send_time</structfield> <type>timestamp with time zone</type>
32323245
</para>
32333246
<para>
3234-
Send time of last message received from origin WAL sender
3247+
Send time of last message received from origin WAL sender; NULL for
3248+
parallel apply workers
32353249
</para></entry>
32363250
</row>
32373251

@@ -3240,7 +3254,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32403254
<structfield>last_msg_receipt_time</structfield> <type>timestamp with time zone</type>
32413255
</para>
32423256
<para>
3243-
Receipt time of last message received from origin WAL sender
3257+
Receipt time of last message received from origin WAL sender; NULL for
3258+
parallel apply workers
32443259
</para></entry>
32453260
</row>
32463261

@@ -3249,7 +3264,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32493264
<structfield>latest_end_lsn</structfield> <type>pg_lsn</type>
32503265
</para>
32513266
<para>
3252-
Last write-ahead log location reported to origin WAL sender
3267+
Last write-ahead log location reported to origin WAL sender; NULL for
3268+
parallel apply workers
32533269
</para></entry>
32543270
</row>
32553271

@@ -3259,7 +3275,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
32593275
</para>
32603276
<para>
32613277
Time of last write-ahead log location reported to origin WAL
3262-
sender
3278+
sender; NULL for parallel apply workers
32633279
</para></entry>
32643280
</row>
32653281
</tbody>

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,7 @@ CREATE VIEW pg_stat_subscription AS
948948
su.oid AS subid,
949949
su.subname,
950950
st.pid,
951+
st.leader_pid,
951952
st.relid,
952953
st.received_lsn,
953954
st.last_msg_send_time,

src/backend/replication/logical/applyparallelworker.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
849849
static void
850850
pa_shutdown(int code, Datum arg)
851851
{
852-
SendProcSignal(MyLogicalRepWorker->apply_leader_pid,
852+
SendProcSignal(MyLogicalRepWorker->leader_pid,
853853
PROCSIG_PARALLEL_APPLY_MESSAGE,
854854
InvalidBackendId);
855855

@@ -932,7 +932,7 @@ ParallelApplyWorkerMain(Datum main_arg)
932932
error_mqh = shm_mq_attach(mq, seg, NULL);
933933

934934
pq_redirect_to_shm_mq(seg, error_mqh);
935-
pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
935+
pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
936936
InvalidBackendId);
937937

938938
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
@@ -950,7 +950,7 @@ ParallelApplyWorkerMain(Datum main_arg)
950950
* The parallel apply worker doesn't need to monopolize this replication
951951
* origin which was already acquired by its leader process.
952952
*/
953-
replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid);
953+
replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
954954
replorigin_session_origin = originid;
955955
CommitTransactionCommand();
956956

src/backend/replication/logical/launcher.c

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
410410
worker->relstate = SUBREL_STATE_UNKNOWN;
411411
worker->relstate_lsn = InvalidXLogRecPtr;
412412
worker->stream_fileset = NULL;
413-
worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
413+
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
414414
worker->parallel_apply = is_parallel_apply_worker;
415415
worker->last_lsn = InvalidXLogRecPtr;
416416
TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
732732
worker->userid = InvalidOid;
733733
worker->subid = InvalidOid;
734734
worker->relid = InvalidOid;
735-
worker->apply_leader_pid = InvalidPid;
735+
worker->leader_pid = InvalidPid;
736736
worker->parallel_apply = false;
737737
}
738738

@@ -1066,13 +1066,41 @@ IsLogicalLauncher(void)
10661066
return LogicalRepCtx->launcher_pid == MyProcPid;
10671067
}
10681068

1069+
/*
1070+
* Return the PID of the leader apply worker if the given pid is the PID of a
1071+
* parallel apply worker; otherwise return InvalidPid. The worker slots are scanned while holding LogicalRepWorkerLock in shared mode.
1072+
*/
1073+
pid_t
1074+
GetLeaderApplyWorkerPid(pid_t pid)
1075+
{
1076+
int leader_pid = InvalidPid;
1077+
int i;
1078+
1079+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1080+
1081+
for (i = 0; i < max_logical_replication_workers; i++)
1082+
{
1083+
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1084+
1085+
if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1086+
{
1087+
leader_pid = w->leader_pid;
1088+
break;
1089+
}
1090+
}
1091+
1092+
LWLockRelease(LogicalRepWorkerLock);
1093+
1094+
return leader_pid;
1095+
}
1096+
10691097
/*
10701098
* Returns state of the subscriptions.
10711099
*/
10721100
Datum
10731101
pg_stat_get_subscription(PG_FUNCTION_ARGS)
10741102
{
1075-
#define PG_STAT_GET_SUBSCRIPTION_COLS 8
1103+
#define PG_STAT_GET_SUBSCRIPTION_COLS 9
10761104
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
10771105
int i;
10781106
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
10981126
if (OidIsValid(subid) && worker.subid != subid)
10991127
continue;
11001128

1101-
/* Skip if this is a parallel apply worker */
1102-
if (isParallelApplyWorker(&worker))
1103-
continue;
1104-
11051129
worker_pid = worker.proc->pid;
11061130

11071131
values[0] = ObjectIdGetDatum(worker.subid);
@@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
11101134
else
11111135
nulls[1] = true;
11121136
values[2] = Int32GetDatum(worker_pid);
1113-
if (XLogRecPtrIsInvalid(worker.last_lsn))
1137+
1138+
if (isParallelApplyWorker(&worker))
1139+
values[3] = Int32GetDatum(worker.leader_pid);
1140+
else
11141141
nulls[3] = true;
1142+
1143+
if (XLogRecPtrIsInvalid(worker.last_lsn))
1144+
nulls[4] = true;
11151145
else
1116-
values[3] = LSNGetDatum(worker.last_lsn);
1146+
values[4] = LSNGetDatum(worker.last_lsn);
11171147
if (worker.last_send_time == 0)
1118-
nulls[4] = true;
1148+
nulls[5] = true;
11191149
else
1120-
values[4] = TimestampTzGetDatum(worker.last_send_time);
1150+
values[5] = TimestampTzGetDatum(worker.last_send_time);
11211151
if (worker.last_recv_time == 0)
1122-
nulls[5] = true;
1152+
nulls[6] = true;
11231153
else
1124-
values[5] = TimestampTzGetDatum(worker.last_recv_time);
1154+
values[6] = TimestampTzGetDatum(worker.last_recv_time);
11251155
if (XLogRecPtrIsInvalid(worker.reply_lsn))
1126-
nulls[6] = true;
1156+
nulls[7] = true;
11271157
else
1128-
values[6] = LSNGetDatum(worker.reply_lsn);
1158+
values[7] = LSNGetDatum(worker.reply_lsn);
11291159
if (worker.reply_time == 0)
1130-
nulls[7] = true;
1160+
nulls[8] = true;
11311161
else
1132-
values[7] = TimestampTzGetDatum(worker.reply_time);
1162+
values[8] = TimestampTzGetDatum(worker.reply_time);
11331163

11341164
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
11351165
values, nulls);

src/backend/utils/adt/pgstatfuncs.c

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "pgstat.h"
2626
#include "postmaster/bgworker_internals.h"
2727
#include "postmaster/postmaster.h"
28+
#include "replication/logicallauncher.h"
2829
#include "storage/proc.h"
2930
#include "storage/procarray.h"
3031
#include "utils/acl.h"
@@ -409,9 +410,9 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
409410

410411
/*
411412
* If a PGPROC entry was retrieved, display wait events and lock
412-
* group leader information if any. To avoid extra overhead, no
413-
* extra lock is being held, so there is no guarantee of
414-
* consistency across multiple rows.
413+
* group leader or apply leader information if any. To avoid
414+
* extra overhead, no extra lock is being held, so there is no
415+
* guarantee of consistency across multiple rows.
415416
*/
416417
if (proc != NULL)
417418
{
@@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
426427

427428
/*
428429
* Show the leader only for active parallel workers. This
429-
* leaves the field as NULL for the leader of a parallel
430-
* group.
430+
* leaves the field as NULL for the leader of a parallel group
431+
* or the leader of parallel apply workers.
431432
*/
432433
if (leader && leader->pid != beentry->st_procpid)
433434
{
434435
values[28] = Int32GetDatum(leader->pid);
435436
nulls[28] = false;
436437
}
438+
else if (beentry->st_backendType == B_BG_WORKER)
439+
{
440+
int leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
441+
442+
if (leader_pid != InvalidPid)
443+
{
444+
values[28] = Int32GetDatum(leader_pid);
445+
nulls[28] = false;
446+
}
447+
}
437448
}
438449

439450
if (wait_event_type)

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/* yyyymmddN */
60-
#define CATALOG_VERSION_NO 202301131
60+
#define CATALOG_VERSION_NO 202301181
6161

6262
#endif

src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5430,9 +5430,9 @@
54305430
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
54315431
proretset => 't', provolatile => 's', proparallel => 'r',
54325432
prorettype => 'record', proargtypes => 'oid',
5433-
proallargtypes => '{oid,oid,oid,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
5434-
proargmodes => '{i,o,o,o,o,o,o,o,o}',
5435-
proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
5433+
proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
5434+
proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
5435+
proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
54365436
prosrc => 'pg_stat_get_subscription' },
54375437
{ oid => '2026', descr => 'statistics: current backend PID',
54385438
proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',

src/include/replication/logicallauncher.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
2727

2828
extern bool IsLogicalLauncher(void);
2929

30+
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
31+
3032
#endif /* LOGICALLAUNCHER_H */

src/include/replication/worker_internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef struct LogicalRepWorker
7171
* PID of leader apply worker if this slot is used for a parallel apply
7272
* worker, InvalidPid otherwise.
7373
*/
74-
pid_t apply_leader_pid;
74+
pid_t leader_pid;
7575

7676
/* Indicates whether apply can be performed in parallel. */
7777
bool parallel_apply;
@@ -303,7 +303,7 @@ extern void pa_decr_and_wait_stream_block(void);
303303
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
304304
XLogRecPtr remote_lsn);
305305

306-
#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
306+
#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
307307

308308
static inline bool
309309
am_tablesync_worker(void)

src/test/regress/expected/rules.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2094,14 +2094,15 @@ pg_stat_ssl| SELECT s.pid,
20942094
pg_stat_subscription| SELECT su.oid AS subid,
20952095
su.subname,
20962096
st.pid,
2097+
st.leader_pid,
20972098
st.relid,
20982099
st.received_lsn,
20992100
st.last_msg_send_time,
21002101
st.last_msg_receipt_time,
21012102
st.latest_end_lsn,
21022103
st.latest_end_time
21032104
FROM (pg_subscription su
2104-
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
2105+
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
21052106
pg_stat_subscription_stats| SELECT ss.subid,
21062107
s.subname,
21072108
ss.apply_error_count,

0 commit comments

Comments
 (0)