39
39
#include "utils/dsa.h"
40
40
#include "utils/memutils.h"
41
41
#include "utils/snapmgr.h"
42
+ #include "pgstat.h"
42
43
43
44
/*
44
45
* Magic numbers for parallel executor communication. We use constants
51
52
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
52
53
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
53
54
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
55
+ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
54
56
55
57
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
56
58
@@ -368,6 +370,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
368
370
int instrumentation_len = 0 ;
369
371
int instrument_offset = 0 ;
370
372
Size dsa_minsize = dsa_minimum_size ();
373
+ char * query_string ;
374
+ int query_len ;
371
375
372
376
/* Allocate object for return value. */
373
377
pei = palloc0 (sizeof (ParallelExecutorInfo ));
@@ -387,6 +391,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
387
391
* for the various things we need to store.
388
392
*/
389
393
394
+ /* Estimate space for query text. */
395
+ query_len = strlen (estate -> es_sourceText );
396
+ shm_toc_estimate_chunk (& pcxt -> estimator , query_len );
397
+ shm_toc_estimate_keys (& pcxt -> estimator , 1 );
398
+
390
399
/* Estimate space for serialized PlannedStmt. */
391
400
pstmt_len = strlen (pstmt_data ) + 1 ;
392
401
shm_toc_estimate_chunk (& pcxt -> estimator , pstmt_len );
@@ -451,6 +460,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
451
460
* asked for has been allocated or initialized yet, though, so do that.
452
461
*/
453
462
463
+ /* Store query string */
464
+ query_string = shm_toc_allocate (pcxt -> toc , query_len );
465
+ memcpy (query_string , estate -> es_sourceText , query_len );
466
+ shm_toc_insert (pcxt -> toc , PARALLEL_KEY_QUERY_TEXT , query_string );
467
+
454
468
/* Store serialized PlannedStmt. */
455
469
pstmt_space = shm_toc_allocate (pcxt -> toc , pstmt_len );
456
470
memcpy (pstmt_space , pstmt_data , pstmt_len );
@@ -661,6 +675,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
661
675
char * paramspace ;
662
676
PlannedStmt * pstmt ;
663
677
ParamListInfo paramLI ;
678
+ char * queryString ;
679
+
680
+ /* Get the query string from shared memory */
681
+ queryString = shm_toc_lookup (toc , PARALLEL_KEY_QUERY_TEXT );
664
682
665
683
/* Reconstruct leader-supplied PlannedStmt. */
666
684
pstmtspace = shm_toc_lookup (toc , PARALLEL_KEY_PLANNEDSTMT );
@@ -679,7 +697,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
679
697
* revising this someday.
680
698
*/
681
699
return CreateQueryDesc (pstmt ,
682
- "<parallel query>" ,
700
+ queryString ,
683
701
GetActiveSnapshot (), InvalidSnapshot ,
684
702
receiver , paramLI , instrument_options );
685
703
}
@@ -799,6 +817,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
799
817
instrument_options = instrumentation -> instrument_options ;
800
818
queryDesc = ExecParallelGetQueryDesc (toc , receiver , instrument_options );
801
819
820
+ /* Setting debug_query_string for individual workers */
821
+ debug_query_string = queryDesc -> sourceText ;
822
+
823
+ /* Report workers' query for monitoring purposes */
824
+ pgstat_report_activity (STATE_RUNNING , debug_query_string );
825
+
802
826
/* Prepare to track buffer usage during query execution. */
803
827
InstrStartParallelQuery ();
804
828
0 commit comments