@@ -48,9 +48,6 @@ typedef struct
48
48
* table. */
49
49
} published_rel ;
50
50
51
- static void publication_translate_columns (Relation targetrel , List * columns ,
52
- int * natts , AttrNumber * * attrs );
53
-
54
51
/*
55
52
* Check if relation can be in given publication and throws appropriate
56
53
* error if not.
@@ -351,6 +348,33 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level
351
348
return topmost_relid ;
352
349
}
353
350
351
+ /*
352
+ * attnumstoint2vector
353
+ * Convert a Bitmapset of AttrNumbers into an int2vector.
354
+ *
355
+ * AttrNumber numbers are 0-based, i.e., not offset by
356
+ * FirstLowInvalidHeapAttributeNumber.
357
+ */
358
+ static int2vector *
359
+ attnumstoint2vector (Bitmapset * attrs )
360
+ {
361
+ int2vector * result ;
362
+ int n = bms_num_members (attrs );
363
+ int i = -1 ;
364
+ int j = 0 ;
365
+
366
+ result = buildint2vector (NULL , n );
367
+
368
+ while ((i = bms_next_member (attrs , i )) >= 0 )
369
+ {
370
+ Assert (i <= PG_INT16_MAX );
371
+
372
+ result -> values [j ++ ] = (int16 ) i ;
373
+ }
374
+
375
+ return result ;
376
+ }
377
+
354
378
/*
355
379
* Insert new publication / relation mapping.
356
380
*/
@@ -365,12 +389,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
365
389
Relation targetrel = pri -> relation ;
366
390
Oid relid = RelationGetRelid (targetrel );
367
391
Oid pubreloid ;
392
+ Bitmapset * attnums ;
368
393
Publication * pub = GetPublication (pubid );
369
- AttrNumber * attarray = NULL ;
370
- int natts = 0 ;
371
394
ObjectAddress myself ,
372
395
referenced ;
373
396
List * relids = NIL ;
397
+ int i ;
374
398
375
399
rel = table_open (PublicationRelRelationId , RowExclusiveLock );
376
400
@@ -395,13 +419,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
395
419
396
420
check_publication_add_relation (targetrel );
397
421
398
- /*
399
- * Translate column names to attnums and make sure the column list
400
- * contains only allowed elements (no system or generated columns etc.).
401
- * Also build an array of attnums, for storing in the catalog.
402
- */
403
- publication_translate_columns (pri -> relation , pri -> columns ,
404
- & natts , & attarray );
422
+ /* Validate and translate column names into a Bitmapset of attnums. */
423
+ attnums = pub_collist_validate (pri -> relation , pri -> columns );
405
424
406
425
/* Form a tuple. */
407
426
memset (values , 0 , sizeof (values ));
@@ -423,7 +442,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
423
442
424
443
/* Add column list, if available */
425
444
if (pri -> columns )
426
- values [Anum_pg_publication_rel_prattrs - 1 ] = PointerGetDatum (buildint2vector ( attarray , natts ));
445
+ values [Anum_pg_publication_rel_prattrs - 1 ] = PointerGetDatum (attnumstoint2vector ( attnums ));
427
446
else
428
447
nulls [Anum_pg_publication_rel_prattrs - 1 ] = true;
429
448
@@ -451,9 +470,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
451
470
false);
452
471
453
472
/* Add dependency on the columns, if any are listed */
454
- for (int i = 0 ; i < natts ; i ++ )
473
+ i = -1 ;
474
+ while ((i = bms_next_member (attnums , i )) >= 0 )
455
475
{
456
- ObjectAddressSubSet (referenced , RelationRelationId , relid , attarray [ i ] );
476
+ ObjectAddressSubSet (referenced , RelationRelationId , relid , i );
457
477
recordDependencyOn (& myself , & referenced , DEPENDENCY_NORMAL );
458
478
}
459
479
@@ -476,47 +496,23 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
476
496
return myself ;
477
497
}
478
498
479
- /* qsort comparator for attnums */
480
- static int
481
- compare_int16 (const void * a , const void * b )
482
- {
483
- int av = * (const int16 * ) a ;
484
- int bv = * (const int16 * ) b ;
485
-
486
- /* this can't overflow if int is wider than int16 */
487
- return (av - bv );
488
- }
489
-
490
499
/*
491
- * Translate a list of column names to an array of attribute numbers
492
- * and a Bitmapset with them; verify that each attribute is appropriate
493
- * to have in a publication column list (no system or generated attributes,
494
- * no duplicates). Additional checks with replica identity are done later;
495
- * see pub_collist_contains_invalid_column .
500
+ * pub_collist_validate
501
+ * Process and validate the 'columns' list and ensure the columns are all
502
+ * valid to use for a publication. Checks for and raises an ERROR for
503
+ * any; unknown columns, system columns, duplicate columns or generated
504
+ * columns .
496
505
*
497
- * Note that the attribute numbers are *not* offset by
498
- * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
499
- * is okay.
506
+ * Looks up each column's attnum and returns a 0-based Bitmapset of the
507
+ * corresponding attnums.
500
508
*/
501
- static void
502
- publication_translate_columns (Relation targetrel , List * columns ,
503
- int * natts , AttrNumber * * attrs )
509
+ Bitmapset *
510
+ pub_collist_validate (Relation targetrel , List * columns )
504
511
{
505
- AttrNumber * attarray = NULL ;
506
512
Bitmapset * set = NULL ;
507
513
ListCell * lc ;
508
- int n = 0 ;
509
514
TupleDesc tupdesc = RelationGetDescr (targetrel );
510
515
511
- /* Bail out when no column list defined. */
512
- if (!columns )
513
- return ;
514
-
515
- /*
516
- * Translate list of columns to attnums. We prohibit system attributes and
517
- * make sure there are no duplicate columns.
518
- */
519
- attarray = palloc (sizeof (AttrNumber ) * list_length (columns ));
520
516
foreach (lc , columns )
521
517
{
522
518
char * colname = strVal (lfirst (lc ));
@@ -547,16 +543,9 @@ publication_translate_columns(Relation targetrel, List *columns,
547
543
colname ));
548
544
549
545
set = bms_add_member (set , attnum );
550
- attarray [n ++ ] = attnum ;
551
546
}
552
547
553
- /* Be tidy, so that the catalog representation is always sorted */
554
- qsort (attarray , n , sizeof (AttrNumber ), compare_int16 );
555
-
556
- * natts = n ;
557
- * attrs = attarray ;
558
-
559
- bms_free (set );
548
+ return set ;
560
549
}
561
550
562
551
/*
@@ -569,19 +558,12 @@ publication_translate_columns(Relation targetrel, List *columns,
569
558
Bitmapset *
570
559
pub_collist_to_bitmapset (Bitmapset * columns , Datum pubcols , MemoryContext mcxt )
571
560
{
572
- Bitmapset * result = NULL ;
561
+ Bitmapset * result = columns ;
573
562
ArrayType * arr ;
574
563
int nelems ;
575
564
int16 * elems ;
576
565
MemoryContext oldcxt = NULL ;
577
566
578
- /*
579
- * If an existing bitmap was provided, use it. Otherwise just use NULL and
580
- * build a new bitmap.
581
- */
582
- if (columns )
583
- result = columns ;
584
-
585
567
arr = DatumGetArrayTypeP (pubcols );
586
568
nelems = ARR_DIMS (arr )[0 ];
587
569
elems = (int16 * ) ARR_DATA_PTR (arr );
0 commit comments