Skip to content

Commit 319e9e5

Browse files
committed
Add tests for libpq query cancellation APIs
This is in preparation of making changes and additions to these APIs. Author: Jelte Fennema-Nio <postgres@jeltef.nl> Discussion: https://postgr.es/m/CAGECzQRb21spiiykQ48rzz8w+Hcykz+mB2_hxR65D9Qk6nnw=w@mail.gmail.com
1 parent 24c928a commit 319e9e5

File tree

1 file changed

+173
-2
lines changed

1 file changed

+173
-2
lines changed

src/test/modules/libpq_pipeline/libpq_pipeline.c

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ exit_nicely(PGconn *conn)
6464
exit(1);
6565
}
6666

67+
/*
68+
* The following few functions are wrapped in macros to make the reported line
69+
* number in an error match the line number of the invocation.
70+
*/
71+
6772
/*
6873
* Print an error to stderr and terminate the program.
6974
*/
@@ -74,7 +79,6 @@ pg_fatal_impl(int line, const char *fmt,...)
7479
{
7580
va_list args;
7681

77-
7882
fflush(stdout);
7983

8084
fprintf(stderr, "\n%s:%d: ", progname, line);
@@ -86,6 +90,170 @@ pg_fatal_impl(int line, const char *fmt,...)
8690
exit(1);
8791
}
8892

93+
/*
94+
* Check that the query on the given connection got canceled.
95+
*/
96+
#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
97+
static void
98+
confirm_query_canceled_impl(int line, PGconn *conn)
99+
{
100+
PGresult *res = NULL;
101+
102+
res = PQgetResult(conn);
103+
if (res == NULL)
104+
pg_fatal_impl(line, "PQgetResult returned null: %s",
105+
PQerrorMessage(conn));
106+
if (PQresultStatus(res) != PGRES_FATAL_ERROR)
107+
pg_fatal_impl(line, "query did not fail when it was expected");
108+
if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
109+
pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
110+
PQerrorMessage(conn));
111+
PQclear(res);
112+
113+
while (PQisBusy(conn))
114+
PQconsumeInput(conn);
115+
}
116+
117+
#define send_cancellable_query(conn, monitorConn) \
118+
send_cancellable_query_impl(__LINE__, conn, monitorConn)
119+
static void
120+
send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
121+
{
122+
const char *env_wait;
123+
const Oid paramTypes[1] = {INT4OID};
124+
int procpid = PQbackendPID(conn);
125+
126+
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
127+
if (env_wait == NULL)
128+
env_wait = "180";
129+
130+
if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
131+
&env_wait, NULL, NULL, 0) != 1)
132+
pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
133+
134+
/*
135+
* Wait until the query is actually running. Otherwise sending a
136+
* cancellation request might not cancel the query due to race conditions.
137+
*/
138+
while (true)
139+
{
140+
char *value;
141+
PGresult *res;
142+
const char *paramValues[1];
143+
char pidval[16];
144+
145+
snprintf(pidval, 16, "%d", procpid);
146+
paramValues[0] = pidval;
147+
148+
res = PQexecParams(monitorConn,
149+
"SELECT count(*) FROM pg_stat_activity WHERE "
150+
"pid = $1 AND state = 'active'",
151+
1, NULL, paramValues, NULL, NULL, 1);
152+
153+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
154+
pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
155+
if (PQntuples(res) != 1)
156+
pg_fatal("unexpected number of rows received: %d", PQntuples(res));
157+
if (PQnfields(res) != 1)
158+
pg_fatal("unexpected number of columns received: %d", PQnfields(res));
159+
value = PQgetvalue(res, 0, 0);
160+
if (*value != '0')
161+
{
162+
PQclear(res);
163+
break;
164+
}
165+
PQclear(res);
166+
167+
/* wait 10ms before polling again */
168+
pg_usleep(10000);
169+
}
170+
}
171+
172+
/*
173+
* Create a new connection with the same conninfo as the given one.
174+
*/
175+
static PGconn *
176+
copy_connection(PGconn *conn)
177+
{
178+
PGconn *copyConn;
179+
PQconninfoOption *opts = PQconninfo(conn);
180+
const char **keywords;
181+
const char **vals;
182+
int nopts = 1;
183+
int i = 0;
184+
185+
for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
186+
nopts++;
187+
188+
keywords = pg_malloc(sizeof(char *) * nopts);
189+
vals = pg_malloc(sizeof(char *) * nopts);
190+
191+
for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
192+
{
193+
if (opt->val)
194+
{
195+
keywords[i] = opt->keyword;
196+
vals[i] = opt->val;
197+
i++;
198+
}
199+
}
200+
keywords[i] = vals[i] = NULL;
201+
202+
copyConn = PQconnectdbParams(keywords, vals, false);
203+
204+
if (PQstatus(copyConn) != CONNECTION_OK)
205+
pg_fatal("Connection to database failed: %s",
206+
PQerrorMessage(copyConn));
207+
208+
return copyConn;
209+
}
210+
211+
/*
212+
* Test query cancellation routines
213+
*/
214+
static void
215+
test_cancel(PGconn *conn)
216+
{
217+
PGcancel *cancel;
218+
PGconn *monitorConn;
219+
char errorbuf[256];
220+
221+
fprintf(stderr, "test cancellations... ");
222+
223+
if (PQsetnonblocking(conn, 1) != 0)
224+
pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
225+
226+
/*
227+
* Make a separate connection to the database to monitor the query on the
228+
* main connection.
229+
*/
230+
monitorConn = copy_connection(conn);
231+
Assert(PQstatus(monitorConn) == CONNECTION_OK);
232+
233+
/* test PQcancel */
234+
send_cancellable_query(conn, monitorConn);
235+
cancel = PQgetCancel(conn);
236+
if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
237+
pg_fatal("failed to run PQcancel: %s", errorbuf);
238+
confirm_query_canceled(conn);
239+
240+
/* PGcancel object can be reused for the next query */
241+
send_cancellable_query(conn, monitorConn);
242+
if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
243+
pg_fatal("failed to run PQcancel: %s", errorbuf);
244+
confirm_query_canceled(conn);
245+
246+
PQfreeCancel(cancel);
247+
248+
/* test PQrequestCancel */
249+
send_cancellable_query(conn, monitorConn);
250+
if (!PQrequestCancel(conn))
251+
pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
252+
confirm_query_canceled(conn);
253+
254+
fprintf(stderr, "ok\n");
255+
}
256+
89257
static void
90258
test_disallowed_in_pipeline(PGconn *conn)
91259
{
@@ -1789,6 +1957,7 @@ usage(const char *progname)
17891957
static void
17901958
print_test_list(void)
17911959
{
1960+
printf("cancel\n");
17921961
printf("disallowed_in_pipeline\n");
17931962
printf("multi_pipelines\n");
17941963
printf("nosync\n");
@@ -1890,7 +2059,9 @@ main(int argc, char **argv)
18902059
PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
18912060
}
18922061

1893-
if (strcmp(testname, "disallowed_in_pipeline") == 0)
2062+
if (strcmp(testname, "cancel") == 0)
2063+
test_cancel(conn);
2064+
else if (strcmp(testname, "disallowed_in_pipeline") == 0)
18942065
test_disallowed_in_pipeline(conn);
18952066
else if (strcmp(testname, "multi_pipelines") == 0)
18962067
test_multi_pipelines(conn);

0 commit comments

Comments
 (0)