Skip to content

Code cleanup #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pg_wait_sampling is released under the PostgreSQL License, a liberal Open Source license, similar to the BSD or MIT licenses.

Copyright (c) 2015-2017, Postgres Professional
Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
Copyright (c) 2015-2025, Postgres Professional
Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
Portions Copyright (c) 1994, The Regents of the University of California

Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.
Expand Down
142 changes: 65 additions & 77 deletions collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@
* collector.c
* Collector of wait event history and profile.
*
* Copyright (c) 2015-2016, Postgres Professional
* Copyright (c) 2015-2025, Postgres Professional
*
* IDENTIFICATION
* contrib/pg_wait_sampling/pg_wait_sampling.c
*/
#include "postgres.h"

#include "catalog/pg_type.h"
#if PG_VERSION_NUM >= 130000
#include "common/hashfn.h"
#endif
#include "funcapi.h"
#include <signal.h>

#include "compat.h"
#include "miscadmin.h"
#include "pg_wait_sampling.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
#include "storage/procarray.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "pgstat.h"

#include "compat.h"
#include "pg_wait_sampling.h"
#include "utils/timestamp.h"

static volatile sig_atomic_t shutdown_requested = false;

Expand Down Expand Up @@ -73,10 +73,10 @@ alloc_history(History *observations, int count)
static void
realloc_history(History *observations, int count)
{
HistoryItem *newitems;
int copyCount,
i,
j;
HistoryItem *newitems;
int copyCount,
i,
j;

/* Allocate new array for history */
newitems = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
Expand Down Expand Up @@ -114,7 +114,8 @@ realloc_history(History *observations, int count)
static void
handle_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
int save_errno = errno;

shutdown_requested = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
Expand All @@ -129,6 +130,7 @@ get_next_observation(History *observations)
{
HistoryItem *result;

/* Check for wraparound */
if (observations->index >= observations->count)
{
observations->index = 0;
Expand All @@ -149,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
{
int i,
newSize;
TimestampTz ts = GetCurrentTimestamp();
TimestampTz ts = GetCurrentTimestamp();

/* Realloc waits history if needed */
newSize = pgws_historySize;
Expand All @@ -160,9 +162,9 @@ probe_waits(History *observations, HTAB *profile_hash,
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
HistoryItem item,
*observation;
PGPROC *proc = &ProcGlobal->allProcs[i];
HistoryItem item,
*observation;
PGPROC *proc = &ProcGlobal->allProcs[i];

if (!pgws_should_sample_proc(proc, &item.pid, &item.wait_event_info))
continue;
Expand All @@ -184,8 +186,8 @@ probe_waits(History *observations, HTAB *profile_hash,
/* Write to the profile if needed */
if (write_profile)
{
ProfileItem *profileItem;
bool found;
ProfileItem *profileItem;
bool found;

if (!profile_pid)
item.pid = 0;
Expand All @@ -206,15 +208,16 @@ probe_waits(History *observations, HTAB *profile_hash,
static void
send_history(History *observations, shm_mq_handle *mqh)
{
Size count,
i;
shm_mq_result mq_result;
Size count,
i;
shm_mq_result mq_result;

if (observations->wraparound)
count = observations->count;
else
count = observations->index;

/* Send array size first since receive_array expects this */
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
Expand All @@ -226,10 +229,10 @@ send_history(History *observations, shm_mq_handle *mqh)
for (i = 0; i < count; i++)
{
mq_result = shm_mq_send_compat(mqh,
sizeof(HistoryItem),
&observations->items[i],
false,
true);
sizeof(HistoryItem),
&observations->items[i],
false,
true);
if (mq_result == SHM_MQ_DETACHED)
{
ereport(WARNING,
Expand All @@ -246,11 +249,12 @@ send_history(History *observations, shm_mq_handle *mqh)
static void
send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
{
HASH_SEQ_STATUS scan_status;
ProfileItem *item;
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;
HASH_SEQ_STATUS scan_status;
ProfileItem *item;
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;

/* Send array size first since receive_array expects this */
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
Expand Down Expand Up @@ -281,10 +285,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
static HTAB *
make_profile_hash()
{
HASHCTL hash_ctl;

hash_ctl.hash = tag_hash;
hash_ctl.hcxt = TopMemoryContext;
HASHCTL hash_ctl;

if (pgws_profileQueries)
hash_ctl.keysize = offsetof(ProfileItem, count);
Expand All @@ -293,7 +294,7 @@ make_profile_hash()

hash_ctl.entrysize = sizeof(ProfileItem);
return hash_create("Waits profile hash", 1024, &hash_ctl,
HASH_FUNCTION | HASH_ELEM);
HASH_ELEM | HASH_BLOBS);
}

/*
Expand All @@ -302,8 +303,8 @@ make_profile_hash()
static int64
millisecs_diff(TimestampTz tz1, TimestampTz tz2)
{
long secs;
int microsecs;
long secs;
int microsecs;

TimestampDifference(tz1, tz2, &secs, &microsecs);

Expand All @@ -317,26 +318,19 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
void
pgws_collector_main(Datum main_arg)
{
HTAB *profile_hash = NULL;
History observations;
MemoryContext old_context,
collector_context;
TimestampTz current_ts,
history_ts,
profile_ts;
HTAB *profile_hash = NULL;
History observations;
MemoryContext old_context,
collector_context;
TimestampTz current_ts,
history_ts,
profile_ts;

/*
* Establish signal handlers.
*
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
* it would a normal user backend. To make that happen, we establish a
* signal handler that is a stripped-down version of die(). We don't have
* any equivalent of the backend's command-read loop, where interrupts can
* be processed immediately, so make sure ImmediateInterruptOK is turned
* off.
*
* We also want to respond to the ProcSignal notifications. This is done
* in the upstream provided procsignal_sigusr1_handler, which is
* We want to respond to the ProcSignal notifications. This is done in
* the upstream provided procsignal_sigusr1_handler, which is
* automatically used if a bgworker connects to a database. But since our
* worker doesn't connect to any database even though it calls
* InitPostgres, which will still initializze a new backend and thus
Expand All @@ -357,7 +351,7 @@ pgws_collector_main(Datum main_arg)

CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector");
collector_context = AllocSetContextCreate(TopMemoryContext,
"pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES);
"pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES);
old_context = MemoryContextSwitchTo(collector_context);
alloc_history(&observations, pgws_historySize);
MemoryContextSwitchTo(old_context);
Expand All @@ -369,12 +363,12 @@ pgws_collector_main(Datum main_arg)

while (1)
{
int rc;
shm_mq_handle *mqh;
int64 history_diff,
profile_diff;
bool write_history,
write_profile;
int rc;
shm_mq_handle *mqh;
int64 history_diff,
profile_diff;
bool write_history,
write_profile;

/* We need an explicit call for at least ProcSignal notifications. */
CHECK_FOR_INTERRUPTS();
Expand All @@ -385,14 +379,14 @@ pgws_collector_main(Datum main_arg)
ProcessConfigFile(PGC_SIGHUP);
}

/* Wait calculate time to next sample for history or profile */
/* Calculate time to next sample for history or profile */
current_ts = GetCurrentTimestamp();

history_diff = millisecs_diff(history_ts, current_ts);
profile_diff = millisecs_diff(profile_ts, current_ts);

write_history = (history_diff >= (int64)pgws_historyPeriod);
write_profile = (profile_diff >= (int64)pgws_profilePeriod);
write_history = (history_diff >= (int64) pgws_historyPeriod);
write_profile = (profile_diff >= (int64) pgws_profilePeriod);

if (write_history || write_profile)
{
Expand Down Expand Up @@ -421,8 +415,8 @@ pgws_collector_main(Datum main_arg)
* shared memory.
*/
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
Min(pgws_historyPeriod - (int)history_diff,
pgws_historyPeriod - (int)profile_diff), PG_WAIT_EXTENSION);
Min(pgws_historyPeriod - (int) history_diff,
pgws_historyPeriod - (int) profile_diff), PG_WAIT_EXTENSION);

if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
Expand All @@ -443,7 +437,7 @@ pgws_collector_main(Datum main_arg)

if (request == HISTORY_REQUEST || request == PROFILE_REQUEST)
{
shm_mq_result mq_result;
shm_mq_result mq_result;

/* Send history or profile */
shm_mq_set_sender(pgws_collector_mq, MyProc);
Expand Down Expand Up @@ -487,12 +481,6 @@ pgws_collector_main(Datum main_arg)

MemoryContextReset(collector_context);

/*
* We're done. Explicitly detach the shared memory segment so that we
* don't get a resource leak warning at commit time. This will fire any
* on_dsm_detach callbacks we've registered, as well. Once that's done,
* we can go ahead and exit.
*/
ereport(LOG, (errmsg("pg_wait_sampling collector shutting down")));
proc_exit(0);
}
6 changes: 1 addition & 5 deletions compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@
* compat.h
* Definitions for function wrappers compatible between PG versions.
*
* Copyright (c) 2015-2022, Postgres Professional
* Copyright (c) 2015-2025, Postgres Professional
*
* IDENTIFICATION
* contrib/pg_wait_sampling/compat.h
*/
#ifndef __COMPAT_H__
#define __COMPAT_H__

#include "postgres.h"

#include "access/tupdesc.h"
#include "miscadmin.h"
#include "storage/shm_mq.h"
#include "utils/guc_tables.h"

static inline shm_mq_result
shm_mq_send_compat(shm_mq_handle *mqh, Size nbytes, const void *data,
Expand Down
Loading