Skip to content

Commit 28a7c9f

Browse files
author
Maksim Milyutin
committed
Remove timeout under polling leader backend
1 parent 9f63eb5 commit 28a7c9f

File tree

1 file changed

+35
-14
lines changed

1 file changed

+35
-14
lines changed

contrib/pg_query_state/pg_query_state.c

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ static void SendCurrentUserId(void);
8989
static void SendBgWorkerPids(void);
9090
static Oid GetRemoteBackendUserId(PGPROC *proc);
9191
static List *GetRemoteBackendWorkers(PGPROC *proc);
92-
static List *GetRemoteBackendQueryStates(List *procs,
92+
static List *GetRemoteBackendQueryStates(PGPROC *leader,
93+
List *pworkers,
9394
bool verbose,
9495
bool costs,
9596
bool timing,
@@ -533,7 +534,8 @@ pg_query_state(PG_FUNCTION_ARGS)
533534

534535
bg_worker_procs = GetRemoteBackendWorkers(proc);
535536

536-
msgs = GetRemoteBackendQueryStates(lcons(proc, bg_worker_procs),
537+
msgs = GetRemoteBackendQueryStates(proc,
538+
bg_worker_procs,
537539
verbose,
538540
costs,
539541
timing,
@@ -855,9 +857,7 @@ GetRemoteBackendWorkers(PGPROC *proc)
855857
return NIL;
856858

857859
mqh = shm_mq_attach(mq, NULL, NULL);
858-
mq_receive_result = shm_mq_receive_with_timeout(mqh, &msg_len,
859-
(void **) &msg,
860-
MIN_TIMEOUT);
860+
mq_receive_result = shm_mq_receive(mqh, &msg_len, (void **) &msg, false);
861861
if (mq_receive_result != SHM_MQ_SUCCESS)
862862
return NIL;
863863

@@ -884,7 +884,8 @@ copy_msg(shm_mq_msg *msg)
884884
}
885885

886886
static List *
887-
GetRemoteBackendQueryStates(List *procs,
887+
GetRemoteBackendQueryStates(PGPROC *leader,
888+
List *pworkers,
888889
bool verbose,
889890
bool costs,
890891
bool timing,
@@ -895,6 +896,11 @@ GetRemoteBackendQueryStates(List *procs,
895896
List *result = NIL;
896897
List *alive_procs = NIL;
897898
ListCell *iter;
899+
int sig_result;
900+
shm_mq_handle *mqh;
901+
shm_mq_result mq_receive_result;
902+
shm_mq_msg *msg;
903+
Size len;
898904

899905
Assert(QueryStatePollReason != INVALID_PROCSIGNAL);
900906
Assert(mq);
@@ -912,10 +918,14 @@ GetRemoteBackendQueryStates(List *procs,
912918
* send signal `QueryStatePollReason` to all processes and define all alive
913919
* ones
914920
*/
915-
foreach(iter, procs)
921+
sig_result = SendProcSignal(leader->pid,
922+
QueryStatePollReason,
923+
leader->backendId);
924+
if (sig_result == -1)
925+
goto signal_error;
926+
foreach(iter, pworkers)
916927
{
917928
PGPROC *proc = (PGPROC *) lfirst(iter);
918-
int sig_result;
919929

920930
sig_result = SendProcSignal(proc->pid,
921931
QueryStatePollReason,
@@ -930,16 +940,24 @@ GetRemoteBackendQueryStates(List *procs,
930940
alive_procs = lappend(alive_procs, proc);
931941
}
932942

943+
/* extract query state from leader process */
944+
mq = shm_mq_create(mq, QUEUE_SIZE);
945+
shm_mq_set_sender(mq, leader);
946+
shm_mq_set_receiver(mq, MyProc);
947+
mqh = shm_mq_attach(mq, NULL, NULL);
948+
mq_receive_result = shm_mq_receive(mqh, &len, (void **) &msg, false);
949+
if (mq_receive_result != SHM_MQ_SUCCESS)
950+
goto mq_error;
951+
Assert(len == msg->length);
952+
result = lappend(result, copy_msg(msg));
953+
shm_mq_detach(mq);
954+
933955
/*
934-
* collect results from all alived processes
956+
* collect results from all alived parallel workers
935957
*/
936958
foreach(iter, alive_procs)
937959
{
938960
PGPROC *proc = (PGPROC *) lfirst(iter);
939-
shm_mq_handle *mqh;
940-
shm_mq_result mq_receive_result;
941-
shm_mq_msg *msg;
942-
Size len;
943961

944962
/* prepare message queue to transfer data */
945963
mq = shm_mq_create(mq, QUEUE_SIZE);
@@ -953,7 +971,7 @@ GetRemoteBackendQueryStates(List *procs,
953971
mq_receive_result = shm_mq_receive_with_timeout(mqh,
954972
&len,
955973
(void **) &msg,
956-
2 * MIN_TIMEOUT);
974+
MIN_TIMEOUT);
957975
if (mq_receive_result != SHM_MQ_SUCCESS)
958976
/* counterpart is died, not consider it */
959977
continue;
@@ -971,4 +989,7 @@ GetRemoteBackendQueryStates(List *procs,
971989
signal_error:
972990
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
973991
errmsg("invalid send signal")));
992+
mq_error:
993+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
994+
errmsg("error in message queue data transmitting")));
974995
}

0 commit comments

Comments
 (0)