@@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1440
1440
case REORDER_BUFFER_CHANGE_DELETE :
1441
1441
if (!relentry -> pubactions .pubdelete )
1442
1442
return ;
1443
+
1444
+ /*
1445
+ * This is only possible if deletes are allowed even when replica
1446
+ * identity is not defined for a table. Since the DELETE action
1447
+ * can't be published, we simply return.
1448
+ */
1449
+ if (!change -> data .tp .oldtuple )
1450
+ {
1451
+ elog (DEBUG1 , "didn't send DELETE change because of missing oldtuple" );
1452
+ return ;
1453
+ }
1443
1454
break ;
1444
1455
default :
1445
1456
Assert (false);
@@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1448
1459
/* Avoid leaking memory by using and resetting our own context */
1449
1460
old = MemoryContextSwitchTo (data -> context );
1450
1461
1451
- /* Send the data */
1452
- switch ( action )
1462
+ /* Switch relation if publishing via root. */
1463
+ if ( relentry -> publish_as_relid != RelationGetRelid ( relation ) )
1453
1464
{
1454
- case REORDER_BUFFER_CHANGE_INSERT :
1455
- new_slot = relentry -> new_slot ;
1456
- ExecStoreHeapTuple (& change -> data .tp .newtuple -> tuple ,
1457
- new_slot , false);
1458
-
1459
- /* Switch relation if publishing via root. */
1460
- if (relentry -> publish_as_relid != RelationGetRelid (relation ))
1461
- {
1462
- Assert (relation -> rd_rel -> relispartition );
1463
- ancestor = RelationIdGetRelation (relentry -> publish_as_relid );
1464
- targetrel = ancestor ;
1465
- /* Convert tuple if needed. */
1466
- if (relentry -> attrmap )
1467
- {
1468
- TupleDesc tupdesc = RelationGetDescr (targetrel );
1469
-
1470
- new_slot = execute_attr_map_slot (relentry -> attrmap ,
1471
- new_slot ,
1472
- MakeTupleTableSlot (tupdesc , & TTSOpsVirtual ));
1473
- }
1474
- }
1475
-
1476
- /* Check row filter */
1477
- if (!pgoutput_row_filter (targetrel , NULL , & new_slot , relentry ,
1478
- & action ))
1479
- break ;
1480
-
1481
- /*
1482
- * Send BEGIN if we haven't yet.
1483
- *
1484
- * We send the BEGIN message after ensuring that we will actually
1485
- * send the change. This avoids sending a pair of BEGIN/COMMIT
1486
- * messages for empty transactions.
1487
- */
1488
- if (txndata && !txndata -> sent_begin_txn )
1489
- pgoutput_send_begin (ctx , txn );
1490
-
1491
- /*
1492
- * Schema should be sent using the original relation because it
1493
- * also sends the ancestor's relation.
1494
- */
1495
- maybe_send_schema (ctx , change , relation , relentry );
1465
+ Assert (relation -> rd_rel -> relispartition );
1466
+ ancestor = RelationIdGetRelation (relentry -> publish_as_relid );
1467
+ targetrel = ancestor ;
1468
+ }
1496
1469
1497
- OutputPluginPrepareWrite (ctx , true);
1498
- logicalrep_write_insert (ctx -> out , xid , targetrel , new_slot ,
1499
- data -> binary , relentry -> columns );
1500
- OutputPluginWrite (ctx , true);
1501
- break ;
1502
- case REORDER_BUFFER_CHANGE_UPDATE :
1503
- if (change -> data .tp .oldtuple )
1504
- {
1505
- old_slot = relentry -> old_slot ;
1506
- ExecStoreHeapTuple (& change -> data .tp .oldtuple -> tuple ,
1507
- old_slot , false);
1508
- }
1470
+ if (change -> data .tp .oldtuple )
1471
+ {
1472
+ old_slot = relentry -> old_slot ;
1473
+ ExecStoreHeapTuple (& change -> data .tp .oldtuple -> tuple , old_slot , false);
1509
1474
1510
- new_slot = relentry -> new_slot ;
1511
- ExecStoreHeapTuple (& change -> data .tp .newtuple -> tuple ,
1512
- new_slot , false);
1475
+ /* Convert tuple if needed. */
1476
+ if (relentry -> attrmap )
1477
+ {
1478
+ TupleTableSlot * slot = MakeTupleTableSlot (RelationGetDescr (targetrel ),
1479
+ & TTSOpsVirtual );
1513
1480
1514
- /* Switch relation if publishing via root. */
1515
- if (relentry -> publish_as_relid != RelationGetRelid (relation ))
1516
- {
1517
- Assert (relation -> rd_rel -> relispartition );
1518
- ancestor = RelationIdGetRelation (relentry -> publish_as_relid );
1519
- targetrel = ancestor ;
1520
- /* Convert tuples if needed. */
1521
- if (relentry -> attrmap )
1522
- {
1523
- TupleDesc tupdesc = RelationGetDescr (targetrel );
1481
+ old_slot = execute_attr_map_slot (relentry -> attrmap , old_slot , slot );
1482
+ }
1483
+ }
1524
1484
1525
- if (old_slot )
1526
- old_slot = execute_attr_map_slot ( relentry -> attrmap ,
1527
- old_slot ,
1528
- MakeTupleTableSlot ( tupdesc , & TTSOpsVirtual ) );
1485
+ if (change -> data . tp . newtuple )
1486
+ {
1487
+ new_slot = relentry -> new_slot ;
1488
+ ExecStoreHeapTuple ( & change -> data . tp . newtuple -> tuple , new_slot , false );
1529
1489
1530
- new_slot = execute_attr_map_slot ( relentry -> attrmap ,
1531
- new_slot ,
1532
- MakeTupleTableSlot ( tupdesc , & TTSOpsVirtual ));
1533
- }
1534
- }
1490
+ /* Convert tuple if needed. */
1491
+ if ( relentry -> attrmap )
1492
+ {
1493
+ TupleTableSlot * slot = MakeTupleTableSlot ( RelationGetDescr ( targetrel ),
1494
+ & TTSOpsVirtual );
1535
1495
1536
- /* Check row filter */
1537
- if (!pgoutput_row_filter (targetrel , old_slot , & new_slot ,
1538
- relentry , & action ))
1539
- break ;
1496
+ new_slot = execute_attr_map_slot (relentry -> attrmap , new_slot , slot );
1497
+ }
1498
+ }
1540
1499
1541
- /* Send BEGIN if we haven't yet */
1542
- if (txndata && !txndata -> sent_begin_txn )
1543
- pgoutput_send_begin (ctx , txn );
1500
+ /*
1501
+ * Check row filter.
1502
+ *
1503
+ * Updates could be transformed to inserts or deletes based on the results
1504
+ * of the row filter for old and new tuple.
1505
+ */
1506
+ if (!pgoutput_row_filter (targetrel , old_slot , & new_slot , relentry , & action ))
1507
+ goto cleanup ;
1544
1508
1545
- maybe_send_schema (ctx , change , relation , relentry );
1509
+ /*
1510
+ * Send BEGIN if we haven't yet.
1511
+ *
1512
+ * We send the BEGIN message after ensuring that we will actually send the
1513
+ * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1514
+ * transactions.
1515
+ */
1516
+ if (txndata && !txndata -> sent_begin_txn )
1517
+ pgoutput_send_begin (ctx , txn );
1546
1518
1547
- OutputPluginPrepareWrite (ctx , true);
1519
+ /*
1520
+ * Schema should be sent using the original relation because it also sends
1521
+ * the ancestor's relation.
1522
+ */
1523
+ maybe_send_schema (ctx , change , relation , relentry );
1548
1524
1549
- /*
1550
- * Updates could be transformed to inserts or deletes based on the
1551
- * results of the row filter for old and new tuple.
1552
- */
1553
- switch (action )
1554
- {
1555
- case REORDER_BUFFER_CHANGE_INSERT :
1556
- logicalrep_write_insert (ctx -> out , xid , targetrel ,
1557
- new_slot , data -> binary ,
1558
- relentry -> columns );
1559
- break ;
1560
- case REORDER_BUFFER_CHANGE_UPDATE :
1561
- logicalrep_write_update (ctx -> out , xid , targetrel ,
1562
- old_slot , new_slot , data -> binary ,
1563
- relentry -> columns );
1564
- break ;
1565
- case REORDER_BUFFER_CHANGE_DELETE :
1566
- logicalrep_write_delete (ctx -> out , xid , targetrel ,
1567
- old_slot , data -> binary ,
1568
- relentry -> columns );
1569
- break ;
1570
- default :
1571
- Assert (false);
1572
- }
1525
+ OutputPluginPrepareWrite (ctx , true);
1573
1526
1574
- OutputPluginWrite (ctx , true);
1527
+ /* Send the data */
1528
+ switch (action )
1529
+ {
1530
+ case REORDER_BUFFER_CHANGE_INSERT :
1531
+ logicalrep_write_insert (ctx -> out , xid , targetrel , new_slot ,
1532
+ data -> binary , relentry -> columns );
1533
+ break ;
1534
+ case REORDER_BUFFER_CHANGE_UPDATE :
1535
+ logicalrep_write_update (ctx -> out , xid , targetrel , old_slot ,
1536
+ new_slot , data -> binary , relentry -> columns );
1575
1537
break ;
1576
1538
case REORDER_BUFFER_CHANGE_DELETE :
1577
- if (change -> data .tp .oldtuple )
1578
- {
1579
- old_slot = relentry -> old_slot ;
1580
-
1581
- ExecStoreHeapTuple (& change -> data .tp .oldtuple -> tuple ,
1582
- old_slot , false);
1583
-
1584
- /* Switch relation if publishing via root. */
1585
- if (relentry -> publish_as_relid != RelationGetRelid (relation ))
1586
- {
1587
- Assert (relation -> rd_rel -> relispartition );
1588
- ancestor = RelationIdGetRelation (relentry -> publish_as_relid );
1589
- targetrel = ancestor ;
1590
- /* Convert tuple if needed. */
1591
- if (relentry -> attrmap )
1592
- {
1593
- TupleDesc tupdesc = RelationGetDescr (targetrel );
1594
-
1595
- old_slot = execute_attr_map_slot (relentry -> attrmap ,
1596
- old_slot ,
1597
- MakeTupleTableSlot (tupdesc , & TTSOpsVirtual ));
1598
- }
1599
- }
1600
-
1601
- /* Check row filter */
1602
- if (!pgoutput_row_filter (targetrel , old_slot , & new_slot ,
1603
- relentry , & action ))
1604
- break ;
1605
-
1606
- /* Send BEGIN if we haven't yet */
1607
- if (txndata && !txndata -> sent_begin_txn )
1608
- pgoutput_send_begin (ctx , txn );
1609
-
1610
- maybe_send_schema (ctx , change , relation , relentry );
1611
-
1612
- OutputPluginPrepareWrite (ctx , true);
1613
- logicalrep_write_delete (ctx -> out , xid , targetrel ,
1614
- old_slot , data -> binary ,
1615
- relentry -> columns );
1616
- OutputPluginWrite (ctx , true);
1617
- }
1618
- else
1619
- elog (DEBUG1 , "didn't send DELETE change because of missing oldtuple" );
1539
+ logicalrep_write_delete (ctx -> out , xid , targetrel , old_slot ,
1540
+ data -> binary , relentry -> columns );
1620
1541
break ;
1621
1542
default :
1622
1543
Assert (false);
1623
1544
}
1624
1545
1546
+ OutputPluginWrite (ctx , true);
1547
+
1548
+ cleanup :
1625
1549
if (RelationIsValid (ancestor ))
1626
1550
{
1627
1551
RelationClose (ancestor );
1628
1552
ancestor = NULL ;
1629
1553
}
1630
1554
1631
- /* Cleanup */
1632
1555
MemoryContextSwitchTo (old );
1633
1556
MemoryContextReset (data -> context );
1634
1557
}
0 commit comments