Skip to content

Commit 37b369d

Browse files
committed
injection_points: Add wait and wakeup of processes
This commit adds two features to the in-core module for injection points: - A new callback called "wait" that can be attached to an injection point to make it wait. - A new SQL function to update the shared state and broadcast the update using a condition variable. This function uses an input an injection point name. This offers the possibility to stop a process in flight and wake it up in a controlled manner, which is useful when implementing tests that aim to trigger scenarios for race conditions (some tests are planned for integration). The logic uses a set of counters with a condition variable to monitor and broadcast the changes. Up to 8 waits can be registered in a single run, which should be plenty enough. Waits can be monitored in pg_stat_activity, based on the injection point name which is registered in a custom wait event under the "Extension" category. The shared memory state used by the module is registered using the DSM registry, and is optional, so there is no need to load the module with shared_preload_libraries to be able to use these features. Author: Michael Paquier Reviewed-by: Andrey Borodin, Bertrand Drouvot Discussion: https://postgr.es/m/ZdLuxBk5hGpol91B@paquier.xyz
1 parent 024c521 commit 37b369d

File tree

3 files changed

+166
-0
lines changed

3 files changed

+166
-0
lines changed

src/test/modules/injection_points/injection_points--1.0.sql

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ RETURNS void
2424
AS 'MODULE_PATHNAME', 'injection_points_run'
2525
LANGUAGE C STRICT PARALLEL UNSAFE;
2626

27+
--
28+
-- injection_points_wakeup()
29+
--
30+
-- Wakes up a waiting injection point.
31+
--
32+
CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
33+
RETURNS void
34+
AS 'MODULE_PATHNAME', 'injection_points_wakeup'
35+
LANGUAGE C STRICT PARALLEL UNSAFE;
36+
2737
--
2838
-- injection_points_detach()
2939
--

src/test/modules/injection_points/injection_points.c

+155
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,74 @@
1818
#include "postgres.h"
1919

2020
#include "fmgr.h"
21+
#include "storage/condition_variable.h"
2122
#include "storage/lwlock.h"
2223
#include "storage/shmem.h"
24+
#include "storage/dsm_registry.h"
2325
#include "utils/builtins.h"
2426
#include "utils/injection_point.h"
2527
#include "utils/wait_event.h"
2628

2729
PG_MODULE_MAGIC;
2830

31+
/* Maximum number of waits usable in injection points at once */
32+
#define INJ_MAX_WAIT 8
33+
#define INJ_NAME_MAXLEN 64
34+
35+
/* Shared state information for injection points. */
36+
typedef struct InjectionPointSharedState
37+
{
38+
/* Protects access to other fields */
39+
slock_t lock;
40+
41+
/* Counters advancing when injection_points_wakeup() is called */
42+
uint32 wait_counts[INJ_MAX_WAIT];
43+
44+
/* Names of injection points attached to wait counters */
45+
char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
46+
47+
/* Condition variable used for waits and wakeups */
48+
ConditionVariable wait_point;
49+
} InjectionPointSharedState;
50+
51+
/* Pointer to shared-memory state. */
52+
static InjectionPointSharedState *inj_state = NULL;
53+
2954
extern PGDLLEXPORT void injection_error(const char *name);
3055
extern PGDLLEXPORT void injection_notice(const char *name);
56+
extern PGDLLEXPORT void injection_wait(const char *name);
57+
58+
59+
/*
60+
* Callback for shared memory area initialization.
61+
*/
62+
static void
63+
injection_point_init_state(void *ptr)
64+
{
65+
InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
66+
67+
SpinLockInit(&state->lock);
68+
memset(state->wait_counts, 0, sizeof(state->wait_counts));
69+
memset(state->name, 0, sizeof(state->name));
70+
ConditionVariableInit(&state->wait_point);
71+
}
72+
73+
/*
74+
* Initialize shared memory area for this module.
75+
*/
76+
static void
77+
injection_init_shmem(void)
78+
{
79+
bool found;
3180

81+
if (inj_state != NULL)
82+
return;
83+
84+
inj_state = GetNamedDSMSegment("injection_points",
85+
sizeof(InjectionPointSharedState),
86+
injection_point_init_state,
87+
&found);
88+
}
3289

3390
/* Set of callbacks available to be attached to an injection point. */
3491
void
@@ -43,6 +100,66 @@ injection_notice(const char *name)
43100
elog(NOTICE, "notice triggered for injection point %s", name);
44101
}
45102

103+
/* Wait on a condition variable, awaken by injection_points_wakeup() */
104+
void
105+
injection_wait(const char *name)
106+
{
107+
uint32 old_wait_counts = 0;
108+
int index = -1;
109+
uint32 injection_wait_event = 0;
110+
111+
if (inj_state == NULL)
112+
injection_init_shmem();
113+
114+
/*
115+
* Use the injection point name for this custom wait event. Note that
116+
* this custom wait event name is not released, but we don't care much for
117+
* testing as this should be short-lived.
118+
*/
119+
injection_wait_event = WaitEventExtensionNew(name);
120+
121+
/*
122+
* Find a free slot to wait for, and register this injection point's name.
123+
*/
124+
SpinLockAcquire(&inj_state->lock);
125+
for (int i = 0; i < INJ_MAX_WAIT; i++)
126+
{
127+
if (inj_state->name[i][0] == '\0')
128+
{
129+
index = i;
130+
strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
131+
old_wait_counts = inj_state->wait_counts[i];
132+
break;
133+
}
134+
}
135+
SpinLockRelease(&inj_state->lock);
136+
137+
if (index < 0)
138+
elog(ERROR, "could not find free slot for wait of injection point %s ",
139+
name);
140+
141+
/* And sleep.. */
142+
ConditionVariablePrepareToSleep(&inj_state->wait_point);
143+
for (;;)
144+
{
145+
uint32 new_wait_counts;
146+
147+
SpinLockAcquire(&inj_state->lock);
148+
new_wait_counts = inj_state->wait_counts[index];
149+
SpinLockRelease(&inj_state->lock);
150+
151+
if (old_wait_counts != new_wait_counts)
152+
break;
153+
ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
154+
}
155+
ConditionVariableCancelSleep();
156+
157+
/* Remove this injection point from the waiters. */
158+
SpinLockAcquire(&inj_state->lock);
159+
inj_state->name[index][0] = '\0';
160+
SpinLockRelease(&inj_state->lock);
161+
}
162+
46163
/*
47164
* SQL function for creating an injection point.
48165
*/
@@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
58175
function = "injection_error";
59176
else if (strcmp(action, "notice") == 0)
60177
function = "injection_notice";
178+
else if (strcmp(action, "wait") == 0)
179+
function = "injection_wait";
61180
else
62181
elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
63182

@@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
80199
PG_RETURN_VOID();
81200
}
82201

202+
/*
203+
* SQL function for waking up an injection point waiting in injection_wait().
204+
*/
205+
PG_FUNCTION_INFO_V1(injection_points_wakeup);
206+
Datum
207+
injection_points_wakeup(PG_FUNCTION_ARGS)
208+
{
209+
char *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
210+
int index = -1;
211+
212+
if (inj_state == NULL)
213+
injection_init_shmem();
214+
215+
/* First bump the wait counter for the injection point to wake up */
216+
SpinLockAcquire(&inj_state->lock);
217+
for (int i = 0; i < INJ_MAX_WAIT; i++)
218+
{
219+
if (strcmp(name, inj_state->name[i]) == 0)
220+
{
221+
index = i;
222+
break;
223+
}
224+
}
225+
if (index < 0)
226+
{
227+
SpinLockRelease(&inj_state->lock);
228+
elog(ERROR, "could not find injection point %s to wake up", name);
229+
}
230+
inj_state->wait_counts[index]++;
231+
SpinLockRelease(&inj_state->lock);
232+
233+
/* And broadcast the change to the waiters */
234+
ConditionVariableBroadcast(&inj_state->wait_point);
235+
PG_RETURN_VOID();
236+
}
237+
83238
/*
84239
* SQL function for dropping an injection point.
85240
*/

src/tools/pgindent/typedefs.list

+1
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
12101210
InitializeWorkerForeignScan_function
12111211
InjectionPointCacheEntry
12121212
InjectionPointEntry
1213+
InjectionPointSharedState
12131214
InlineCodeBlock
12141215
InsertStmt
12151216
Instrumentation

0 commit comments

Comments
 (0)