@@ -89,7 +89,8 @@ static void SendCurrentUserId(void);
89
89
static void SendBgWorkerPids (void );
90
90
static Oid GetRemoteBackendUserId (PGPROC * proc );
91
91
static List * GetRemoteBackendWorkers (PGPROC * proc );
92
- static List * GetRemoteBackendQueryStates (List * procs ,
92
+ static List * GetRemoteBackendQueryStates (PGPROC * leader ,
93
+ List * pworkers ,
93
94
bool verbose ,
94
95
bool costs ,
95
96
bool timing ,
@@ -533,7 +534,8 @@ pg_query_state(PG_FUNCTION_ARGS)
533
534
534
535
bg_worker_procs = GetRemoteBackendWorkers (proc );
535
536
536
- msgs = GetRemoteBackendQueryStates (lcons (proc , bg_worker_procs ),
537
+ msgs = GetRemoteBackendQueryStates (proc ,
538
+ bg_worker_procs ,
537
539
verbose ,
538
540
costs ,
539
541
timing ,
@@ -855,9 +857,7 @@ GetRemoteBackendWorkers(PGPROC *proc)
855
857
return NIL ;
856
858
857
859
mqh = shm_mq_attach (mq , NULL , NULL );
858
- mq_receive_result = shm_mq_receive_with_timeout (mqh , & msg_len ,
859
- (void * * ) & msg ,
860
- MIN_TIMEOUT );
860
+ mq_receive_result = shm_mq_receive (mqh , & msg_len , (void * * ) & msg , false);
861
861
if (mq_receive_result != SHM_MQ_SUCCESS )
862
862
return NIL ;
863
863
@@ -884,7 +884,8 @@ copy_msg(shm_mq_msg *msg)
884
884
}
885
885
886
886
static List *
887
- GetRemoteBackendQueryStates (List * procs ,
887
+ GetRemoteBackendQueryStates (PGPROC * leader ,
888
+ List * pworkers ,
888
889
bool verbose ,
889
890
bool costs ,
890
891
bool timing ,
@@ -895,6 +896,11 @@ GetRemoteBackendQueryStates(List *procs,
895
896
List * result = NIL ;
896
897
List * alive_procs = NIL ;
897
898
ListCell * iter ;
899
+ int sig_result ;
900
+ shm_mq_handle * mqh ;
901
+ shm_mq_result mq_receive_result ;
902
+ shm_mq_msg * msg ;
903
+ Size len ;
898
904
899
905
Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
900
906
Assert (mq );
@@ -912,10 +918,14 @@ GetRemoteBackendQueryStates(List *procs,
912
918
* send signal `QueryStatePollReason` to all processes and define all alive
913
919
* ones
914
920
*/
915
- foreach (iter , procs )
921
+ sig_result = SendProcSignal (leader -> pid ,
922
+ QueryStatePollReason ,
923
+ leader -> backendId );
924
+ if (sig_result == -1 )
925
+ goto signal_error ;
926
+ foreach (iter , pworkers )
916
927
{
917
928
PGPROC * proc = (PGPROC * ) lfirst (iter );
918
- int sig_result ;
919
929
920
930
sig_result = SendProcSignal (proc -> pid ,
921
931
QueryStatePollReason ,
@@ -930,16 +940,24 @@ GetRemoteBackendQueryStates(List *procs,
930
940
alive_procs = lappend (alive_procs , proc );
931
941
}
932
942
943
+ /* extract query state from leader process */
944
+ mq = shm_mq_create (mq , QUEUE_SIZE );
945
+ shm_mq_set_sender (mq , leader );
946
+ shm_mq_set_receiver (mq , MyProc );
947
+ mqh = shm_mq_attach (mq , NULL , NULL );
948
+ mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & msg , false);
949
+ if (mq_receive_result != SHM_MQ_SUCCESS )
950
+ goto mq_error ;
951
+ Assert (len == msg -> length );
952
+ result = lappend (result , copy_msg (msg ));
953
+ shm_mq_detach (mq );
954
+
933
955
/*
934
- * collect results from all alived processes
956
+ * collect results from all alived parallel workers
935
957
*/
936
958
foreach (iter , alive_procs )
937
959
{
938
960
PGPROC * proc = (PGPROC * ) lfirst (iter );
939
- shm_mq_handle * mqh ;
940
- shm_mq_result mq_receive_result ;
941
- shm_mq_msg * msg ;
942
- Size len ;
943
961
944
962
/* prepare message queue to transfer data */
945
963
mq = shm_mq_create (mq , QUEUE_SIZE );
@@ -953,7 +971,7 @@ GetRemoteBackendQueryStates(List *procs,
953
971
mq_receive_result = shm_mq_receive_with_timeout (mqh ,
954
972
& len ,
955
973
(void * * ) & msg ,
956
- 2 * MIN_TIMEOUT );
974
+ MIN_TIMEOUT );
957
975
if (mq_receive_result != SHM_MQ_SUCCESS )
958
976
/* counterpart is died, not consider it */
959
977
continue ;
@@ -971,4 +989,7 @@ GetRemoteBackendQueryStates(List *procs,
971
989
signal_error :
972
990
ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
973
991
errmsg ("invalid send signal" )));
992
+ mq_error :
993
+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
994
+ errmsg ("error in message queue data transmitting" )));
974
995
}
0 commit comments