@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
1905
1905
check_ok ();
1906
1906
}
1907
1907
1908
+ /*
1909
+ * Callback function for processing results of query for
1910
+ * check_old_cluster_subscription_state()'s UpgradeTask. If the query returned
1911
+ * any rows (i.e., the check failed), write the details to the report file.
1912
+ */
1913
+ static void
1914
+ process_old_sub_state_check (DbInfo * dbinfo , PGresult * res , void * arg )
1915
+ {
1916
+ UpgradeTaskReport * report = (UpgradeTaskReport * ) arg ;
1917
+ int ntup = PQntuples (res );
1918
+ int i_srsubstate = PQfnumber (res , "srsubstate" );
1919
+ int i_subname = PQfnumber (res , "subname" );
1920
+ int i_nspname = PQfnumber (res , "nspname" );
1921
+ int i_relname = PQfnumber (res , "relname" );
1922
+
1923
+ AssertVariableIsOfType (& process_old_sub_state_check , UpgradeTaskProcessCB );
1924
+
1925
+ for (int i = 0 ; i < ntup ; i ++ )
1926
+ {
1927
+ if (report -> file == NULL &&
1928
+ (report -> file = fopen_priv (report -> path , "w" )) == NULL )
1929
+ pg_fatal ("could not open file \"%s\": %m" , report -> path );
1930
+
1931
+ fprintf (report -> file , "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n" ,
1932
+ PQgetvalue (res , i , i_srsubstate ),
1933
+ dbinfo -> db_name ,
1934
+ PQgetvalue (res , i , i_subname ),
1935
+ PQgetvalue (res , i , i_nspname ),
1936
+ PQgetvalue (res , i , i_relname ));
1937
+ }
1938
+ }
1939
+
1908
1940
/*
1909
1941
* check_old_cluster_subscription_state()
1910
1942
*
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
1915
1947
static void
1916
1948
check_old_cluster_subscription_state (void )
1917
1949
{
1918
- FILE * script = NULL ;
1919
- char output_path [MAXPGPATH ];
1950
+ UpgradeTask * task = upgrade_task_create ();
1951
+ UpgradeTaskReport report ;
1952
+ const char * query ;
1953
+ PGresult * res ;
1954
+ PGconn * conn ;
1920
1955
int ntup ;
1921
1956
1922
1957
prep_status ("Checking for subscription state" );
1923
1958
1924
- snprintf (output_path , sizeof (output_path ), "%s/%s" ,
1959
+ report .file = NULL ;
1960
+ snprintf (report .path , sizeof (report .path ), "%s/%s" ,
1925
1961
log_opts .basedir ,
1926
1962
"subs_invalid.txt" );
1927
- for (int dbnum = 0 ; dbnum < old_cluster .dbarr .ndbs ; dbnum ++ )
1928
- {
1929
- PGresult * res ;
1930
- DbInfo * active_db = & old_cluster .dbarr .dbs [dbnum ];
1931
- PGconn * conn = connectToServer (& old_cluster , active_db -> db_name );
1932
-
1933
- /* We need to check for pg_replication_origin only once. */
1934
- if (dbnum == 0 )
1935
- {
1936
- /*
1937
- * Check that all the subscriptions have their respective
1938
- * replication origin.
1939
- */
1940
- res = executeQueryOrDie (conn ,
1941
- "SELECT d.datname, s.subname "
1942
- "FROM pg_catalog.pg_subscription s "
1943
- "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1944
- " ON o.roname = 'pg_' || s.oid "
1945
- "INNER JOIN pg_catalog.pg_database d "
1946
- " ON d.oid = s.subdbid "
1947
- "WHERE o.roname IS NULL;" );
1948
-
1949
- ntup = PQntuples (res );
1950
- for (int i = 0 ; i < ntup ; i ++ )
1951
- {
1952
- if (script == NULL && (script = fopen_priv (output_path , "w" )) == NULL )
1953
- pg_fatal ("could not open file \"%s\": %m" , output_path );
1954
- fprintf (script , "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n" ,
1955
- PQgetvalue (res , i , 0 ),
1956
- PQgetvalue (res , i , 1 ));
1957
- }
1958
- PQclear (res );
1959
- }
1960
-
1961
- /*
1962
- * We don't allow upgrade if there is a risk of dangling slot or
1963
- * origin corresponding to initial sync after upgrade.
1964
- *
1965
- * A slot/origin not created yet refers to the 'i' (initialize) state,
1966
- * while 'r' (ready) state refers to a slot/origin created previously
1967
- * but already dropped. These states are supported for pg_upgrade. The
1968
- * other states listed below are not supported:
1969
- *
1970
- * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
1971
- * would retain a replication slot, which could not be dropped by the
1972
- * sync worker spawned after the upgrade because the subscription ID
1973
- * used for the slot name won't match anymore.
1974
- *
1975
- * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
1976
- * would retain the replication origin when there is a failure in
1977
- * tablesync worker immediately after dropping the replication slot in
1978
- * the publisher.
1979
- *
1980
- * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
1981
- * a relation upgraded while in this state would expect an origin ID
1982
- * with the OID of the subscription used before the upgrade, causing
1983
- * it to fail.
1984
- *
1985
- * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
1986
- * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
1987
- * so we need not allow these states.
1988
- */
1989
- res = executeQueryOrDie (conn ,
1990
- "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
1991
- "FROM pg_catalog.pg_subscription_rel r "
1992
- "LEFT JOIN pg_catalog.pg_subscription s"
1993
- " ON r.srsubid = s.oid "
1994
- "LEFT JOIN pg_catalog.pg_class c"
1995
- " ON r.srrelid = c.oid "
1996
- "LEFT JOIN pg_catalog.pg_namespace n"
1997
- " ON c.relnamespace = n.oid "
1998
- "WHERE r.srsubstate NOT IN ('i', 'r') "
1999
- "ORDER BY s.subname" );
2000
-
2001
- ntup = PQntuples (res );
2002
- for (int i = 0 ; i < ntup ; i ++ )
2003
- {
2004
- if (script == NULL && (script = fopen_priv (output_path , "w" )) == NULL )
2005
- pg_fatal ("could not open file \"%s\": %m" , output_path );
2006
-
2007
- fprintf (script , "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n" ,
2008
- PQgetvalue (res , i , 0 ),
2009
- active_db -> db_name ,
2010
- PQgetvalue (res , i , 1 ),
2011
- PQgetvalue (res , i , 2 ),
2012
- PQgetvalue (res , i , 3 ));
2013
- }
2014
1963
2015
- PQclear (res );
2016
- PQfinish (conn );
1964
+ /*
1965
+ * Check that all the subscriptions have their respective replication
1966
+ * origin. This check only needs to run once.
1967
+ */
1968
+ conn = connectToServer (& old_cluster , old_cluster .dbarr .dbs [0 ].db_name );
1969
+ res = executeQueryOrDie (conn ,
1970
+ "SELECT d.datname, s.subname "
1971
+ "FROM pg_catalog.pg_subscription s "
1972
+ "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1973
+ " ON o.roname = 'pg_' || s.oid "
1974
+ "INNER JOIN pg_catalog.pg_database d "
1975
+ " ON d.oid = s.subdbid "
1976
+ "WHERE o.roname IS NULL;" );
1977
+ ntup = PQntuples (res );
1978
+ for (int i = 0 ; i < ntup ; i ++ )
1979
+ {
1980
+ if (report .file == NULL &&
1981
+ (report .file = fopen_priv (report .path , "w" )) == NULL )
1982
+ pg_fatal ("could not open file \"%s\": %m" , report .path );
1983
+ fprintf (report .file , "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n" ,
1984
+ PQgetvalue (res , i , 0 ),
1985
+ PQgetvalue (res , i , 1 ));
2017
1986
}
1987
+ PQclear (res );
1988
+ PQfinish (conn );
2018
1989
2019
- if (script )
1990
+ /*
1991
+ * We don't allow upgrade if there is a risk of dangling slot or origin
1992
+ * corresponding to initial sync after upgrade.
1993
+ *
1994
+ * A slot/origin not created yet refers to the 'i' (initialize) state,
1995
+ * while 'r' (ready) state refers to a slot/origin created previously but
1996
+ * already dropped. These states are supported for pg_upgrade. The other
1997
+ * states listed below are not supported:
1998
+ *
1999
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
2000
+ * retain a replication slot, which could not be dropped by the sync
2001
+ * worker spawned after the upgrade because the subscription ID used for
2002
+ * the slot name won't match anymore.
2003
+ *
2004
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
2005
+ * retain the replication origin when there is a failure in tablesync
2006
+ * worker immediately after dropping the replication slot in the
2007
+ * publisher.
2008
+ *
2009
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
2010
+ * relation upgraded while in this state would expect an origin ID with
2011
+ * the OID of the subscription used before the upgrade, causing it to
2012
+ * fail.
2013
+ *
2014
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
2015
+ * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
2016
+ * need not allow these states.
2017
+ */
2018
+ query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
2019
+ "FROM pg_catalog.pg_subscription_rel r "
2020
+ "LEFT JOIN pg_catalog.pg_subscription s"
2021
+ " ON r.srsubid = s.oid "
2022
+ "LEFT JOIN pg_catalog.pg_class c"
2023
+ " ON r.srrelid = c.oid "
2024
+ "LEFT JOIN pg_catalog.pg_namespace n"
2025
+ " ON c.relnamespace = n.oid "
2026
+ "WHERE r.srsubstate NOT IN ('i', 'r') "
2027
+ "ORDER BY s.subname" ;
2028
+
2029
+ upgrade_task_add_step (task , query , process_old_sub_state_check ,
2030
+ true, & report );
2031
+
2032
+ upgrade_task_run (task , & old_cluster );
2033
+ upgrade_task_free (task );
2034
+
2035
+ if (report .file )
2020
2036
{
2021
- fclose (script );
2037
+ fclose (report . file );
2022
2038
pg_log (PG_REPORT , "fatal" );
2023
2039
pg_fatal ("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
2024
2040
"You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
2025
2041
"A list of the problematic subscriptions is in the file:\n"
2026
- " %s" , output_path );
2042
+ " %s" , report . path );
2027
2043
}
2028
2044
else
2029
2045
check_ok ();
0 commit comments