Skip to content

Commit 1ee910c

Browse files
committed
Hopefully make libpq_pipeline's new cancel test more reliable
The newly introduced cancel test in libpq_pipeline was flaky. It's not completely clear why, but one possibility is that the check for "active" was actually seeing the active state of the previous query. This change should address any such race condition by first waiting until the connection is reported as idle.

Author: Jelte Fennema-Nio <me@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQRvmUK5-d68A+cm+fgmfht9Dv2uZ28-qq3QiaF6EAZqPQ@mail.gmail.com
1 parent dbfc447 commit 1ee910c

File tree

1 file changed

+49
-29
lines changed

1 file changed

+49
-29
lines changed

src/test/modules/libpq_pipeline/libpq_pipeline.c

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -114,50 +114,38 @@ confirm_query_canceled_impl(int line, PGconn *conn)
114114
PQconsumeInput(conn);
115115
}
116116

117-
#define send_cancellable_query(conn, monitorConn) \
118-
send_cancellable_query_impl(__LINE__, conn, monitorConn)
117+
/*
118+
* Using monitorConn, query pg_stat_activity to see that the connection with
119+
* the given PID is in the given state. We never stop until it is.
120+
*/
119121
static void
120-
send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
122+
wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state)
121123
{
122-
const char *env_wait;
123-
const Oid paramTypes[1] = {INT4OID};
124-
int procpid = PQbackendPID(conn);
125-
126-
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
127-
if (env_wait == NULL)
128-
env_wait = "180";
124+
const Oid paramTypes[] = {INT4OID, TEXTOID};
125+
const char *paramValues[2];
126+
char *pidstr = psprintf("%d", procpid);
129127

130-
if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
131-
&env_wait, NULL, NULL, 0) != 1)
132-
pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
128+
paramValues[0] = pidstr;
129+
paramValues[1] = state;
133130

134-
/*
135-
* Wait until the query is actually running. Otherwise sending a
136-
* cancellation request might not cancel the query due to race conditions.
137-
*/
138131
while (true)
139132
{
140-
char *value;
141133
PGresult *res;
142-
const char *paramValues[1];
143-
char pidval[16];
144-
145-
snprintf(pidval, 16, "%d", procpid);
146-
paramValues[0] = pidval;
134+
char *value;
147135

148136
res = PQexecParams(monitorConn,
149137
"SELECT count(*) FROM pg_stat_activity WHERE "
150-
"pid = $1 AND state = 'active'",
151-
1, NULL, paramValues, NULL, NULL, 1);
138+
"pid = $1 AND state = $2",
139+
2, paramTypes, paramValues, NULL, NULL, 1);
152140

153141
if (PQresultStatus(res) != PGRES_TUPLES_OK)
154-
pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
142+
pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
155143
if (PQntuples(res) != 1)
156-
pg_fatal("unexpected number of rows received: %d", PQntuples(res));
144+
pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
157145
if (PQnfields(res) != 1)
158-
pg_fatal("unexpected number of columns received: %d", PQnfields(res));
146+
pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
159147
value = PQgetvalue(res, 0, 0);
160-
if (*value != '0')
148+
if (value[0] != '0')
161149
{
162150
PQclear(res);
163151
break;
@@ -167,6 +155,38 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
167155
/* wait 10ms before polling again */
168156
pg_usleep(10000);
169157
}
158+
159+
pfree(pidstr);
160+
}
161+
162+
#define send_cancellable_query(conn, monitorConn) \
163+
send_cancellable_query_impl(__LINE__, conn, monitorConn)
164+
static void
165+
send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
166+
{
167+
const char *env_wait;
168+
const Oid paramTypes[1] = {INT4OID};
169+
170+
/*
171+
* Wait for the connection to be idle, so that our check for an active
172+
* connection below is reliable, instead of possibly seeing an outdated
173+
* state.
174+
*/
175+
wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle");
176+
177+
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
178+
if (env_wait == NULL)
179+
env_wait = "180";
180+
181+
if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
182+
&env_wait, NULL, NULL, 0) != 1)
183+
pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
184+
185+
/*
186+
* Wait for the query to start, because if the query is not running yet
187+
* the cancel request that we send won't have any effect.
188+
*/
189+
wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "active");
170190
}
171191

172192
/*

0 commit comments

Comments
 (0)