23
23
24
24
/*
25
25
* conns[0] is the global setup, teardown, and watchdog connection. Additional
26
- * connections represent spec-defined sessions.
26
+ * connections represent spec-defined sessions. We also track the backend
27
+ * PID, in numeric and string formats, for each connection.
27
28
*/
28
29
static PGconn * * conns = NULL ;
29
- static const char * * backend_pids = NULL ;
30
+ static int * backend_pids = NULL ;
31
+ static const char * * backend_pid_strs = NULL ;
30
32
static int nconns = 0 ;
31
33
32
34
/* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
41
43
42
44
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
43
45
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
44
- static bool try_complete_step (Step * step , int flags );
46
+ static bool try_complete_step (TestSpec * testspec , Step * step , int flags );
45
47
46
48
static int step_qsort_cmp (const void * a , const void * b );
47
49
static int step_bsearch_cmp (const void * a , const void * b );
@@ -159,9 +161,11 @@ main(int argc, char **argv)
159
161
* extra for lock wait detection and global work.
160
162
*/
161
163
nconns = 1 + testspec -> nsessions ;
162
- conns = calloc (nconns , sizeof (PGconn * ));
164
+ conns = (PGconn * * ) pg_malloc0 (nconns * sizeof (PGconn * ));
165
+ backend_pids = pg_malloc0 (nconns * sizeof (* backend_pids ));
166
+ backend_pid_strs = pg_malloc0 (nconns * sizeof (* backend_pid_strs ));
163
167
atexit (disconnect_atexit );
164
- backend_pids = calloc ( nconns , sizeof ( * backend_pids ));
168
+
165
169
for (i = 0 ; i < nconns ; i ++ )
166
170
{
167
171
conns [i ] = PQconnectdb (conninfo );
@@ -187,26 +191,9 @@ main(int argc, char **argv)
187
191
blackholeNoticeProcessor ,
188
192
NULL );
189
193
190
- /* Get the backend pid for lock wait checking. */
191
- res = PQexec (conns [i ], "SELECT pg_catalog.pg_backend_pid()" );
192
- if (PQresultStatus (res ) == PGRES_TUPLES_OK )
193
- {
194
- if (PQntuples (res ) == 1 && PQnfields (res ) == 1 )
195
- backend_pids [i ] = pg_strdup (PQgetvalue (res , 0 , 0 ));
196
- else
197
- {
198
- fprintf (stderr , "backend pid query returned %d rows and %d columns, expected 1 row and 1 column" ,
199
- PQntuples (res ), PQnfields (res ));
200
- exit (1 );
201
- }
202
- }
203
- else
204
- {
205
- fprintf (stderr , "backend pid query failed: %s" ,
206
- PQerrorMessage (conns [i ]));
207
- exit (1 );
208
- }
209
- PQclear (res );
194
+ /* Save each connection's backend PID for subsequent use. */
195
+ backend_pids [i ] = PQbackendPID (conns [i ]);
196
+ backend_pid_strs [i ] = psprintf ("%d" , backend_pids [i ]);
210
197
}
211
198
212
199
/* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
231
218
appendPQExpBufferStr (& wait_query ,
232
219
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{" );
233
220
/* The spec syntax requires at least one session; assume that here. */
234
- appendPQExpBufferStr (& wait_query , backend_pids [1 ]);
221
+ appendPQExpBufferStr (& wait_query , backend_pid_strs [1 ]);
235
222
for (i = 2 ; i < nconns ; i ++ )
236
- appendPQExpBuffer (& wait_query , ",%s" , backend_pids [i ]);
223
+ appendPQExpBuffer (& wait_query , ",%s" , backend_pid_strs [i ]);
237
224
appendPQExpBufferStr (& wait_query , "}')" );
238
225
239
226
res = PQprepare (conns [0 ], PREP_WAITING , wait_query .data , 0 , NULL );
@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
549
536
oldstep = waiting [w ];
550
537
551
538
/* Wait for previous step on this connection. */
552
- try_complete_step (oldstep , STEP_RETRY );
539
+ try_complete_step (testspec , oldstep , STEP_RETRY );
553
540
554
541
/* Remove that step from the waiting[] array. */
555
542
if (w + 1 < nwaiting )
@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
571
558
nerrorstep = 0 ;
572
559
while (w < nwaiting )
573
560
{
574
- if (try_complete_step (waiting [w ], STEP_NONBLOCK | STEP_RETRY ))
561
+ if (try_complete_step (testspec , waiting [w ],
562
+ STEP_NONBLOCK | STEP_RETRY ))
575
563
{
576
564
/* Still blocked on a lock, leave it alone. */
577
565
w ++ ;
@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
600
588
}
601
589
602
590
/* Try to complete this step without blocking. */
603
- mustwait = try_complete_step (step , STEP_NONBLOCK );
591
+ mustwait = try_complete_step (testspec , step , STEP_NONBLOCK );
604
592
605
593
/* Check for completion of any steps that were previously waiting. */
606
594
w = 0 ;
607
595
nerrorstep = 0 ;
608
596
while (w < nwaiting )
609
597
{
610
- if (try_complete_step (waiting [w ], STEP_NONBLOCK | STEP_RETRY ))
598
+ if (try_complete_step (testspec , waiting [w ],
599
+ STEP_NONBLOCK | STEP_RETRY ))
611
600
w ++ ;
612
601
else
613
602
{
@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
630
619
/* Wait for any remaining queries. */
631
620
for (w = 0 ; w < nwaiting ; ++ w )
632
621
{
633
- try_complete_step (waiting [w ], STEP_RETRY );
622
+ try_complete_step (testspec , waiting [w ], STEP_RETRY );
634
623
report_error_message (waiting [w ]);
635
624
}
636
625
@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
693
682
* a lock, returns true. Otherwise, returns false.
694
683
*/
695
684
static bool
696
- try_complete_step (Step * step , int flags )
685
+ try_complete_step (TestSpec * testspec , Step * step , int flags )
697
686
{
698
687
PGconn * conn = conns [1 + step -> session ];
699
688
fd_set read_set ;
@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
702
691
int sock = PQsocket (conn );
703
692
int ret ;
704
693
PGresult * res ;
694
+ PGnotify * notify ;
705
695
bool canceled = false;
706
696
707
697
if (sock < 0 )
@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
738
728
bool waiting ;
739
729
740
730
res = PQexecPrepared (conns [0 ], PREP_WAITING , 1 ,
741
- & backend_pids [step -> session + 1 ],
731
+ & backend_pid_strs [step -> session + 1 ],
742
732
NULL , NULL , 0 );
743
733
if (PQresultStatus (res ) != PGRES_TUPLES_OK ||
744
734
PQntuples (res ) != 1 )
@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags)
880
870
PQclear (res );
881
871
}
882
872
873
+ /* Report any available NOTIFY messages, too */
874
+ PQconsumeInput (conn );
875
+ while ((notify = PQnotifies (conn )) != NULL )
876
+ {
877
+ /* Try to identify which session it came from */
878
+ const char * sendername = NULL ;
879
+ char pidstring [32 ];
880
+
881
+ for (int i = 0 ; i < testspec -> nsessions ; i ++ )
882
+ {
883
+ if (notify -> be_pid == backend_pids [i + 1 ])
884
+ {
885
+ sendername = testspec -> sessions [i ]-> name ;
886
+ break ;
887
+ }
888
+ }
889
+ if (sendername == NULL )
890
+ {
891
+ /* Doesn't seem to be any test session, so show the hard way */
892
+ snprintf (pidstring , sizeof (pidstring ), "PID %d" , notify -> be_pid );
893
+ sendername = pidstring ;
894
+ }
895
+ printf ("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n" ,
896
+ testspec -> sessions [step -> session ]-> name ,
897
+ notify -> relname , notify -> extra , sendername );
898
+ PQfreemem (notify );
899
+ PQconsumeInput (conn );
900
+ }
901
+
883
902
return false;
884
903
}
885
904
0 commit comments