@@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
314
314
}
315
315
};
316
316
317
+ /*
318
+ * Private state for check_for_data_types_usage()'s UpgradeTask.
319
+ */
320
+ struct data_type_check_state
321
+ {
322
+ DataTypesUsageChecks * check ; /* the check for this step */
323
+ bool * result ; /* true if check failed for any database */
324
+ PQExpBuffer * report ; /* buffer for report on failed checks */
325
+ };
326
+
327
+ /*
328
+ * Returns a palloc'd query string for the data type check, for use by
329
+ * check_for_data_types_usage()'s UpgradeTask.
330
+ */
331
+ static char *
332
+ data_type_check_query (int checknum )
333
+ {
334
+ DataTypesUsageChecks * check = & data_types_usage_checks [checknum ];
335
+
336
+ return psprintf ("WITH RECURSIVE oids AS ( "
337
+ /* start with the type(s) returned by base_query */
338
+ " %s "
339
+ " UNION ALL "
340
+ " SELECT * FROM ( "
341
+ /* inner WITH because we can only reference the CTE once */
342
+ " WITH x AS (SELECT oid FROM oids) "
343
+ /* domains on any type selected so far */
344
+ " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
345
+ " UNION ALL "
346
+ /* arrays over any type selected so far */
347
+ " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
348
+ " UNION ALL "
349
+ /* composite types containing any type selected so far */
350
+ " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
351
+ " WHERE t.typtype = 'c' AND "
352
+ " t.oid = c.reltype AND "
353
+ " c.oid = a.attrelid AND "
354
+ " NOT a.attisdropped AND "
355
+ " a.atttypid = x.oid "
356
+ " UNION ALL "
357
+ /* ranges containing any type selected so far */
358
+ " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
359
+ " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
360
+ " ) foo "
361
+ ") "
362
+ /* now look for stored columns of any such type */
363
+ "SELECT n.nspname, c.relname, a.attname "
364
+ "FROM pg_catalog.pg_class c, "
365
+ " pg_catalog.pg_namespace n, "
366
+ " pg_catalog.pg_attribute a "
367
+ "WHERE c.oid = a.attrelid AND "
368
+ " NOT a.attisdropped AND "
369
+ " a.atttypid IN (SELECT oid FROM oids) AND "
370
+ " c.relkind IN ("
371
+ CppAsString2 (RELKIND_RELATION ) ", "
372
+ CppAsString2 (RELKIND_MATVIEW ) ", "
373
+ CppAsString2 (RELKIND_INDEX ) ") AND "
374
+ " c.relnamespace = n.oid AND "
375
+ /* exclude possible orphaned temp tables */
376
+ " n.nspname !~ '^pg_temp_' AND "
377
+ " n.nspname !~ '^pg_toast_temp_' AND "
378
+ /* exclude system catalogs, too */
379
+ " n.nspname NOT IN ('pg_catalog', 'information_schema')" ,
380
+ check -> base_query );
381
+ }
382
+
383
+ /*
384
+ * Callback function for processing results of queries for
385
+ * check_for_data_types_usage()'s UpgradeTask. If the query returned any rows
386
+ * (i.e., the check failed), write the details to the report file.
387
+ */
388
+ static void
389
+ process_data_type_check (DbInfo * dbinfo , PGresult * res , void * arg )
390
+ {
391
+ struct data_type_check_state * state = (struct data_type_check_state * ) arg ;
392
+ int ntups = PQntuples (res );
393
+
394
+ AssertVariableIsOfType (& process_data_type_check , UpgradeTaskProcessCB );
395
+
396
+ if (ntups )
397
+ {
398
+ char output_path [MAXPGPATH ];
399
+ int i_nspname ;
400
+ int i_relname ;
401
+ int i_attname ;
402
+ FILE * script = NULL ;
403
+ bool db_used = false;
404
+
405
+ snprintf (output_path , sizeof (output_path ), "%s/%s" ,
406
+ log_opts .basedir ,
407
+ state -> check -> report_filename );
408
+
409
+ /*
410
+ * Make sure we have a buffer to save reports to now that we found a
411
+ * first failing check.
412
+ */
413
+ if (* state -> report == NULL )
414
+ * state -> report = createPQExpBuffer ();
415
+
416
+ /*
417
+ * If this is the first time we see an error for the check in question
418
+ * then print a status message of the failure.
419
+ */
420
+ if (!(* state -> result ))
421
+ {
422
+ pg_log (PG_REPORT , " failed check: %s" , _ (state -> check -> status ));
423
+ appendPQExpBuffer (* state -> report , "\n%s\n%s %s\n" ,
424
+ _ (state -> check -> report_text ),
425
+ _ ("A list of the problem columns is in the file:" ),
426
+ output_path );
427
+ }
428
+ * state -> result = true;
429
+
430
+ i_nspname = PQfnumber (res , "nspname" );
431
+ i_relname = PQfnumber (res , "relname" );
432
+ i_attname = PQfnumber (res , "attname" );
433
+
434
+ for (int rowno = 0 ; rowno < ntups ; rowno ++ )
435
+ {
436
+ if (script == NULL && (script = fopen_priv (output_path , "a" )) == NULL )
437
+ pg_fatal ("could not open file \"%s\": %m" , output_path );
438
+
439
+ if (!db_used )
440
+ {
441
+ fprintf (script , "In database: %s\n" , dbinfo -> db_name );
442
+ db_used = true;
443
+ }
444
+ fprintf (script , " %s.%s.%s\n" ,
445
+ PQgetvalue (res , rowno , i_nspname ),
446
+ PQgetvalue (res , rowno , i_relname ),
447
+ PQgetvalue (res , rowno , i_attname ));
448
+ }
449
+
450
+ if (script )
451
+ {
452
+ fclose (script );
453
+ script = NULL ;
454
+ }
455
+ }
456
+ }
457
+
317
458
/*
318
459
* check_for_data_types_usage()
319
460
* Detect whether there are any stored columns depending on given type(s)
@@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
334
475
* there's no storage involved in a view.
335
476
*/
336
477
static void
337
- check_for_data_types_usage (ClusterInfo * cluster , DataTypesUsageChecks * checks )
478
+ check_for_data_types_usage (ClusterInfo * cluster )
338
479
{
339
- bool found = false;
340
480
bool * results ;
341
- PQExpBufferData report ;
342
- DataTypesUsageChecks * tmp = checks ;
481
+ PQExpBuffer report = NULL ;
482
+ DataTypesUsageChecks * tmp = data_types_usage_checks ;
343
483
int n_data_types_usage_checks = 0 ;
484
+ UpgradeTask * task = upgrade_task_create ();
485
+ char * * queries = NULL ;
486
+ struct data_type_check_state * states ;
344
487
345
488
prep_status ("Checking data type usage" );
346
489
@@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
353
496
354
497
/* Prepare an array to store the results of checks in */
355
498
results = pg_malloc0 (sizeof (bool ) * n_data_types_usage_checks );
499
+ queries = pg_malloc0 (sizeof (char * ) * n_data_types_usage_checks );
500
+ states = pg_malloc0 (sizeof (struct data_type_check_state ) * n_data_types_usage_checks );
356
501
357
- /*
358
- * Connect to each database in the cluster and run all defined checks
359
- * against that database before trying the next one.
360
- */
361
- for (int dbnum = 0 ; dbnum < cluster -> dbarr .ndbs ; dbnum ++ )
502
+ for (int i = 0 ; i < n_data_types_usage_checks ; i ++ )
362
503
{
363
- DbInfo * active_db = & cluster -> dbarr .dbs [dbnum ];
364
- PGconn * conn = connectToServer (cluster , active_db -> db_name );
504
+ DataTypesUsageChecks * check = & data_types_usage_checks [i ];
365
505
366
- for ( int checknum = 0 ; checknum < n_data_types_usage_checks ; checknum ++ )
506
+ if ( check -> threshold_version == MANUAL_CHECK )
367
507
{
368
- PGresult * res ;
369
- int ntups ;
370
- int i_nspname ;
371
- int i_relname ;
372
- int i_attname ;
373
- FILE * script = NULL ;
374
- bool db_used = false;
375
- char output_path [MAXPGPATH ];
376
- DataTypesUsageChecks * cur_check = & checks [checknum ];
377
-
378
- if (cur_check -> threshold_version == MANUAL_CHECK )
379
- {
380
- Assert (cur_check -> version_hook );
381
-
382
- /*
383
- * Make sure that the check applies to the current cluster
384
- * version and skip if not. If no check hook has been defined
385
- * we run the check for all versions.
386
- */
387
- if (!cur_check -> version_hook (cluster ))
388
- continue ;
389
- }
390
- else if (cur_check -> threshold_version != ALL_VERSIONS )
391
- {
392
- if (GET_MAJOR_VERSION (cluster -> major_version ) > cur_check -> threshold_version )
393
- continue ;
394
- }
395
- else
396
- Assert (cur_check -> threshold_version == ALL_VERSIONS );
397
-
398
- snprintf (output_path , sizeof (output_path ), "%s/%s" ,
399
- log_opts .basedir ,
400
- cur_check -> report_filename );
508
+ Assert (check -> version_hook );
401
509
402
510
/*
403
- * The type(s) of interest might be wrapped in a domain, array,
404
- * composite, or range, and these container types can be nested
405
- * (to varying extents depending on server version, but that's not
406
- * of concern here). To handle all these cases we need a
407
- * recursive CTE.
511
+ * Make sure that the check applies to the current cluster version
512
+ * and skip it if not.
408
513
*/
409
- res = executeQueryOrDie (conn ,
410
- "WITH RECURSIVE oids AS ( "
411
- /* start with the type(s) returned by base_query */
412
- " %s "
413
- " UNION ALL "
414
- " SELECT * FROM ( "
415
- /* inner WITH because we can only reference the CTE once */
416
- " WITH x AS (SELECT oid FROM oids) "
417
- /* domains on any type selected so far */
418
- " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
419
- " UNION ALL "
420
- /* arrays over any type selected so far */
421
- " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
422
- " UNION ALL "
423
- /* composite types containing any type selected so far */
424
- " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
425
- " WHERE t.typtype = 'c' AND "
426
- " t.oid = c.reltype AND "
427
- " c.oid = a.attrelid AND "
428
- " NOT a.attisdropped AND "
429
- " a.atttypid = x.oid "
430
- " UNION ALL "
431
- /* ranges containing any type selected so far */
432
- " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
433
- " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
434
- " ) foo "
435
- ") "
436
- /* now look for stored columns of any such type */
437
- "SELECT n.nspname, c.relname, a.attname "
438
- "FROM pg_catalog.pg_class c, "
439
- " pg_catalog.pg_namespace n, "
440
- " pg_catalog.pg_attribute a "
441
- "WHERE c.oid = a.attrelid AND "
442
- " NOT a.attisdropped AND "
443
- " a.atttypid IN (SELECT oid FROM oids) AND "
444
- " c.relkind IN ("
445
- CppAsString2 (RELKIND_RELATION ) ", "
446
- CppAsString2 (RELKIND_MATVIEW ) ", "
447
- CppAsString2 (RELKIND_INDEX ) ") AND "
448
- " c.relnamespace = n.oid AND "
449
- /* exclude possible orphaned temp tables */
450
- " n.nspname !~ '^pg_temp_' AND "
451
- " n.nspname !~ '^pg_toast_temp_' AND "
452
- /* exclude system catalogs, too */
453
- " n.nspname NOT IN ('pg_catalog', 'information_schema')" ,
454
- cur_check -> base_query );
455
-
456
- ntups = PQntuples (res );
514
+ if (!check -> version_hook (cluster ))
515
+ continue ;
516
+ }
517
+ else if (check -> threshold_version != ALL_VERSIONS )
518
+ {
519
+ if (GET_MAJOR_VERSION (cluster -> major_version ) > check -> threshold_version )
520
+ continue ;
521
+ }
522
+ else
523
+ Assert (check -> threshold_version == ALL_VERSIONS );
457
524
458
- /*
459
- * The datatype was found, so extract the data and log to the
460
- * requested filename. We need to open the file for appending
461
- * since the check might have already found the type in another
462
- * database earlier in the loop.
463
- */
464
- if (ntups )
465
- {
466
- /*
467
- * Make sure we have a buffer to save reports to now that we
468
- * found a first failing check.
469
- */
470
- if (!found )
471
- initPQExpBuffer (& report );
472
- found = true;
473
-
474
- /*
475
- * If this is the first time we see an error for the check in
476
- * question then print a status message of the failure.
477
- */
478
- if (!results [checknum ])
479
- {
480
- pg_log (PG_REPORT , " failed check: %s" , _ (cur_check -> status ));
481
- appendPQExpBuffer (& report , "\n%s\n%s %s\n" ,
482
- _ (cur_check -> report_text ),
483
- _ ("A list of the problem columns is in the file:" ),
484
- output_path );
485
- }
486
- results [checknum ] = true;
487
-
488
- i_nspname = PQfnumber (res , "nspname" );
489
- i_relname = PQfnumber (res , "relname" );
490
- i_attname = PQfnumber (res , "attname" );
491
-
492
- for (int rowno = 0 ; rowno < ntups ; rowno ++ )
493
- {
494
- if (script == NULL && (script = fopen_priv (output_path , "a" )) == NULL )
495
- pg_fatal ("could not open file \"%s\": %m" , output_path );
496
-
497
- if (!db_used )
498
- {
499
- fprintf (script , "In database: %s\n" , active_db -> db_name );
500
- db_used = true;
501
- }
502
- fprintf (script , " %s.%s.%s\n" ,
503
- PQgetvalue (res , rowno , i_nspname ),
504
- PQgetvalue (res , rowno , i_relname ),
505
- PQgetvalue (res , rowno , i_attname ));
506
- }
507
-
508
- if (script )
509
- {
510
- fclose (script );
511
- script = NULL ;
512
- }
513
- }
525
+ queries [i ] = data_type_check_query (i );
514
526
515
- PQclear (res );
516
- }
527
+ states [i ].check = check ;
528
+ states [i ].result = & results [i ];
529
+ states [i ].report = & report ;
517
530
518
- PQfinish (conn );
531
+ upgrade_task_add_step (task , queries [i ], process_data_type_check ,
532
+ true, & states [i ]);
519
533
}
520
534
521
- if (found )
522
- pg_fatal ("Data type checks failed: %s" , report .data );
535
+ /*
536
+ * Connect to each database in the cluster and run all defined checks
537
+ * against that database before trying the next one.
538
+ */
539
+ upgrade_task_run (task , cluster );
540
+ upgrade_task_free (task );
541
+
542
+ if (report )
543
+ {
544
+ pg_fatal ("Data type checks failed: %s" , report -> data );
545
+ destroyPQExpBuffer (report );
546
+ }
523
547
524
548
pg_free (results );
549
+ for (int i = 0 ; i < n_data_types_usage_checks ; i ++ )
550
+ {
551
+ if (queries [i ])
552
+ pg_free (queries [i ]);
553
+ }
554
+ pg_free (queries );
555
+ pg_free (states );
525
556
526
557
check_ok ();
527
558
}
@@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
616
647
check_old_cluster_subscription_state ();
617
648
}
618
649
619
- check_for_data_types_usage (& old_cluster , data_types_usage_checks );
650
+ check_for_data_types_usage (& old_cluster );
620
651
621
652
/*
622
653
* PG 14 changed the function signature of encoding conversion functions.
0 commit comments