2
2
* collector.c
3
3
* Collector of wait event history and profile.
4
4
*
5
- * Copyright (c) 2015-2016 , Postgres Professional
5
+ * Copyright (c) 2015-2025 , Postgres Professional
6
6
*
7
7
* IDENTIFICATION
8
8
* contrib/pg_wait_sampling/pg_wait_sampling.c
9
9
*/
10
10
#include "postgres.h"
11
11
12
- #include "catalog/pg_type.h"
13
- #if PG_VERSION_NUM >= 130000
14
- #include "common/hashfn.h"
15
- #endif
16
- #include "funcapi.h"
17
12
#include "miscadmin.h"
13
+ #include "pg_wait_sampling.h"
18
14
#include "postmaster/bgworker.h"
19
15
#include "postmaster/interrupt.h"
20
- #include "storage/ipc.h"
21
- #include "storage/procarray.h"
16
+ #include "storage/latch.h"
17
+ #include "storage/lock.h"
18
+ #include "storage/lwlock.h"
19
+ #include "storage/proc.h"
22
20
#include "storage/procsignal.h"
23
21
#include "storage/shm_mq.h"
24
- #include "storage/shm_toc .h"
25
- #include "storage/spin .h"
26
- #include "utils/memutils .h"
27
- #include "utils/resowner.h"
22
+ #include "utils/guc .h"
23
+ #include "utils/hsearch .h"
24
+ #include "utils/timestamp .h"
25
+ #if PG_VERSION_NUM < 140000
28
26
#include "pgstat.h"
27
+ #else
28
+ #include "utils/wait_event.h"
29
+ #endif
29
30
30
- #include "compat.h"
31
- #include "pg_wait_sampling.h"
31
+ static inline shm_mq_result
32
+ shm_mq_send_compat (shm_mq_handle * mqh , Size nbytes , const void * data ,
33
+ bool nowait , bool force_flush )
34
+ {
35
+ #if PG_VERSION_NUM >= 150000
36
+ return shm_mq_send (mqh , nbytes , data , nowait , force_flush );
37
+ #else
38
+ return shm_mq_send (mqh , nbytes , data , nowait );
39
+ #endif
40
+ }
32
41
33
- static volatile sig_atomic_t shutdown_requested = false;
42
+ #if PG_VERSION_NUM < 170000
43
+ #define INIT_PG_LOAD_SESSION_LIBS 0x0001
44
+ #define INIT_PG_OVERRIDE_ALLOW_CONNS 0x0002
45
+ #endif
34
46
35
- static void handle_sigterm (SIGNAL_ARGS );
47
+ static inline void
48
+ InitPostgresCompat (const char * in_dbname , Oid dboid ,
49
+ const char * username , Oid useroid ,
50
+ bits32 flags ,
51
+ char * out_dbname )
52
+ {
53
+ #if PG_VERSION_NUM >= 170000
54
+ InitPostgres (in_dbname , dboid , username , useroid , flags , out_dbname );
55
+ #elif PG_VERSION_NUM >= 150000
56
+ InitPostgres (in_dbname , dboid , username , useroid ,
57
+ flags & INIT_PG_LOAD_SESSION_LIBS ,
58
+ flags & INIT_PG_OVERRIDE_ALLOW_CONNS , out_dbname );
59
+ #else
60
+ InitPostgres (in_dbname , dboid , username , useroid , out_dbname ,
61
+ flags & INIT_PG_OVERRIDE_ALLOW_CONNS );
62
+ #endif
63
+ }
36
64
37
65
/*
38
66
* Register background worker for collecting waits history.
@@ -111,16 +139,6 @@ realloc_history(History *observations, int count)
111
139
observations -> wraparound = false;
112
140
}
113
141
114
- static void
115
- handle_sigterm (SIGNAL_ARGS )
116
- {
117
- int save_errno = errno ;
118
- shutdown_requested = true;
119
- if (MyProc )
120
- SetLatch (& MyProc -> procLatch );
121
- errno = save_errno ;
122
- }
123
-
124
142
/*
125
143
* Get next item of history with rotation.
126
144
*/
@@ -129,6 +147,7 @@ get_next_observation(History *observations)
129
147
{
130
148
HistoryItem * result ;
131
149
150
+ /* Check for wraparound */
132
151
if (observations -> index >= observations -> count )
133
152
{
134
153
observations -> index = 0 ;
@@ -215,6 +234,7 @@ send_history(History *observations, shm_mq_handle *mqh)
215
234
else
216
235
count = observations -> index ;
217
236
237
+ /* Send array size first since receive_array expects this */
218
238
mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
219
239
if (mq_result == SHM_MQ_DETACHED )
220
240
{
@@ -251,6 +271,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
251
271
Size count = hash_get_num_entries (profile_hash );
252
272
shm_mq_result mq_result ;
253
273
274
+ /* Send array size first since receive_array expects this */
254
275
mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
255
276
if (mq_result == SHM_MQ_DETACHED )
256
277
{
@@ -283,32 +304,11 @@ make_profile_hash()
283
304
{
284
305
HASHCTL hash_ctl ;
285
306
286
- hash_ctl .hash = tag_hash ;
287
- hash_ctl .hcxt = TopMemoryContext ;
288
-
289
- if (pgws_profileQueries )
290
- hash_ctl .keysize = offsetof(ProfileItem , count );
291
- else
292
- hash_ctl .keysize = offsetof(ProfileItem , queryId );
293
-
307
+ /* We always include queryId in hash key */
308
+ hash_ctl .keysize = offsetof(ProfileItem , count );
294
309
hash_ctl .entrysize = sizeof (ProfileItem );
295
310
return hash_create ("Waits profile hash" , 1024 , & hash_ctl ,
296
- HASH_FUNCTION | HASH_ELEM );
297
- }
298
-
299
- /*
300
- * Delta between two timestamps in milliseconds.
301
- */
302
- static int64
303
- millisecs_diff (TimestampTz tz1 , TimestampTz tz2 )
304
- {
305
- long secs ;
306
- int microsecs ;
307
-
308
- TimestampDifference (tz1 , tz2 , & secs , & microsecs );
309
-
310
- return secs * 1000 + microsecs / 1000 ;
311
-
311
+ HASH_ELEM | HASH_BLOBS );
312
312
}
313
313
314
314
/*
@@ -319,77 +319,49 @@ pgws_collector_main(Datum main_arg)
319
319
{
320
320
HTAB * profile_hash = NULL ;
321
321
History observations ;
322
- MemoryContext old_context ,
323
- collector_context ;
324
322
TimestampTz current_ts ,
325
323
history_ts ,
326
324
profile_ts ;
327
325
328
- /*
329
- * Establish signal handlers.
330
- *
331
- * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
332
- * it would a normal user backend. To make that happen, we establish a
333
- * signal handler that is a stripped-down version of die(). We don't have
334
- * any equivalent of the backend's command-read loop, where interrupts can
335
- * be processed immediately, so make sure ImmediateInterruptOK is turned
336
- * off.
337
- *
338
- * We also want to respond to the ProcSignal notifications. This is done
339
- * in the upstream provided procsignal_sigusr1_handler, which is
340
- * automatically used if a bgworker connects to a database. But since our
341
- * worker doesn't connect to any database even though it calls
342
- * InitPostgres, which will still initializze a new backend and thus
343
- * partitipate to the ProcSignal infrastructure.
344
- */
345
- pqsignal (SIGTERM , handle_sigterm );
326
+ /* Establish signal handlers */
346
327
pqsignal (SIGHUP , SignalHandlerForConfigReload );
347
328
pqsignal (SIGUSR1 , procsignal_sigusr1_handler );
348
329
BackgroundWorkerUnblockSignals ();
349
330
InitPostgresCompat (NULL , InvalidOid , NULL , InvalidOid , 0 , NULL );
350
331
SetProcessingMode (NormalProcessing );
351
332
352
- /* Make pg_wait_sampling recognisable in pg_stat_activity */
353
- pgstat_report_appname ("pg_wait_sampling collector" );
354
333
355
- profile_hash = make_profile_hash ();
356
334
pgws_collector_hdr -> latch = & MyProc -> procLatch ;
357
335
358
- CurrentResourceOwner = ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
359
- collector_context = AllocSetContextCreate (TopMemoryContext ,
360
- "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
361
- old_context = MemoryContextSwitchTo (collector_context );
362
336
alloc_history (& observations , pgws_historySize );
363
- MemoryContextSwitchTo ( old_context );
337
+ profile_hash = make_profile_hash ( );
364
338
365
- ereport (LOG , ( errmsg ("pg_wait_sampling collector started" ) ));
339
+ ereport (LOG , errmsg ("pg_wait_sampling collector started" ));
366
340
367
341
/* Start counting time for history and profile samples */
368
342
profile_ts = history_ts = GetCurrentTimestamp ();
369
343
370
344
while (1 )
371
345
{
372
- int rc ;
373
346
shm_mq_handle * mqh ;
374
347
int64 history_diff ,
375
348
profile_diff ;
376
349
bool write_history ,
377
350
write_profile ;
378
351
379
- /* We need an explicit call for at least ProcSignal notifications. */
380
- CHECK_FOR_INTERRUPTS ();
352
+ HandleMainLoopInterrupts ();
381
353
382
354
if (ConfigReloadPending )
383
355
{
384
356
ConfigReloadPending = false;
385
357
ProcessConfigFile (PGC_SIGHUP );
386
358
}
387
359
388
- /* Wait calculate time to next sample for history or profile */
360
+ /* Calculate time to next sample for history or profile */
389
361
current_ts = GetCurrentTimestamp ();
390
362
391
- history_diff = millisecs_diff (history_ts , current_ts );
392
- profile_diff = millisecs_diff (profile_ts , current_ts );
363
+ history_diff = TimestampDifferenceMilliseconds (history_ts , current_ts );
364
+ profile_diff = TimestampDifferenceMilliseconds (profile_ts , current_ts );
393
365
394
366
write_history = (history_diff >= (int64 )pgws_historyPeriod );
395
367
write_profile = (profile_diff >= (int64 )pgws_profilePeriod );
@@ -412,20 +384,15 @@ pgws_collector_main(Datum main_arg)
412
384
}
413
385
}
414
386
415
- /* Shutdown if requested */
416
- if (shutdown_requested )
417
- break ;
418
-
419
387
/*
420
- * Wait until next sample time or request to do something through
388
+ * Wait for sample time or until request to do something through
421
389
* shared memory.
422
390
*/
423
- rc = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
424
- Min (pgws_historyPeriod - (int )history_diff ,
425
- pgws_historyPeriod - (int )profile_diff ), PG_WAIT_EXTENSION );
426
-
427
- if (rc & WL_POSTMASTER_DEATH )
428
- proc_exit (1 );
391
+ WaitLatch (& MyProc -> procLatch ,
392
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
393
+ Min (pgws_historyPeriod - (int )history_diff ,
394
+ pgws_profilePeriod - (int )profile_diff ),
395
+ PG_WAIT_EXTENSION );
429
396
430
397
ResetLatch (& MyProc -> procLatch );
431
398
@@ -484,15 +451,4 @@ pgws_collector_main(Datum main_arg)
484
451
LockRelease (& tag , ExclusiveLock , false);
485
452
}
486
453
}
487
-
488
- MemoryContextReset (collector_context );
489
-
490
- /*
491
- * We're done. Explicitly detach the shared memory segment so that we
492
- * don't get a resource leak warning at commit time. This will fire any
493
- * on_dsm_detach callbacks we've registered, as well. Once that's done,
494
- * we can go ahead and exit.
495
- */
496
- ereport (LOG , (errmsg ("pg_wait_sampling collector shutting down" )));
497
- proc_exit (0 );
498
454
}
0 commit comments