Skip to content

Commit 923def9

Browse files
committed
Allow specifying column lists for logical replication
This allows specifying an optional column list when adding a table to logical replication. The column list may be specified after the table name, enclosed in parentheses. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. For UPDATE/DELETE publications, the column list needs to cover all REPLICA IDENTITY columns. For INSERT publications, the column list is arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if the table uses REPLICA IDENTITY FULL, column list is not allowed. The column list can contain only simple column references. Complex expressions, function calls etc. are not allowed. This restriction could be relaxed in the future. During the initial table synchronization, only columns included in the column list are copied to the subscriber. If the subscription has several publications, containing the same table with different column lists, columns specified in any of the lists will be copied. This means all columns are replicated if the table has no column list at all (which is treated as column list with all columns), or when of the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that matches the schema of the table). For partitioned tables, publish_via_partition_root determines whether the column list for the root or the leaf relation will be used. If the parameter is 'false' (the default), the list defined for the leaf relation is used. Otherwise, the column list for the root partition will be used. Psql commands \dRp+ and \d <table-name> now display any column lists. Author: Tomas Vondra, Alvaro Herrera, Rahila Syed Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed, Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
1 parent 05843b1 commit 923def9

File tree

26 files changed

+2833
-92
lines changed

26 files changed

+2833
-92
lines changed

doc/src/sgml/catalogs.sgml

+14-1
Original file line numberDiff line numberDiff line change
@@ -4410,7 +4410,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
44104410
</para>
44114411
<para>
44124412
This is an array of <structfield>indnatts</structfield> values that
4413-
indicate which table columns this index indexes. For example a value
4413+
indicate which table columns this index indexes. For example, a value
44144414
of <literal>1 3</literal> would mean that the first and the third table
44154415
columns make up the index entries. Key columns come before non-key
44164416
(included) columns. A zero in this array indicates that the
@@ -6291,6 +6291,19 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
62916291
Reference to schema
62926292
</para></entry>
62936293
</row>
6294+
6295+
<row>
6296+
<entry role="catalog_table_entry"><para role="column_definition">
6297+
<structfield>prattrs</structfield> <type>int2vector</type>
6298+
(references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attnum</structfield>)
6299+
</para>
6300+
<para>
6301+
This is an array of values that indicates which table columns are
6302+
part of the publication. For example, a value of <literal>1 3</literal>
6303+
would mean that the first and the third table columns are published.
6304+
A null value indicates that all columns are published.
6305+
</para></entry>
6306+
</row>
62946307
</tbody>
62956308
</tgroup>
62966309
</table>

doc/src/sgml/protocol.sgml

+2-1
Original file line numberDiff line numberDiff line change
@@ -7016,7 +7016,8 @@ Relation
70167016
</listitem>
70177017
</varlistentry>
70187018
</variablelist>
7019-
Next, the following message part appears for each column (except generated columns):
7019+
Next, the following message part appears for each column included in
7020+
the publication (except generated columns):
70207021
<variablelist>
70217022
<varlistentry>
70227023
<term>

doc/src/sgml/ref/alter_publication.sgml

+16-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
3030

3131
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
3232

33-
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
33+
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
3434
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... ]
3535
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
3636
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@@ -114,6 +114,14 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
114114
specified, the table and all its descendant tables (if any) are
115115
affected. Optionally, <literal>*</literal> can be specified after the table
116116
name to explicitly indicate that descendant tables are included.
117+
</para>
118+
119+
<para>
120+
Optionally, a column list can be specified. See <xref
121+
linkend="sql-createpublication"/> for details.
122+
</para>
123+
124+
<para>
117125
If the optional <literal>WHERE</literal> clause is specified, rows for
118126
which the <replaceable class="parameter">expression</replaceable>
119127
evaluates to false or null will not be published. Note that parentheses
@@ -185,7 +193,13 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
185193
<para>
186194
Add some tables to the publication:
187195
<programlisting>
188-
ALTER PUBLICATION mypublication ADD TABLE users, departments;
196+
ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
197+
</programlisting></para>
198+
199+
<para>
200+
Change the set of columns published for a table:
201+
<programlisting>
202+
ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments;
189203
</programlisting></para>
190204

191205
<para>

doc/src/sgml/ref/create_publication.sgml

+16-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
3333

3434
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
3535

36-
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
36+
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
3737
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
3838
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
3939
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@@ -93,6 +93,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
9393
<literal>TRUNCATE</literal> commands.
9494
</para>
9595

96+
<para>
97+
When a column list is specified, only the named columns are replicated.
98+
If no column list is specified, all columns of the table are replicated
99+
through this publication, including any columns added later. If a column
100+
list is specified, it must include the replica identity columns.
101+
</para>
102+
96103
<para>
97104
Only persistent base tables and partitioned tables can be part of a
98105
publication. Temporary tables, unlogged tables, foreign tables,
@@ -348,6 +355,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL
348355
<structname>sales</structname>:
349356
<programlisting>
350357
CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales;
358+
</programlisting></para>
359+
360+
<para>
361+
Create a publication that publishes all changes for table <structname>users</structname>,
362+
but replicates only columns <structname>user_id</structname> and
363+
<structname>firstname</structname>:
364+
<programlisting>
365+
CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname);
351366
</programlisting></para>
352367
</refsect1>
353368

src/backend/catalog/pg_publication.c

+145
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
#include "utils/rel.h"
4646
#include "utils/syscache.h"
4747

48+
static void publication_translate_columns(Relation targetrel, List *columns,
49+
int *natts, AttrNumber **attrs);
50+
4851
/*
4952
* Check if relation can be in given publication and throws appropriate
5053
* error if not.
@@ -395,6 +398,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
395398
Oid relid = RelationGetRelid(targetrel);
396399
Oid pubreloid;
397400
Publication *pub = GetPublication(pubid);
401+
AttrNumber *attarray = NULL;
402+
int natts = 0;
398403
ObjectAddress myself,
399404
referenced;
400405
List *relids = NIL;
@@ -422,6 +427,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
422427

423428
check_publication_add_relation(targetrel);
424429

430+
/*
431+
* Translate column names to attnums and make sure the column list contains
432+
* only allowed elements (no system or generated columns etc.). Also build
433+
* an array of attnums, for storing in the catalog.
434+
*/
435+
publication_translate_columns(pri->relation, pri->columns,
436+
&natts, &attarray);
437+
425438
/* Form a tuple. */
426439
memset(values, 0, sizeof(values));
427440
memset(nulls, false, sizeof(nulls));
@@ -440,6 +453,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
440453
else
441454
nulls[Anum_pg_publication_rel_prqual - 1] = true;
442455

456+
/* Add column list, if available */
457+
if (pri->columns)
458+
values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
459+
else
460+
nulls[Anum_pg_publication_rel_prattrs - 1] = true;
461+
443462
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
444463

445464
/* Insert tuple into catalog. */
@@ -463,6 +482,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
463482
DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
464483
false);
465484

485+
/* Add dependency on the columns, if any are listed */
486+
for (int i = 0; i < natts; i++)
487+
{
488+
ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
489+
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
490+
}
491+
466492
/* Close the table. */
467493
table_close(rel, RowExclusiveLock);
468494

@@ -482,6 +508,125 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
482508
return myself;
483509
}
484510

511+
/* qsort comparator for attnums */
512+
static int
513+
compare_int16(const void *a, const void *b)
514+
{
515+
int av = *(const int16 *) a;
516+
int bv = *(const int16 *) b;
517+
518+
/* this can't overflow if int is wider than int16 */
519+
return (av - bv);
520+
}
521+
522+
/*
523+
* Translate a list of column names to an array of attribute numbers
524+
* and a Bitmapset with them; verify that each attribute is appropriate
525+
* to have in a publication column list (no system or generated attributes,
526+
* no duplicates). Additional checks with replica identity are done later;
527+
* see check_publication_columns.
528+
*
529+
* Note that the attribute numbers are *not* offset by
530+
* FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
531+
* is okay.
532+
*/
533+
static void
534+
publication_translate_columns(Relation targetrel, List *columns,
535+
int *natts, AttrNumber **attrs)
536+
{
537+
AttrNumber *attarray = NULL;
538+
Bitmapset *set = NULL;
539+
ListCell *lc;
540+
int n = 0;
541+
TupleDesc tupdesc = RelationGetDescr(targetrel);
542+
543+
/* Bail out when no column list defined. */
544+
if (!columns)
545+
return;
546+
547+
/*
548+
* Translate list of columns to attnums. We prohibit system attributes and
549+
* make sure there are no duplicate columns.
550+
*/
551+
attarray = palloc(sizeof(AttrNumber) * list_length(columns));
552+
foreach(lc, columns)
553+
{
554+
char *colname = strVal(lfirst(lc));
555+
AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
556+
557+
if (attnum == InvalidAttrNumber)
558+
ereport(ERROR,
559+
errcode(ERRCODE_UNDEFINED_COLUMN),
560+
errmsg("column \"%s\" of relation \"%s\" does not exist",
561+
colname, RelationGetRelationName(targetrel)));
562+
563+
if (!AttrNumberIsForUserDefinedAttr(attnum))
564+
ereport(ERROR,
565+
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
566+
errmsg("cannot reference system column \"%s\" in publication column list",
567+
colname));
568+
569+
if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
570+
ereport(ERROR,
571+
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
572+
errmsg("cannot reference generated column \"%s\" in publication column list",
573+
colname));
574+
575+
if (bms_is_member(attnum, set))
576+
ereport(ERROR,
577+
errcode(ERRCODE_DUPLICATE_OBJECT),
578+
errmsg("duplicate column \"%s\" in publication column list",
579+
colname));
580+
581+
set = bms_add_member(set, attnum);
582+
attarray[n++] = attnum;
583+
}
584+
585+
/* Be tidy, so that the catalog representation is always sorted */
586+
qsort(attarray, n, sizeof(AttrNumber), compare_int16);
587+
588+
*natts = n;
589+
*attrs = attarray;
590+
591+
bms_free(set);
592+
}
593+
594+
/*
595+
* Transform the column list (represented by an array) to a bitmapset.
596+
*/
597+
Bitmapset *
598+
pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
599+
{
600+
Bitmapset *result = NULL;
601+
ArrayType *arr;
602+
int nelems;
603+
int16 *elems;
604+
MemoryContext oldcxt;
605+
606+
/*
607+
* If an existing bitmap was provided, use it. Otherwise just use NULL
608+
* and build a new bitmap.
609+
*/
610+
if (columns)
611+
result = columns;
612+
613+
arr = DatumGetArrayTypeP(pubcols);
614+
nelems = ARR_DIMS(arr)[0];
615+
elems = (int16 *) ARR_DATA_PTR(arr);
616+
617+
/* If a memory context was specified, switch to it. */
618+
if (mcxt)
619+
oldcxt = MemoryContextSwitchTo(mcxt);
620+
621+
for (int i = 0; i < nelems; i++)
622+
result = bms_add_member(result, elems[i]);
623+
624+
if (mcxt)
625+
MemoryContextSwitchTo(oldcxt);
626+
627+
return result;
628+
}
629+
485630
/*
486631
* Insert new publication / schema mapping.
487632
*/

0 commit comments

Comments
 (0)