23
23
24
24
/*
25
25
* conns[0] is the global setup, teardown, and watchdog connection. Additional
26
- * connections represent spec-defined sessions.
26
+ * connections represent spec-defined sessions. We also track the backend
27
+ * PID, in numeric and string formats, for each connection.
27
28
*/
28
29
static PGconn * * conns = NULL ;
29
- static const char * * backend_pids = NULL ;
30
+ static int * backend_pids = NULL ;
31
+ static const char * * backend_pid_strs = NULL ;
30
32
static int nconns = 0 ;
31
33
32
34
/* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
41
43
42
44
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
43
45
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
44
- static bool try_complete_step (Step * step , int flags );
46
+ static bool try_complete_step (TestSpec * testspec , Step * step , int flags );
45
47
46
48
static int step_qsort_cmp (const void * a , const void * b );
47
49
static int step_bsearch_cmp (const void * a , const void * b );
@@ -160,8 +162,10 @@ main(int argc, char **argv)
160
162
* extra for lock wait detection and global work.
161
163
*/
162
164
nconns = 1 + testspec -> nsessions ;
163
- conns = calloc (nconns , sizeof (PGconn * ));
164
- backend_pids = calloc (nconns , sizeof (* backend_pids ));
165
+ conns = (PGconn * * ) pg_malloc0 (nconns * sizeof (PGconn * ));
166
+ backend_pids = pg_malloc0 (nconns * sizeof (* backend_pids ));
167
+ backend_pid_strs = pg_malloc0 (nconns * sizeof (* backend_pid_strs ));
168
+
165
169
for (i = 0 ; i < nconns ; i ++ )
166
170
{
167
171
conns [i ] = PQconnectdb (conninfo );
@@ -187,26 +191,9 @@ main(int argc, char **argv)
187
191
blackholeNoticeProcessor ,
188
192
NULL );
189
193
190
- /* Get the backend pid for lock wait checking. */
191
- res = PQexec (conns [i ], "SELECT pg_backend_pid()" );
192
- if (PQresultStatus (res ) == PGRES_TUPLES_OK )
193
- {
194
- if (PQntuples (res ) == 1 && PQnfields (res ) == 1 )
195
- backend_pids [i ] = pg_strdup (PQgetvalue (res , 0 , 0 ));
196
- else
197
- {
198
- fprintf (stderr , "backend pid query returned %d rows and %d columns, expected 1 row and 1 column" ,
199
- PQntuples (res ), PQnfields (res ));
200
- exit_nicely ();
201
- }
202
- }
203
- else
204
- {
205
- fprintf (stderr , "backend pid query failed: %s" ,
206
- PQerrorMessage (conns [i ]));
207
- exit_nicely ();
208
- }
209
- PQclear (res );
194
+ /* Save each connection's backend PID for subsequent use. */
195
+ backend_pids [i ] = PQbackendPID (conns [i ]);
196
+ backend_pid_strs [i ] = psprintf ("%d" , backend_pids [i ]);
210
197
}
211
198
212
199
/* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
231
218
appendPQExpBufferStr (& wait_query ,
232
219
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{" );
233
220
/* The spec syntax requires at least one session; assume that here. */
234
- appendPQExpBufferStr (& wait_query , backend_pids [1 ]);
221
+ appendPQExpBufferStr (& wait_query , backend_pid_strs [1 ]);
235
222
for (i = 2 ; i < nconns ; i ++ )
236
- appendPQExpBuffer (& wait_query , ",%s" , backend_pids [i ]);
223
+ appendPQExpBuffer (& wait_query , ",%s" , backend_pid_strs [i ]);
237
224
appendPQExpBufferStr (& wait_query , "}')" );
238
225
239
226
res = PQprepare (conns [0 ], PREP_WAITING , wait_query .data , 0 , NULL );
@@ -552,7 +539,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
552
539
oldstep = waiting [w ];
553
540
554
541
/* Wait for previous step on this connection. */
555
- try_complete_step (oldstep , STEP_RETRY );
542
+ try_complete_step (testspec , oldstep , STEP_RETRY );
556
543
557
544
/* Remove that step from the waiting[] array. */
558
545
if (w + 1 < nwaiting )
@@ -574,7 +561,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
574
561
nerrorstep = 0 ;
575
562
while (w < nwaiting )
576
563
{
577
- if (try_complete_step (waiting [w ], STEP_NONBLOCK | STEP_RETRY ))
564
+ if (try_complete_step (testspec , waiting [w ],
565
+ STEP_NONBLOCK | STEP_RETRY ))
578
566
{
579
567
/* Still blocked on a lock, leave it alone. */
580
568
w ++ ;
@@ -603,14 +591,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
603
591
}
604
592
605
593
/* Try to complete this step without blocking. */
606
- mustwait = try_complete_step (step , STEP_NONBLOCK );
594
+ mustwait = try_complete_step (testspec , step , STEP_NONBLOCK );
607
595
608
596
/* Check for completion of any steps that were previously waiting. */
609
597
w = 0 ;
610
598
nerrorstep = 0 ;
611
599
while (w < nwaiting )
612
600
{
613
- if (try_complete_step (waiting [w ], STEP_NONBLOCK | STEP_RETRY ))
601
+ if (try_complete_step (testspec , waiting [w ],
602
+ STEP_NONBLOCK | STEP_RETRY ))
614
603
w ++ ;
615
604
else
616
605
{
@@ -633,7 +622,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
633
622
/* Wait for any remaining queries. */
634
623
for (w = 0 ; w < nwaiting ; ++ w )
635
624
{
636
- try_complete_step (waiting [w ], STEP_RETRY );
625
+ try_complete_step (testspec , waiting [w ], STEP_RETRY );
637
626
report_error_message (waiting [w ]);
638
627
}
639
628
@@ -696,7 +685,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
696
685
* a lock, returns true. Otherwise, returns false.
697
686
*/
698
687
static bool
699
- try_complete_step (Step * step , int flags )
688
+ try_complete_step (TestSpec * testspec , Step * step , int flags )
700
689
{
701
690
PGconn * conn = conns [1 + step -> session ];
702
691
fd_set read_set ;
@@ -705,6 +694,7 @@ try_complete_step(Step *step, int flags)
705
694
int sock = PQsocket (conn );
706
695
int ret ;
707
696
PGresult * res ;
697
+ PGnotify * notify ;
708
698
bool canceled = false;
709
699
710
700
if (sock < 0 )
@@ -741,7 +731,7 @@ try_complete_step(Step *step, int flags)
741
731
bool waiting ;
742
732
743
733
res = PQexecPrepared (conns [0 ], PREP_WAITING , 1 ,
744
- & backend_pids [step -> session + 1 ],
734
+ & backend_pid_strs [step -> session + 1 ],
745
735
NULL , NULL , 0 );
746
736
if (PQresultStatus (res ) != PGRES_TUPLES_OK ||
747
737
PQntuples (res ) != 1 )
@@ -883,6 +873,36 @@ try_complete_step(Step *step, int flags)
883
873
PQclear (res );
884
874
}
885
875
876
+ /* Report any available NOTIFY messages, too */
877
+ PQconsumeInput (conn );
878
+ while ((notify = PQnotifies (conn )) != NULL )
879
+ {
880
+ /* Try to identify which session it came from */
881
+ const char * sendername = NULL ;
882
+ char pidstring [32 ];
883
+ int i ;
884
+
885
+ for (i = 0 ; i < testspec -> nsessions ; i ++ )
886
+ {
887
+ if (notify -> be_pid == backend_pids [i + 1 ])
888
+ {
889
+ sendername = testspec -> sessions [i ]-> name ;
890
+ break ;
891
+ }
892
+ }
893
+ if (sendername == NULL )
894
+ {
895
+ /* Doesn't seem to be any test session, so show the hard way */
896
+ snprintf (pidstring , sizeof (pidstring ), "PID %d" , notify -> be_pid );
897
+ sendername = pidstring ;
898
+ }
899
+ printf ("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n" ,
900
+ testspec -> sessions [step -> session ]-> name ,
901
+ notify -> relname , notify -> extra , sendername );
902
+ PQfreemem (notify );
903
+ PQconsumeInput (conn );
904
+ }
905
+
886
906
return false;
887
907
}
888
908
0 commit comments