28
28
29
29
30
30
static void exit_nicely (PGconn * conn );
31
+ static bool process_result (PGconn * conn , PGresult * res , int results ,
32
+ int numsent );
31
33
32
34
const char * const progname = "libpq_pipeline" ;
33
35
@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
1307
1309
fprintf (stderr , "ok\n" );
1308
1310
}
1309
1311
1312
+ /*
1313
+ * In this test mode we send a stream of queries, with one in the middle
1314
+ * causing an error. Verify that we can still send some more after the
1315
+ * error and have libpq work properly.
1316
+ */
1317
+ static void
1318
+ test_uniqviol (PGconn * conn )
1319
+ {
1320
+ int sock = PQsocket (conn );
1321
+ PGresult * res ;
1322
+ Oid paramTypes [2 ] = {INT8OID , INT8OID };
1323
+ const char * paramValues [2 ];
1324
+ char paramValue0 [MAXINT8LEN ];
1325
+ char paramValue1 [MAXINT8LEN ];
1326
+ int ctr = 0 ;
1327
+ int numsent = 0 ;
1328
+ int results = 0 ;
1329
+ bool read_done = false;
1330
+ bool write_done = false;
1331
+ bool error_sent = false;
1332
+ bool got_error = false;
1333
+ int switched = 0 ;
1334
+ int socketful = 0 ;
1335
+ fd_set in_fds ;
1336
+ fd_set out_fds ;
1337
+
1338
+ fprintf (stderr , "uniqviol ..." );
1339
+
1340
+ PQsetnonblocking (conn , 1 );
1341
+
1342
+ paramValues [0 ] = paramValue0 ;
1343
+ paramValues [1 ] = paramValue1 ;
1344
+ sprintf (paramValue1 , "42" );
1345
+
1346
+ res = PQexec (conn , "drop table if exists ppln_uniqviol;"
1347
+ "create table ppln_uniqviol(id bigint primary key, idata bigint)" );
1348
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1349
+ pg_fatal ("failed to create table: %s" , PQerrorMessage (conn ));
1350
+
1351
+ res = PQexec (conn , "begin" );
1352
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1353
+ pg_fatal ("failed to begin transaction: %s" , PQerrorMessage (conn ));
1354
+
1355
+ res = PQprepare (conn , "insertion" ,
1356
+ "insert into ppln_uniqviol values ($1, $2) returning id" ,
1357
+ 2 , paramTypes );
1358
+ if (res == NULL || PQresultStatus (res ) != PGRES_COMMAND_OK )
1359
+ pg_fatal ("failed to prepare query: %s" , PQerrorMessage (conn ));
1360
+
1361
+ if (PQenterPipelineMode (conn ) != 1 )
1362
+ pg_fatal ("failed to enter pipeline mode" );
1363
+
1364
+ while (!read_done )
1365
+ {
1366
+ /*
1367
+ * Avoid deadlocks by reading everything the server has sent before
1368
+ * sending anything. (Special precaution is needed here to process
1369
+ * PQisBusy before testing the socket for read-readiness, because the
1370
+ * socket does not turn read-ready after "sending" queries in aborted
1371
+ * pipeline mode.)
1372
+ */
1373
+ while (PQisBusy (conn ) == 0 )
1374
+ {
1375
+ bool new_error ;
1376
+
1377
+ if (results >= numsent )
1378
+ {
1379
+ if (write_done )
1380
+ read_done = true;
1381
+ break ;
1382
+ }
1383
+
1384
+ res = PQgetResult (conn );
1385
+ new_error = process_result (conn , res , results , numsent );
1386
+ if (new_error && got_error )
1387
+ pg_fatal ("got two errors" );
1388
+ got_error |= new_error ;
1389
+ if (results ++ >= numsent - 1 )
1390
+ {
1391
+ if (write_done )
1392
+ read_done = true;
1393
+ break ;
1394
+ }
1395
+ }
1396
+
1397
+ if (read_done )
1398
+ break ;
1399
+
1400
+ FD_ZERO (& out_fds );
1401
+ FD_SET (sock , & out_fds );
1402
+
1403
+ FD_ZERO (& in_fds );
1404
+ FD_SET (sock , & in_fds );
1405
+
1406
+ if (select (sock + 1 , & in_fds , write_done ? NULL : & out_fds , NULL , NULL ) == -1 )
1407
+ {
1408
+ if (errno == EINTR )
1409
+ continue ;
1410
+ pg_fatal ("select() failed: %m" );
1411
+ }
1412
+
1413
+ if (FD_ISSET (sock , & in_fds ) && PQconsumeInput (conn ) == 0 )
1414
+ pg_fatal ("PQconsumeInput failed: %s" , PQerrorMessage (conn ));
1415
+
1416
+ /*
1417
+ * If the socket is writable and we haven't finished sending queries,
1418
+ * send some.
1419
+ */
1420
+ if (!write_done && FD_ISSET (sock , & out_fds ))
1421
+ {
1422
+ for (;;)
1423
+ {
1424
+ int flush ;
1425
+
1426
+ /*
1427
+ * provoke uniqueness violation exactly once after having
1428
+ * switched to read mode.
1429
+ */
1430
+ if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2 )
1431
+ {
1432
+ sprintf (paramValue0 , "%d" , numsent / 2 );
1433
+ fprintf (stderr , "E" );
1434
+ error_sent = true;
1435
+ }
1436
+ else
1437
+ {
1438
+ fprintf (stderr , "." );
1439
+ sprintf (paramValue0 , "%d" , ctr ++ );
1440
+ }
1441
+
1442
+ if (PQsendQueryPrepared (conn , "insertion" , 2 , paramValues , NULL , NULL , 0 ) != 1 )
1443
+ pg_fatal ("failed to execute prepared query: %s" , PQerrorMessage (conn ));
1444
+ numsent ++ ;
1445
+
1446
+ /* Are we done writing? */
1447
+ if (socketful != 0 && numsent % socketful == 42 && error_sent )
1448
+ {
1449
+ if (PQsendFlushRequest (conn ) != 1 )
1450
+ pg_fatal ("failed to send flush request" );
1451
+ write_done = true;
1452
+ fprintf (stderr , "\ndone writing\n" );
1453
+ PQflush (conn );
1454
+ break ;
1455
+ }
1456
+
1457
+ /* is the outgoing socket full? */
1458
+ flush = PQflush (conn );
1459
+ if (flush == -1 )
1460
+ pg_fatal ("failed to flush: %s" , PQerrorMessage (conn ));
1461
+ if (flush == 1 )
1462
+ {
1463
+ if (socketful == 0 )
1464
+ socketful = numsent ;
1465
+ fprintf (stderr , "\nswitch to reading\n" );
1466
+ switched ++ ;
1467
+ break ;
1468
+ }
1469
+ }
1470
+ }
1471
+ }
1472
+
1473
+ if (!got_error )
1474
+ pg_fatal ("did not get expected error" );
1475
+
1476
+ fprintf (stderr , "ok\n" );
1477
+ }
1478
+
1479
+ /*
1480
+ * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1481
+ * the expected NULL that should follow it.
1482
+ *
1483
+ * Returns true if we read a fatal error message, otherwise false.
1484
+ */
1485
+ static bool
1486
+ process_result (PGconn * conn , PGresult * res , int results , int numsent )
1487
+ {
1488
+ PGresult * res2 ;
1489
+ bool got_error = false;
1490
+
1491
+ if (res == NULL )
1492
+ pg_fatal ("got unexpected NULL" );
1493
+
1494
+ switch (PQresultStatus (res ))
1495
+ {
1496
+ case PGRES_FATAL_ERROR :
1497
+ got_error = true;
1498
+ fprintf (stderr , "result %d/%d (error): %s\n" , results , numsent , PQerrorMessage (conn ));
1499
+ PQclear (res );
1500
+
1501
+ res2 = PQgetResult (conn );
1502
+ if (res2 != NULL )
1503
+ pg_fatal ("expected NULL, got %s" ,
1504
+ PQresStatus (PQresultStatus (res2 )));
1505
+ break ;
1506
+
1507
+ case PGRES_TUPLES_OK :
1508
+ fprintf (stderr , "result %d/%d: %s\n" , results , numsent , PQgetvalue (res , 0 , 0 ));
1509
+ PQclear (res );
1510
+
1511
+ res2 = PQgetResult (conn );
1512
+ if (res2 != NULL )
1513
+ pg_fatal ("expected NULL, got %s" ,
1514
+ PQresStatus (PQresultStatus (res2 )));
1515
+ break ;
1516
+
1517
+ case PGRES_PIPELINE_ABORTED :
1518
+ fprintf (stderr , "result %d/%d: pipeline aborted\n" , results , numsent );
1519
+ res2 = PQgetResult (conn );
1520
+ if (res2 != NULL )
1521
+ pg_fatal ("expected NULL, got %s" ,
1522
+ PQresStatus (PQresultStatus (res2 )));
1523
+ break ;
1524
+
1525
+ default :
1526
+ pg_fatal ("got unexpected %s" , PQresStatus (PQresultStatus (res )));
1527
+ }
1528
+
1529
+ return got_error ;
1530
+ }
1531
+
1532
+
1310
1533
static void
1311
1534
usage (const char * progname )
1312
1535
{
@@ -1331,6 +1554,7 @@ print_test_list(void)
1331
1554
printf ("simple_pipeline\n" );
1332
1555
printf ("singlerow\n" );
1333
1556
printf ("transaction\n" );
1557
+ printf ("uniqviol\n" );
1334
1558
}
1335
1559
1336
1560
int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
1436
1660
test_singlerowmode (conn );
1437
1661
else if (strcmp (testname , "transaction" ) == 0 )
1438
1662
test_transaction (conn );
1663
+ else if (strcmp (testname , "uniqviol" ) == 0 )
1664
+ test_uniqviol (conn );
1439
1665
else
1440
1666
{
1441
1667
fprintf (stderr , "\"%s\" is not a recognized test name\n" , testname );
0 commit comments