Commit bbf83ca

pg_upgrade: Parallelize data type checks.
This commit makes use of the new task framework in pg_upgrade to parallelize the checks for incompatible data types, i.e., data types whose on-disk format has changed, data types that have been removed, etc. This step will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
1 parent 6ab8f27 commit bbf83ca
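
For context, the task framework calls used by this patch (upgrade_task_create, upgrade_task_add_step, upgrade_task_run, upgrade_task_free) follow a register-then-run pattern. Below is a minimal, hypothetical sketch of that pattern, not code from this commit: the helper names example_process_cb and example_parallel_check and the "SELECT 1" query are placeholders, and the internal types (UpgradeTask, ClusterInfo, DbInfo) are assumed to come from pg_upgrade.h as used in the diff further down.

/*
 * Sketch only: illustrates the UpgradeTask register-then-run pattern this
 * commit applies to the data type checks.  Names prefixed "example_" are
 * invented for illustration.
 */
#include "pg_upgrade.h"

static void
example_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
{
	(void) arg;					/* unused in this sketch */

	/* Called once per database with that database's query result. */
	if (PQntuples(res) > 0)
		pg_log(PG_REPORT, "found rows in database %s", dbinfo->db_name);
}

static void
example_parallel_check(ClusterInfo *cluster)
{
	UpgradeTask *task = upgrade_task_create();

	/* Register one query plus result callback per check step. */
	upgrade_task_add_step(task, "SELECT 1", example_process_cb,
						  true, NULL);

	/*
	 * Run the registered steps against every database in the cluster; with
	 * --jobs greater than 1, multiple databases are processed concurrently.
	 */
	upgrade_task_run(task, cluster);
	upgrade_task_free(task);
}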

1 file changed: src/bin/pg_upgrade/check.c (191 additions, 160 deletions)

@@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
 	}
 };
 
+/*
+ * Private state for check_for_data_types_usage()'s UpgradeTask.
+ */
+struct data_type_check_state
+{
+	DataTypesUsageChecks *check;	/* the check for this step */
+	bool	   *result;			/* true if check failed for any database */
+	PQExpBuffer *report;		/* buffer for report on failed checks */
+};
+
+/*
+ * Returns a palloc'd query string for the data type check, for use by
+ * check_for_data_types_usage()'s UpgradeTask.
+ */
+static char *
+data_type_check_query(int checknum)
+{
+	DataTypesUsageChecks *check = &data_types_usage_checks[checknum];
+
+	return psprintf("WITH RECURSIVE oids AS ( "
+	/* start with the type(s) returned by base_query */
+					"	%s "
+					"	UNION ALL "
+					"	SELECT * FROM ( "
+	/* inner WITH because we can only reference the CTE once */
+					"		WITH x AS (SELECT oid FROM oids) "
+	/* domains on any type selected so far */
+					"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+					"			UNION ALL "
+	/* arrays over any type selected so far */
+					"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+					"			UNION ALL "
+	/* composite types containing any type selected so far */
+					"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+					"			WHERE t.typtype = 'c' AND "
+					"				  t.oid = c.reltype AND "
+					"				  c.oid = a.attrelid AND "
+					"				  NOT a.attisdropped AND "
+					"				  a.atttypid = x.oid "
+					"			UNION ALL "
+	/* ranges containing any type selected so far */
+					"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+					"			WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+					"	) foo "
+					") "
+	/* now look for stored columns of any such type */
+					"SELECT n.nspname, c.relname, a.attname "
+					"FROM	pg_catalog.pg_class c, "
+					"		pg_catalog.pg_namespace n, "
+					"		pg_catalog.pg_attribute a "
+					"WHERE	c.oid = a.attrelid AND "
+					"		NOT a.attisdropped AND "
+					"		a.atttypid IN (SELECT oid FROM oids) AND "
+					"		c.relkind IN ("
+					CppAsString2(RELKIND_RELATION) ", "
+					CppAsString2(RELKIND_MATVIEW) ", "
+					CppAsString2(RELKIND_INDEX) ") AND "
+					"		c.relnamespace = n.oid AND "
+	/* exclude possible orphaned temp tables */
+					"		n.nspname !~ '^pg_temp_' AND "
+					"		n.nspname !~ '^pg_toast_temp_' AND "
+	/* exclude system catalogs, too */
+					"		n.nspname NOT IN ('pg_catalog', 'information_schema')",
+					check->base_query);
+}
+
+/*
+ * Callback function for processing results of queries for
+ * check_for_data_types_usage()'s UpgradeTask.  If the query returned any rows
+ * (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_data_type_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	struct data_type_check_state *state = (struct data_type_check_state *) arg;
+	int			ntups = PQntuples(res);
+
+	AssertVariableIsOfType(&process_data_type_check, UpgradeTaskProcessCB);
+
+	if (ntups)
+	{
+		char		output_path[MAXPGPATH];
+		int			i_nspname;
+		int			i_relname;
+		int			i_attname;
+		FILE	   *script = NULL;
+		bool		db_used = false;
+
+		snprintf(output_path, sizeof(output_path), "%s/%s",
+				 log_opts.basedir,
+				 state->check->report_filename);
+
+		/*
+		 * Make sure we have a buffer to save reports to now that we found a
+		 * first failing check.
+		 */
+		if (*state->report == NULL)
+			*state->report = createPQExpBuffer();
+
+		/*
+		 * If this is the first time we see an error for the check in question
+		 * then print a status message of the failure.
+		 */
+		if (!(*state->result))
+		{
+			pg_log(PG_REPORT, "    failed check: %s", _(state->check->status));
+			appendPQExpBuffer(*state->report, "\n%s\n%s    %s\n",
+							  _(state->check->report_text),
+							  _("A list of the problem columns is in the file:"),
+							  output_path);
+		}
+		*state->result = true;
+
+		i_nspname = PQfnumber(res, "nspname");
+		i_relname = PQfnumber(res, "relname");
+		i_attname = PQfnumber(res, "attname");
+
+		for (int rowno = 0; rowno < ntups; rowno++)
+		{
+			if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
+				pg_fatal("could not open file \"%s\": %m", output_path);
+
+			if (!db_used)
+			{
+				fprintf(script, "In database: %s\n", dbinfo->db_name);
+				db_used = true;
+			}
+			fprintf(script, "  %s.%s.%s\n",
+					PQgetvalue(res, rowno, i_nspname),
+					PQgetvalue(res, rowno, i_relname),
+					PQgetvalue(res, rowno, i_attname));
+		}
+
+		if (script)
+		{
+			fclose(script);
+			script = NULL;
+		}
+	}
+}
+
 /*
  * check_for_data_types_usage()
  *	Detect whether there are any stored columns depending on given type(s)
@@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
  *	there's no storage involved in a view.
  */
 static void
-check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
+check_for_data_types_usage(ClusterInfo *cluster)
 {
-	bool		found = false;
 	bool	   *results;
-	PQExpBufferData report;
-	DataTypesUsageChecks *tmp = checks;
+	PQExpBuffer report = NULL;
+	DataTypesUsageChecks *tmp = data_types_usage_checks;
 	int			n_data_types_usage_checks = 0;
+	UpgradeTask *task = upgrade_task_create();
+	char	  **queries = NULL;
+	struct data_type_check_state *states;
 
 	prep_status("Checking data type usage");
 
@@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 
 	/* Prepare an array to store the results of checks in */
 	results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+	queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks);
+	states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks);
 
-	/*
-	 * Connect to each database in the cluster and run all defined checks
-	 * against that database before trying the next one.
-	 */
-	for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	for (int i = 0; i < n_data_types_usage_checks; i++)
 	{
-		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
-		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+		DataTypesUsageChecks *check = &data_types_usage_checks[i];
 
-		for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++)
+		if (check->threshold_version == MANUAL_CHECK)
 		{
-			PGresult   *res;
-			int			ntups;
-			int			i_nspname;
-			int			i_relname;
-			int			i_attname;
-			FILE	   *script = NULL;
-			bool		db_used = false;
-			char		output_path[MAXPGPATH];
-			DataTypesUsageChecks *cur_check = &checks[checknum];
-
-			if (cur_check->threshold_version == MANUAL_CHECK)
-			{
-				Assert(cur_check->version_hook);
-
-				/*
-				 * Make sure that the check applies to the current cluster
-				 * version and skip if not. If no check hook has been defined
-				 * we run the check for all versions.
-				 */
-				if (!cur_check->version_hook(cluster))
-					continue;
-			}
-			else if (cur_check->threshold_version != ALL_VERSIONS)
-			{
-				if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version)
-					continue;
-			}
-			else
-				Assert(cur_check->threshold_version == ALL_VERSIONS);
-
-			snprintf(output_path, sizeof(output_path), "%s/%s",
-					 log_opts.basedir,
-					 cur_check->report_filename);
+			Assert(check->version_hook);
 
 			/*
-			 * The type(s) of interest might be wrapped in a domain, array,
-			 * composite, or range, and these container types can be nested
-			 * (to varying extents depending on server version, but that's not
-			 * of concern here).  To handle all these cases we need a
-			 * recursive CTE.
+			 * Make sure that the check applies to the current cluster version
+			 * and skip it if not.
 			 */
-			res = executeQueryOrDie(conn,
-									"WITH RECURSIVE oids AS ( "
-			/* start with the type(s) returned by base_query */
-									"	%s "
-									"	UNION ALL "
-									"	SELECT * FROM ( "
-			/* inner WITH because we can only reference the CTE once */
-									"		WITH x AS (SELECT oid FROM oids) "
-			/* domains on any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-									"			UNION ALL "
-			/* arrays over any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
-									"			UNION ALL "
-			/* composite types containing any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-									"			WHERE t.typtype = 'c' AND "
-									"				  t.oid = c.reltype AND "
-									"				  c.oid = a.attrelid AND "
-									"				  NOT a.attisdropped AND "
-									"				  a.atttypid = x.oid "
-									"			UNION ALL "
-			/* ranges containing any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
-									"			WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
-									"	) foo "
-									") "
-			/* now look for stored columns of any such type */
-									"SELECT n.nspname, c.relname, a.attname "
-									"FROM	pg_catalog.pg_class c, "
-									"		pg_catalog.pg_namespace n, "
-									"		pg_catalog.pg_attribute a "
-									"WHERE	c.oid = a.attrelid AND "
-									"		NOT a.attisdropped AND "
-									"		a.atttypid IN (SELECT oid FROM oids) AND "
-									"		c.relkind IN ("
-									CppAsString2(RELKIND_RELATION) ", "
-									CppAsString2(RELKIND_MATVIEW) ", "
-									CppAsString2(RELKIND_INDEX) ") AND "
-									"		c.relnamespace = n.oid AND "
-			/* exclude possible orphaned temp tables */
-									"		n.nspname !~ '^pg_temp_' AND "
-									"		n.nspname !~ '^pg_toast_temp_' AND "
-			/* exclude system catalogs, too */
-									"		n.nspname NOT IN ('pg_catalog', 'information_schema')",
-									cur_check->base_query);
-
-			ntups = PQntuples(res);
+			if (!check->version_hook(cluster))
+				continue;
+		}
+		else if (check->threshold_version != ALL_VERSIONS)
+		{
+			if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version)
+				continue;
+		}
+		else
+			Assert(check->threshold_version == ALL_VERSIONS);
 
-			/*
-			 * The datatype was found, so extract the data and log to the
-			 * requested filename. We need to open the file for appending
-			 * since the check might have already found the type in another
-			 * database earlier in the loop.
-			 */
-			if (ntups)
-			{
-				/*
-				 * Make sure we have a buffer to save reports to now that we
-				 * found a first failing check.
-				 */
-				if (!found)
-					initPQExpBuffer(&report);
-				found = true;
-
-				/*
-				 * If this is the first time we see an error for the check in
-				 * question then print a status message of the failure.
-				 */
-				if (!results[checknum])
-				{
-					pg_log(PG_REPORT, "    failed check: %s", _(cur_check->status));
-					appendPQExpBuffer(&report, "\n%s\n%s    %s\n",
-									  _(cur_check->report_text),
-									  _("A list of the problem columns is in the file:"),
-									  output_path);
-				}
-				results[checknum] = true;
-
-				i_nspname = PQfnumber(res, "nspname");
-				i_relname = PQfnumber(res, "relname");
-				i_attname = PQfnumber(res, "attname");
-
-				for (int rowno = 0; rowno < ntups; rowno++)
-				{
-					if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
-						pg_fatal("could not open file \"%s\": %m", output_path);
-
-					if (!db_used)
-					{
-						fprintf(script, "In database: %s\n", active_db->db_name);
-						db_used = true;
-					}
-					fprintf(script, "  %s.%s.%s\n",
-							PQgetvalue(res, rowno, i_nspname),
-							PQgetvalue(res, rowno, i_relname),
-							PQgetvalue(res, rowno, i_attname));
-				}
-
-				if (script)
-				{
-					fclose(script);
-					script = NULL;
-				}
-			}
+		queries[i] = data_type_check_query(i);
 
-			PQclear(res);
-		}
+		states[i].check = check;
+		states[i].result = &results[i];
+		states[i].report = &report;
 
-		PQfinish(conn);
+		upgrade_task_add_step(task, queries[i], process_data_type_check,
+							  true, &states[i]);
 	}
 
-	if (found)
-		pg_fatal("Data type checks failed: %s", report.data);
+	/*
+	 * Connect to each database in the cluster and run all defined checks
+	 * against that database before trying the next one.
+	 */
+	upgrade_task_run(task, cluster);
+	upgrade_task_free(task);
+
+	if (report)
+	{
+		pg_fatal("Data type checks failed: %s", report->data);
+		destroyPQExpBuffer(report);
+	}
 
 	pg_free(results);
+	for (int i = 0; i < n_data_types_usage_checks; i++)
+	{
+		if (queries[i])
+			pg_free(queries[i]);
+	}
+	pg_free(queries);
+	pg_free(states);
 
 	check_ok();
 }
@@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
 		check_old_cluster_subscription_state();
 	}
 
-	check_for_data_types_usage(&old_cluster, data_types_usage_checks);
+	check_for_data_types_usage(&old_cluster);
 
 	/*
 	 * PG 14 changed the function signature of encoding conversion functions.