Skip to content

Commit cf2f82a

Browse files
pg_upgrade: Parallelize incompatible polymorphics check.
This commit makes use of the new task framework in pg_upgrade to parallelize the check for usage of incompatible polymorphic functions, i.e., those with arguments of type anyarray/anyelement rather than the newer anycompatible variants. This step will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1. Reviewed-by: Daniel Gustafsson, Ilya Gladyshev Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
1 parent c34eabf commit cf2f82a

File tree

1 file changed

+83
-76
lines changed

1 file changed

+83
-76
lines changed

src/bin/pg_upgrade/check.c

Lines changed: 83 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
14131413
check_ok();
14141414
}
14151415

1416+
/*
1417+
* Callback function for processing results of query for
1418+
* check_for_incompatible_polymorphics()'s UpgradeTask. If the query returned
1419+
* any rows (i.e., the check failed), write the details to the report file.
1420+
*/
1421+
static void
1422+
process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
1423+
{
1424+
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
1425+
bool db_used = false;
1426+
int ntups = PQntuples(res);
1427+
int i_objkind = PQfnumber(res, "objkind");
1428+
int i_objname = PQfnumber(res, "objname");
1429+
1430+
AssertVariableIsOfType(&process_incompat_polymorphics,
1431+
UpgradeTaskProcessCB);
1432+
1433+
for (int rowno = 0; rowno < ntups; rowno++)
1434+
{
1435+
if (report->file == NULL &&
1436+
(report->file = fopen_priv(report->path, "w")) == NULL)
1437+
pg_fatal("could not open file \"%s\": %m", report->path);
1438+
if (!db_used)
1439+
{
1440+
fprintf(report->file, "In database: %s\n", dbinfo->db_name);
1441+
db_used = true;
1442+
}
1443+
1444+
fprintf(report->file, " %s: %s\n",
1445+
PQgetvalue(res, rowno, i_objkind),
1446+
PQgetvalue(res, rowno, i_objname));
1447+
}
1448+
}
1449+
14161450
/*
14171451
* check_for_incompatible_polymorphics()
14181452
*
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
14221456
static void
14231457
check_for_incompatible_polymorphics(ClusterInfo *cluster)
14241458
{
1425-
PGresult *res;
1426-
FILE *script = NULL;
1427-
char output_path[MAXPGPATH];
14281459
PQExpBufferData old_polymorphics;
1460+
UpgradeTask *task = upgrade_task_create();
1461+
UpgradeTaskReport report;
1462+
char *query;
14291463

14301464
prep_status("Checking for incompatible polymorphic functions");
14311465

1432-
snprintf(output_path, sizeof(output_path), "%s/%s",
1466+
report.file = NULL;
1467+
snprintf(report.path, sizeof(report.path), "%s/%s",
14331468
log_opts.basedir,
14341469
"incompatible_polymorphics.txt");
14351470

@@ -1453,93 +1488,65 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
14531488
", 'array_positions(anyarray,anyelement)'"
14541489
", 'width_bucket(anyelement,anyarray)'");
14551490

1456-
for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
1457-
{
1458-
bool db_used = false;
1459-
DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
1460-
PGconn *conn = connectToServer(cluster, active_db->db_name);
1461-
int ntups;
1462-
int i_objkind,
1463-
i_objname;
1464-
1465-
/*
1466-
* The query below hardcodes FirstNormalObjectId as 16384 rather than
1467-
* interpolating that C #define into the query because, if that
1468-
* #define is ever changed, the cutoff we want to use is the value
1469-
* used by pre-version 14 servers, not that of some future version.
1470-
*/
1471-
res = executeQueryOrDie(conn,
1472-
/* Aggregate transition functions */
1473-
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
1474-
"FROM pg_proc AS p "
1475-
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
1476-
"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
1477-
"WHERE p.oid >= 16384 "
1478-
"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
1479-
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
1480-
1481-
/* Aggregate final functions */
1482-
"UNION ALL "
1483-
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
1484-
"FROM pg_proc AS p "
1485-
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
1486-
"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
1487-
"WHERE p.oid >= 16384 "
1488-
"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
1489-
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
1490-
1491-
/* Operators */
1492-
"UNION ALL "
1493-
"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
1494-
"FROM pg_operator AS op "
1495-
"WHERE op.oid >= 16384 "
1496-
"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
1497-
"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
1498-
old_polymorphics.data,
1499-
old_polymorphics.data,
1500-
old_polymorphics.data);
1501-
1502-
ntups = PQntuples(res);
1503-
1504-
i_objkind = PQfnumber(res, "objkind");
1505-
i_objname = PQfnumber(res, "objname");
1506-
1507-
for (int rowno = 0; rowno < ntups; rowno++)
1508-
{
1509-
if (script == NULL &&
1510-
(script = fopen_priv(output_path, "w")) == NULL)
1511-
pg_fatal("could not open file \"%s\": %m", output_path);
1512-
if (!db_used)
1513-
{
1514-
fprintf(script, "In database: %s\n", active_db->db_name);
1515-
db_used = true;
1516-
}
1517-
1518-
fprintf(script, " %s: %s\n",
1519-
PQgetvalue(res, rowno, i_objkind),
1520-
PQgetvalue(res, rowno, i_objname));
1521-
}
1491+
/*
1492+
* The query below hardcodes FirstNormalObjectId as 16384 rather than
1493+
* interpolating that C #define into the query because, if that #define is
1494+
* ever changed, the cutoff we want to use is the value used by
1495+
* pre-version 14 servers, not that of some future version.
1496+
*/
15221497

1523-
PQclear(res);
1524-
PQfinish(conn);
1525-
}
1498+
/* Aggregate transition functions */
1499+
query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
1500+
"FROM pg_proc AS p "
1501+
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
1502+
"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
1503+
"WHERE p.oid >= 16384 "
1504+
"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
1505+
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
1506+
1507+
/* Aggregate final functions */
1508+
"UNION ALL "
1509+
"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
1510+
"FROM pg_proc AS p "
1511+
"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
1512+
"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
1513+
"WHERE p.oid >= 16384 "
1514+
"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
1515+
"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
1516+
1517+
/* Operators */
1518+
"UNION ALL "
1519+
"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
1520+
"FROM pg_operator AS op "
1521+
"WHERE op.oid >= 16384 "
1522+
"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
1523+
"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[])",
1524+
old_polymorphics.data,
1525+
old_polymorphics.data,
1526+
old_polymorphics.data);
1527+
1528+
upgrade_task_add_step(task, query, process_incompat_polymorphics,
1529+
true, &report);
1530+
upgrade_task_run(task, cluster);
1531+
upgrade_task_free(task);
15261532

1527-
if (script)
1533+
if (report.file)
15281534
{
1529-
fclose(script);
1535+
fclose(report.file);
15301536
pg_log(PG_REPORT, "fatal");
15311537
pg_fatal("Your installation contains user-defined objects that refer to internal\n"
15321538
"polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
15331539
"These user-defined objects must be dropped before upgrading and restored\n"
15341540
"afterwards, changing them to refer to the new corresponding functions with\n"
15351541
"arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
15361542
"A list of the problematic objects is in the file:\n"
1537-
" %s", output_path);
1543+
" %s", report.path);
15381544
}
15391545
else
15401546
check_ok();
15411547

15421548
termPQExpBuffer(&old_polymorphics);
1549+
pg_free(query);
15431550
}
15441551

15451552
/*

0 commit comments

Comments
 (0)