Skip to content

Commit 4c728f3

Browse files
committed
Pass the source text for a parallel query to the workers.
With this change, you can see the query that a parallel worker is executing in pg_stat_activity, and if the worker crashes you can see what query it was executing when it crashed. Rafia Sabih, reviewed by Kuntal Ghosh and Amit Kapila and slightly revised by me.
1 parent b431692 commit 4c728f3

File tree

4 files changed

+29
-1
lines changed

4 files changed

+29
-1
lines changed

src/backend/executor/execMain.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
190190
estate->es_param_exec_vals = (ParamExecData *)
191191
palloc0(queryDesc->plannedstmt->nParamExec * sizeof(ParamExecData));
192192

193+
estate->es_sourceText = queryDesc->sourceText;
194+
193195
/*
194196
* If non-read-only query, set the command ID to mark output tuples with
195197
*/

src/backend/executor/execParallel.c

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "utils/dsa.h"
4040
#include "utils/memutils.h"
4141
#include "utils/snapmgr.h"
42+
#include "pgstat.h"
4243

4344
/*
4445
* Magic numbers for parallel executor communication. We use constants
@@ -51,6 +52,7 @@
5152
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
5253
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
5354
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
55+
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
5456

5557
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
5658

@@ -368,6 +370,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
368370
int instrumentation_len = 0;
369371
int instrument_offset = 0;
370372
Size dsa_minsize = dsa_minimum_size();
373+
char *query_string;
374+
int query_len;
371375

372376
/* Allocate object for return value. */
373377
pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -387,6 +391,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
387391
* for the various things we need to store.
388392
*/
389393

394+
/* Estimate space for query text. */
395+
query_len = strlen(estate->es_sourceText);
396+
shm_toc_estimate_chunk(&pcxt->estimator, query_len);
397+
shm_toc_estimate_keys(&pcxt->estimator, 1);
398+
390399
/* Estimate space for serialized PlannedStmt. */
391400
pstmt_len = strlen(pstmt_data) + 1;
392401
shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
@@ -451,6 +460,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
451460
* asked for has been allocated or initialized yet, though, so do that.
452461
*/
453462

463+
/* Store query string */
464+
query_string = shm_toc_allocate(pcxt->toc, query_len);
465+
memcpy(query_string, estate->es_sourceText, query_len);
466+
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
467+
454468
/* Store serialized PlannedStmt. */
455469
pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
456470
memcpy(pstmt_space, pstmt_data, pstmt_len);
@@ -661,6 +675,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
661675
char *paramspace;
662676
PlannedStmt *pstmt;
663677
ParamListInfo paramLI;
678+
char *queryString;
679+
680+
/* Get the query string from shared memory */
681+
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
664682

665683
/* Reconstruct leader-supplied PlannedStmt. */
666684
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
@@ -679,7 +697,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
679697
* revising this someday.
680698
*/
681699
return CreateQueryDesc(pstmt,
682-
"<parallel query>",
700+
queryString,
683701
GetActiveSnapshot(), InvalidSnapshot,
684702
receiver, paramLI, instrument_options);
685703
}
@@ -799,6 +817,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
799817
instrument_options = instrumentation->instrument_options;
800818
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
801819

820+
/* Setting debug_query_string for individual workers */
821+
debug_query_string = queryDesc->sourceText;
822+
823+
/* Report workers' query for monitoring purposes */
824+
pgstat_report_activity(STATE_RUNNING, debug_query_string);
825+
802826
/* Prepare to track buffer usage during query execution. */
803827
InstrStartParallelQuery();
804828

src/backend/executor/execUtils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ CreateExecutorState(void)
139139
estate->es_epqTuple = NULL;
140140
estate->es_epqTupleSet = NULL;
141141
estate->es_epqScanDone = NULL;
142+
estate->es_sourceText = NULL;
142143

143144
/*
144145
* Return the executor state structure

src/include/nodes/execnodes.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ typedef struct EState
371371
Snapshot es_crosscheck_snapshot; /* crosscheck time qual for RI */
372372
List *es_range_table; /* List of RangeTblEntry */
373373
PlannedStmt *es_plannedstmt; /* link to top of plan tree */
374+
const char *es_sourceText; /* Source text from QueryDesc */
374375

375376
JunkFilter *es_junkFilter; /* top-level junk filter, if any */
376377

0 commit comments

Comments
 (0)