Skip to content

Commit 7baa36d

Browse files
pg_upgrade: Parallelize subscription check.
This commit makes use of the new task framework in pg_upgrade to parallelize the part of check_old_cluster_subscription_state() that verifies each of the subscribed tables is in the 'i' (initialize) or 'r' (ready) state. This check will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1. Reviewed-by: Daniel Gustafsson, Ilya Gladyshev Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
1 parent 6d3d2e8 commit 7baa36d

File tree

1 file changed

+111
-95
lines changed

1 file changed

+111
-95
lines changed

src/bin/pg_upgrade/check.c

Lines changed: 111 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
19051905
check_ok();
19061906
}
19071907

1908+
/*
1909+
* Callback function for processing results of query for
1910+
* check_old_cluster_subscription_state()'s UpgradeTask. If the query returned
1911+
* any rows (i.e., the check failed), write the details to the report file.
1912+
*/
1913+
static void
1914+
process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg)
1915+
{
1916+
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
1917+
int ntup = PQntuples(res);
1918+
int i_srsubstate = PQfnumber(res, "srsubstate");
1919+
int i_subname = PQfnumber(res, "subname");
1920+
int i_nspname = PQfnumber(res, "nspname");
1921+
int i_relname = PQfnumber(res, "relname");
1922+
1923+
AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB);
1924+
1925+
for (int i = 0; i < ntup; i++)
1926+
{
1927+
if (report->file == NULL &&
1928+
(report->file = fopen_priv(report->path, "w")) == NULL)
1929+
pg_fatal("could not open file \"%s\": %m", report->path);
1930+
1931+
fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
1932+
PQgetvalue(res, i, i_srsubstate),
1933+
dbinfo->db_name,
1934+
PQgetvalue(res, i, i_subname),
1935+
PQgetvalue(res, i, i_nspname),
1936+
PQgetvalue(res, i, i_relname));
1937+
}
1938+
}
1939+
19081940
/*
19091941
* check_old_cluster_subscription_state()
19101942
*
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
19151947
static void
19161948
check_old_cluster_subscription_state(void)
19171949
{
1918-
FILE *script = NULL;
1919-
char output_path[MAXPGPATH];
1950+
UpgradeTask *task = upgrade_task_create();
1951+
UpgradeTaskReport report;
1952+
const char *query;
1953+
PGresult *res;
1954+
PGconn *conn;
19201955
int ntup;
19211956

19221957
prep_status("Checking for subscription state");
19231958

1924-
snprintf(output_path, sizeof(output_path), "%s/%s",
1959+
report.file = NULL;
1960+
snprintf(report.path, sizeof(report.path), "%s/%s",
19251961
log_opts.basedir,
19261962
"subs_invalid.txt");
1927-
for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
1928-
{
1929-
PGresult *res;
1930-
DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
1931-
PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
1932-
1933-
/* We need to check for pg_replication_origin only once. */
1934-
if (dbnum == 0)
1935-
{
1936-
/*
1937-
* Check that all the subscriptions have their respective
1938-
* replication origin.
1939-
*/
1940-
res = executeQueryOrDie(conn,
1941-
"SELECT d.datname, s.subname "
1942-
"FROM pg_catalog.pg_subscription s "
1943-
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1944-
" ON o.roname = 'pg_' || s.oid "
1945-
"INNER JOIN pg_catalog.pg_database d "
1946-
" ON d.oid = s.subdbid "
1947-
"WHERE o.roname IS NULL;");
1948-
1949-
ntup = PQntuples(res);
1950-
for (int i = 0; i < ntup; i++)
1951-
{
1952-
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
1953-
pg_fatal("could not open file \"%s\": %m", output_path);
1954-
fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
1955-
PQgetvalue(res, i, 0),
1956-
PQgetvalue(res, i, 1));
1957-
}
1958-
PQclear(res);
1959-
}
1960-
1961-
/*
1962-
* We don't allow upgrade if there is a risk of dangling slot or
1963-
* origin corresponding to initial sync after upgrade.
1964-
*
1965-
* A slot/origin not created yet refers to the 'i' (initialize) state,
1966-
* while 'r' (ready) state refers to a slot/origin created previously
1967-
* but already dropped. These states are supported for pg_upgrade. The
1968-
* other states listed below are not supported:
1969-
*
1970-
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
1971-
* would retain a replication slot, which could not be dropped by the
1972-
* sync worker spawned after the upgrade because the subscription ID
1973-
* used for the slot name won't match anymore.
1974-
*
1975-
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
1976-
* would retain the replication origin when there is a failure in
1977-
* tablesync worker immediately after dropping the replication slot in
1978-
* the publisher.
1979-
*
1980-
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
1981-
* a relation upgraded while in this state would expect an origin ID
1982-
* with the OID of the subscription used before the upgrade, causing
1983-
* it to fail.
1984-
*
1985-
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
1986-
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
1987-
* so we need not allow these states.
1988-
*/
1989-
res = executeQueryOrDie(conn,
1990-
"SELECT r.srsubstate, s.subname, n.nspname, c.relname "
1991-
"FROM pg_catalog.pg_subscription_rel r "
1992-
"LEFT JOIN pg_catalog.pg_subscription s"
1993-
" ON r.srsubid = s.oid "
1994-
"LEFT JOIN pg_catalog.pg_class c"
1995-
" ON r.srrelid = c.oid "
1996-
"LEFT JOIN pg_catalog.pg_namespace n"
1997-
" ON c.relnamespace = n.oid "
1998-
"WHERE r.srsubstate NOT IN ('i', 'r') "
1999-
"ORDER BY s.subname");
2000-
2001-
ntup = PQntuples(res);
2002-
for (int i = 0; i < ntup; i++)
2003-
{
2004-
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
2005-
pg_fatal("could not open file \"%s\": %m", output_path);
2006-
2007-
fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
2008-
PQgetvalue(res, i, 0),
2009-
active_db->db_name,
2010-
PQgetvalue(res, i, 1),
2011-
PQgetvalue(res, i, 2),
2012-
PQgetvalue(res, i, 3));
2013-
}
20141963

2015-
PQclear(res);
2016-
PQfinish(conn);
1964+
/*
1965+
* Check that all the subscriptions have their respective replication
1966+
* origin. This check only needs to run once.
1967+
*/
1968+
conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
1969+
res = executeQueryOrDie(conn,
1970+
"SELECT d.datname, s.subname "
1971+
"FROM pg_catalog.pg_subscription s "
1972+
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1973+
" ON o.roname = 'pg_' || s.oid "
1974+
"INNER JOIN pg_catalog.pg_database d "
1975+
" ON d.oid = s.subdbid "
1976+
"WHERE o.roname IS NULL;");
1977+
ntup = PQntuples(res);
1978+
for (int i = 0; i < ntup; i++)
1979+
{
1980+
if (report.file == NULL &&
1981+
(report.file = fopen_priv(report.path, "w")) == NULL)
1982+
pg_fatal("could not open file \"%s\": %m", report.path);
1983+
fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
1984+
PQgetvalue(res, i, 0),
1985+
PQgetvalue(res, i, 1));
20171986
}
1987+
PQclear(res);
1988+
PQfinish(conn);
20181989

2019-
if (script)
1990+
/*
1991+
* We don't allow upgrade if there is a risk of dangling slot or origin
1992+
* corresponding to initial sync after upgrade.
1993+
*
1994+
* A slot/origin not created yet refers to the 'i' (initialize) state,
1995+
* while 'r' (ready) state refers to a slot/origin created previously but
1996+
* already dropped. These states are supported for pg_upgrade. The other
1997+
* states listed below are not supported:
1998+
*
1999+
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
2000+
* retain a replication slot, which could not be dropped by the sync
2001+
* worker spawned after the upgrade because the subscription ID used for
2002+
* the slot name won't match anymore.
2003+
*
2004+
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
2005+
* retain the replication origin when there is a failure in tablesync
2006+
* worker immediately after dropping the replication slot in the
2007+
* publisher.
2008+
*
2009+
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
2010+
* relation upgraded while in this state would expect an origin ID with
2011+
* the OID of the subscription used before the upgrade, causing it to
2012+
* fail.
2013+
*
2014+
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
2015+
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
2016+
* need not allow these states.
2017+
*/
2018+
query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
2019+
"FROM pg_catalog.pg_subscription_rel r "
2020+
"LEFT JOIN pg_catalog.pg_subscription s"
2021+
" ON r.srsubid = s.oid "
2022+
"LEFT JOIN pg_catalog.pg_class c"
2023+
" ON r.srrelid = c.oid "
2024+
"LEFT JOIN pg_catalog.pg_namespace n"
2025+
" ON c.relnamespace = n.oid "
2026+
"WHERE r.srsubstate NOT IN ('i', 'r') "
2027+
"ORDER BY s.subname";
2028+
2029+
upgrade_task_add_step(task, query, process_old_sub_state_check,
2030+
true, &report);
2031+
2032+
upgrade_task_run(task, &old_cluster);
2033+
upgrade_task_free(task);
2034+
2035+
if (report.file)
20202036
{
2021-
fclose(script);
2037+
fclose(report.file);
20222038
pg_log(PG_REPORT, "fatal");
20232039
pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
20242040
"You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
20252041
"A list of the problematic subscriptions is in the file:\n"
2026-
" %s", output_path);
2042+
" %s", report.path);
20272043
}
20282044
else
20292045
check_ok();

0 commit comments

Comments
 (0)