Skip to content

Commit 0ff2028

Browse files
author
Amit Kapila
committed
Extend pg_publication_tables to display column list and row filter.
Commit 923def9 and 52e4f0c allowed to specify column lists and row filters for publication tables. This commit extends the pg_publication_tables view and pg_get_publication_tables function to display that information. This information will be useful to users and we also need this for the later commit that prohibits combining multiple publications with different column lists for the same table. Author: Hou Zhijie Reviewed By: Amit Kapila, Alvaro Herrera, Shi Yu, Takamichi Osumi Discussion: https://postgr.es/m/202204251548.mudq7jbqnh7r@alvherre.pgsql
1 parent 62221ef commit 0ff2028

File tree

8 files changed

+126
-44
lines changed

8 files changed

+126
-44
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9691,7 +9691,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
96919691

96929692
<row>
96939693
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
9694-
<entry>publications and their associated tables</entry>
9694+
<entry>publications and information of their associated tables</entry>
96959695
</row>
96969696

96979697
<row>
@@ -11635,8 +11635,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
1163511635

1163611636
<para>
1163711637
The view <structname>pg_publication_tables</structname> provides
11638-
information about the mapping between publications and the tables they
11639-
contain. Unlike the underlying catalog
11638+
information about the mapping between publications and information of
11639+
tables they contain. Unlike the underlying catalog
1164011640
<link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
1164111641
this view expands publications defined as <literal>FOR ALL TABLES</literal>
1164211642
and <literal>FOR ALL TABLES IN SCHEMA</literal>, so for such publications
@@ -11687,6 +11687,27 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
1168711687
Name of table
1168811688
</para></entry>
1168911689
</row>
11690+
11691+
<row>
11692+
<entry role="catalog_table_entry"><para role="column_definition">
11693+
<structfield>attnames</structfield> <type>name[]</type>
11694+
(references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attname</structfield>)
11695+
</para>
11696+
<para>
11697+
Names of table columns included in the publication. This contains all
11698+
the columns of the table when the user didn't specify the column list
11699+
for the table.
11700+
</para></entry>
11701+
</row>
11702+
11703+
<row>
11704+
<entry role="catalog_table_entry"><para role="column_definition">
11705+
<structfield>rowfilter</structfield> <type>text</type>
11706+
</para>
11707+
<para>
11708+
Expression for the table's publication qualifying condition
11709+
</para></entry>
11710+
</row>
1169011711
</tbody>
1169111712
</tgroup>
1169211713
</table>

src/backend/catalog/pg_publication.c

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,11 +1077,12 @@ get_publication_name(Oid pubid, bool missing_ok)
10771077
}
10781078

10791079
/*
1080-
* Returns Oids of tables in a publication.
1080+
* Returns information of tables in a publication.
10811081
*/
10821082
Datum
10831083
pg_get_publication_tables(PG_FUNCTION_ARGS)
10841084
{
1085+
#define NUM_PUBLICATOIN_TABLES_ELEM 3
10851086
FuncCallContext *funcctx;
10861087
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
10871088
Publication *publication;
@@ -1090,6 +1091,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
10901091
/* stuff done only on the first call of the function */
10911092
if (SRF_IS_FIRSTCALL())
10921093
{
1094+
TupleDesc tupdesc;
10931095
MemoryContext oldcontext;
10941096

10951097
/* create a function context for cross-call persistence */
@@ -1136,6 +1138,16 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11361138
tables = filter_partitions(tables);
11371139
}
11381140

1141+
/* Construct a tuple descriptor for the result rows. */
1142+
tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATOIN_TABLES_ELEM);
1143+
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
1144+
OIDOID, -1, 0);
1145+
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
1146+
INT2VECTOROID, -1, 0);
1147+
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
1148+
PG_NODE_TREEOID, -1, 0);
1149+
1150+
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
11391151
funcctx->user_fctx = (void *) tables;
11401152

11411153
MemoryContextSwitchTo(oldcontext);
@@ -1147,9 +1159,47 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
11471159

11481160
if (funcctx->call_cntr < list_length(tables))
11491161
{
1162+
HeapTuple pubtuple = NULL;
1163+
HeapTuple rettuple;
11501164
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
1165+
Datum values[NUM_PUBLICATOIN_TABLES_ELEM];
1166+
bool nulls[NUM_PUBLICATOIN_TABLES_ELEM];
1167+
1168+
/*
1169+
* Form tuple with appropriate data.
1170+
*/
1171+
MemSet(nulls, 0, sizeof(nulls));
1172+
MemSet(values, 0, sizeof(values));
1173+
1174+
publication = GetPublicationByName(pubname, false);
1175+
1176+
values[0] = ObjectIdGetDatum(relid);
1177+
1178+
pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1179+
ObjectIdGetDatum(relid),
1180+
ObjectIdGetDatum(publication->oid));
1181+
1182+
if (HeapTupleIsValid(pubtuple))
1183+
{
1184+
/* Lookup the column list attribute. */
1185+
values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1186+
Anum_pg_publication_rel_prattrs,
1187+
&(nulls[1]));
1188+
1189+
/* Null indicates no filter. */
1190+
values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1191+
Anum_pg_publication_rel_prqual,
1192+
&(nulls[2]));
1193+
}
1194+
else
1195+
{
1196+
nulls[1] = true;
1197+
nulls[2] = true;
1198+
}
1199+
1200+
rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
11511201

1152-
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1202+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
11531203
}
11541204

11551205
SRF_RETURN_DONE(funcctx);

src/backend/catalog/system_views.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,15 @@ CREATE VIEW pg_publication_tables AS
368368
SELECT
369369
P.pubname AS pubname,
370370
N.nspname AS schemaname,
371-
C.relname AS tablename
371+
C.relname AS tablename,
372+
( SELECT array_agg(a.attname ORDER BY a.attnum)
373+
FROM unnest(CASE WHEN GPT.attrs IS NOT NULL THEN GPT.attrs
374+
ELSE (SELECT array_agg(g) FROM generate_series(1, C.relnatts) g)
375+
END) k
376+
JOIN pg_attribute a
377+
ON (a.attrelid = GPT.relid AND a.attnum = k)
378+
) AS attnames,
379+
pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
372380
FROM pg_publication P,
373381
LATERAL pg_get_publication_tables(P.pubname) GPT,
374382
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)

src/backend/replication/logical/tablesync.c

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -795,15 +795,12 @@ fetch_remote_table_info(char *nspname, char *relname,
795795
resetStringInfo(&cmd);
796796
appendStringInfo(&cmd,
797797
"SELECT DISTINCT unnest"
798-
" FROM pg_publication p"
799-
" LEFT OUTER JOIN pg_publication_rel pr"
800-
" ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
801-
" LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
798+
" FROM pg_publication p,"
802799
" LATERAL pg_get_publication_tables(p.pubname) gpt"
800+
" LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
803801
" WHERE gpt.relid = %u"
804802
" AND p.pubname IN ( %s )",
805803
lrel->remoteid,
806-
lrel->remoteid,
807804
pub_names.data);
808805

809806
pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
@@ -965,15 +962,12 @@ fetch_remote_table_info(char *nspname, char *relname,
965962
/* Check for row filters. */
966963
resetStringInfo(&cmd);
967964
appendStringInfo(&cmd,
968-
"SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
969-
" FROM pg_publication p"
970-
" LEFT OUTER JOIN pg_publication_rel pr"
971-
" ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
965+
"SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
966+
" FROM pg_publication p,"
972967
" LATERAL pg_get_publication_tables(p.pubname) gpt"
973968
" WHERE gpt.relid = %u"
974969
" AND p.pubname IN ( %s )",
975970
lrel->remoteid,
976-
lrel->remoteid,
977971
pub_names.data);
978972

979973
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 202205131
56+
#define CATALOG_VERSION_NO 202205191
5757

5858
#endif

src/include/catalog/pg_proc.dat

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11673,11 +11673,11 @@
1167311673
prosrc => 'pg_show_replication_origin_status' },
1167411674

1167511675
# publications
11676-
{ oid => '6119', descr => 'get OIDs of tables in a publication',
11676+
{ oid => '6119', descr => 'get information of tables in a publication',
1167711677
proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
11678-
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
11679-
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
11680-
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
11678+
provolatile => 's', prorettype => 'record', proargtypes => 'text',
11679+
proallargtypes => '{text,oid,int2vector,pg_node_tree}', proargmodes => '{i,o,o,o}',
11680+
proargnames => '{pubname,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' },
1168111681
{ oid => '6121',
1168211682
descr => 'returns whether a relation can be part of a publication',
1168311683
proname => 'pg_relation_is_publishable', provolatile => 's',

src/test/regress/expected/publication.out

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,52 +1585,52 @@ CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10);
15851585
-- Schema publication that does not include the schema that has the parent table
15861586
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
15871587
SELECT * FROM pg_publication_tables;
1588-
pubname | schemaname | tablename
1589-
---------+------------+------------
1590-
pub | sch2 | tbl1_part1
1588+
pubname | schemaname | tablename | attnames | rowfilter
1589+
---------+------------+------------+----------+-----------
1590+
pub | sch2 | tbl1_part1 | {a} |
15911591
(1 row)
15921592

15931593
DROP PUBLICATION pub;
15941594
-- Table publication that does not include the parent table
15951595
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
15961596
SELECT * FROM pg_publication_tables;
1597-
pubname | schemaname | tablename
1598-
---------+------------+------------
1599-
pub | sch2 | tbl1_part1
1597+
pubname | schemaname | tablename | attnames | rowfilter
1598+
---------+------------+------------+----------+-----------
1599+
pub | sch2 | tbl1_part1 | {a} |
16001600
(1 row)
16011601

16021602
-- Table publication that includes both the parent table and the child table
16031603
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
16041604
SELECT * FROM pg_publication_tables;
1605-
pubname | schemaname | tablename
1606-
---------+------------+-----------
1607-
pub | sch1 | tbl1
1605+
pubname | schemaname | tablename | attnames | rowfilter
1606+
---------+------------+-----------+----------+-----------
1607+
pub | sch1 | tbl1 | {a} |
16081608
(1 row)
16091609

16101610
DROP PUBLICATION pub;
16111611
-- Schema publication that does not include the schema that has the parent table
16121612
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
16131613
SELECT * FROM pg_publication_tables;
1614-
pubname | schemaname | tablename
1615-
---------+------------+------------
1616-
pub | sch2 | tbl1_part1
1614+
pubname | schemaname | tablename | attnames | rowfilter
1615+
---------+------------+------------+----------+-----------
1616+
pub | sch2 | tbl1_part1 | {a} |
16171617
(1 row)
16181618

16191619
DROP PUBLICATION pub;
16201620
-- Table publication that does not include the parent table
16211621
CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
16221622
SELECT * FROM pg_publication_tables;
1623-
pubname | schemaname | tablename
1624-
---------+------------+------------
1625-
pub | sch2 | tbl1_part1
1623+
pubname | schemaname | tablename | attnames | rowfilter
1624+
---------+------------+------------+----------+-----------
1625+
pub | sch2 | tbl1_part1 | {a} |
16261626
(1 row)
16271627

16281628
-- Table publication that includes both the parent table and the child table
16291629
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
16301630
SELECT * FROM pg_publication_tables;
1631-
pubname | schemaname | tablename
1632-
---------+------------+------------
1633-
pub | sch2 | tbl1_part1
1631+
pubname | schemaname | tablename | attnames | rowfilter
1632+
---------+------------+------------+----------+-----------
1633+
pub | sch2 | tbl1_part1 | {a} |
16341634
(1 row)
16351635

16361636
DROP PUBLICATION pub;
@@ -1643,9 +1643,9 @@ CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a);
16431643
ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30);
16441644
CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
16451645
SELECT * FROM pg_publication_tables;
1646-
pubname | schemaname | tablename
1647-
---------+------------+-----------
1648-
pub | sch1 | tbl1
1646+
pubname | schemaname | tablename | attnames | rowfilter
1647+
---------+------------+-----------+----------+-----------
1648+
pub | sch1 | tbl1 | {a} |
16491649
(1 row)
16501650

16511651
RESET client_min_messages;

src/test/regress/expected/rules.out

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,9 +1437,18 @@ pg_prepared_xacts| SELECT p.transaction,
14371437
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
14381438
pg_publication_tables| SELECT p.pubname,
14391439
n.nspname AS schemaname,
1440-
c.relname AS tablename
1440+
c.relname AS tablename,
1441+
( SELECT array_agg(a.attname ORDER BY a.attnum) AS array_agg
1442+
FROM (unnest(
1443+
CASE
1444+
WHEN (gpt.attrs IS NOT NULL) THEN (gpt.attrs)::integer[]
1445+
ELSE ( SELECT array_agg(g.g) AS array_agg
1446+
FROM generate_series(1, (c.relnatts)::integer) g(g))
1447+
END) k(k)
1448+
JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
1449+
pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
14411450
FROM pg_publication p,
1442-
LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid),
1451+
LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
14431452
(pg_class c
14441453
JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
14451454
WHERE (c.oid = gpt.relid);

0 commit comments

Comments
 (0)