Skip to content

Commit da324d6

Browse files
author
Amit Kapila
committed
Refactor pgoutput_change().
Instead of mostly-duplicate code for different operation (insert/update/delete) types, write a common code to compute old/new tuples, and check the row filter. Author: Hou Zhijie Reviewed-by: Peter Smith, Amit Kapila Discussion: https://postgr.es/m/OS0PR01MB5716194A47FFA8D91133687D94BF9@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 902ecd3 commit da324d6

File tree

1 file changed

+79
-156
lines changed

1 file changed

+79
-156
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 79 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14401440
case REORDER_BUFFER_CHANGE_DELETE:
14411441
if (!relentry->pubactions.pubdelete)
14421442
return;
1443+
1444+
/*
1445+
* This is only possible if deletes are allowed even when replica
1446+
* identity is not defined for a table. Since the DELETE action
1447+
* can't be published, we simply return.
1448+
*/
1449+
if (!change->data.tp.oldtuple)
1450+
{
1451+
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1452+
return;
1453+
}
14431454
break;
14441455
default:
14451456
Assert(false);
@@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14481459
/* Avoid leaking memory by using and resetting our own context */
14491460
old = MemoryContextSwitchTo(data->context);
14501461

1451-
/* Send the data */
1452-
switch (action)
1462+
/* Switch relation if publishing via root. */
1463+
if (relentry->publish_as_relid != RelationGetRelid(relation))
14531464
{
1454-
case REORDER_BUFFER_CHANGE_INSERT:
1455-
new_slot = relentry->new_slot;
1456-
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1457-
new_slot, false);
1458-
1459-
/* Switch relation if publishing via root. */
1460-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1461-
{
1462-
Assert(relation->rd_rel->relispartition);
1463-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1464-
targetrel = ancestor;
1465-
/* Convert tuple if needed. */
1466-
if (relentry->attrmap)
1467-
{
1468-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1469-
1470-
new_slot = execute_attr_map_slot(relentry->attrmap,
1471-
new_slot,
1472-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1473-
}
1474-
}
1475-
1476-
/* Check row filter */
1477-
if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
1478-
&action))
1479-
break;
1480-
1481-
/*
1482-
* Send BEGIN if we haven't yet.
1483-
*
1484-
* We send the BEGIN message after ensuring that we will actually
1485-
* send the change. This avoids sending a pair of BEGIN/COMMIT
1486-
* messages for empty transactions.
1487-
*/
1488-
if (txndata && !txndata->sent_begin_txn)
1489-
pgoutput_send_begin(ctx, txn);
1490-
1491-
/*
1492-
* Schema should be sent using the original relation because it
1493-
* also sends the ancestor's relation.
1494-
*/
1495-
maybe_send_schema(ctx, change, relation, relentry);
1465+
Assert(relation->rd_rel->relispartition);
1466+
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1467+
targetrel = ancestor;
1468+
}
14961469

1497-
OutputPluginPrepareWrite(ctx, true);
1498-
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1499-
data->binary, relentry->columns);
1500-
OutputPluginWrite(ctx, true);
1501-
break;
1502-
case REORDER_BUFFER_CHANGE_UPDATE:
1503-
if (change->data.tp.oldtuple)
1504-
{
1505-
old_slot = relentry->old_slot;
1506-
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1507-
old_slot, false);
1508-
}
1470+
if (change->data.tp.oldtuple)
1471+
{
1472+
old_slot = relentry->old_slot;
1473+
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
15091474

1510-
new_slot = relentry->new_slot;
1511-
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1512-
new_slot, false);
1475+
/* Convert tuple if needed. */
1476+
if (relentry->attrmap)
1477+
{
1478+
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1479+
&TTSOpsVirtual);
15131480

1514-
/* Switch relation if publishing via root. */
1515-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1516-
{
1517-
Assert(relation->rd_rel->relispartition);
1518-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1519-
targetrel = ancestor;
1520-
/* Convert tuples if needed. */
1521-
if (relentry->attrmap)
1522-
{
1523-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1481+
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1482+
}
1483+
}
15241484

1525-
if (old_slot)
1526-
old_slot = execute_attr_map_slot(relentry->attrmap,
1527-
old_slot,
1528-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1485+
if (change->data.tp.newtuple)
1486+
{
1487+
new_slot = relentry->new_slot;
1488+
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
15291489

1530-
new_slot = execute_attr_map_slot(relentry->attrmap,
1531-
new_slot,
1532-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1533-
}
1534-
}
1490+
/* Convert tuple if needed. */
1491+
if (relentry->attrmap)
1492+
{
1493+
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1494+
&TTSOpsVirtual);
15351495

1536-
/* Check row filter */
1537-
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1538-
relentry, &action))
1539-
break;
1496+
new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1497+
}
1498+
}
15401499

1541-
/* Send BEGIN if we haven't yet */
1542-
if (txndata && !txndata->sent_begin_txn)
1543-
pgoutput_send_begin(ctx, txn);
1500+
/*
1501+
* Check row filter.
1502+
*
1503+
* Updates could be transformed to inserts or deletes based on the results
1504+
* of the row filter for old and new tuple.
1505+
*/
1506+
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1507+
goto cleanup;
15441508

1545-
maybe_send_schema(ctx, change, relation, relentry);
1509+
/*
1510+
* Send BEGIN if we haven't yet.
1511+
*
1512+
* We send the BEGIN message after ensuring that we will actually send the
1513+
* change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1514+
* transactions.
1515+
*/
1516+
if (txndata && !txndata->sent_begin_txn)
1517+
pgoutput_send_begin(ctx, txn);
15461518

1547-
OutputPluginPrepareWrite(ctx, true);
1519+
/*
1520+
* Schema should be sent using the original relation because it also sends
1521+
* the ancestor's relation.
1522+
*/
1523+
maybe_send_schema(ctx, change, relation, relentry);
15481524

1549-
/*
1550-
* Updates could be transformed to inserts or deletes based on the
1551-
* results of the row filter for old and new tuple.
1552-
*/
1553-
switch (action)
1554-
{
1555-
case REORDER_BUFFER_CHANGE_INSERT:
1556-
logicalrep_write_insert(ctx->out, xid, targetrel,
1557-
new_slot, data->binary,
1558-
relentry->columns);
1559-
break;
1560-
case REORDER_BUFFER_CHANGE_UPDATE:
1561-
logicalrep_write_update(ctx->out, xid, targetrel,
1562-
old_slot, new_slot, data->binary,
1563-
relentry->columns);
1564-
break;
1565-
case REORDER_BUFFER_CHANGE_DELETE:
1566-
logicalrep_write_delete(ctx->out, xid, targetrel,
1567-
old_slot, data->binary,
1568-
relentry->columns);
1569-
break;
1570-
default:
1571-
Assert(false);
1572-
}
1525+
OutputPluginPrepareWrite(ctx, true);
15731526

1574-
OutputPluginWrite(ctx, true);
1527+
/* Send the data */
1528+
switch (action)
1529+
{
1530+
case REORDER_BUFFER_CHANGE_INSERT:
1531+
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1532+
data->binary, relentry->columns);
1533+
break;
1534+
case REORDER_BUFFER_CHANGE_UPDATE:
1535+
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1536+
new_slot, data->binary, relentry->columns);
15751537
break;
15761538
case REORDER_BUFFER_CHANGE_DELETE:
1577-
if (change->data.tp.oldtuple)
1578-
{
1579-
old_slot = relentry->old_slot;
1580-
1581-
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1582-
old_slot, false);
1583-
1584-
/* Switch relation if publishing via root. */
1585-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1586-
{
1587-
Assert(relation->rd_rel->relispartition);
1588-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1589-
targetrel = ancestor;
1590-
/* Convert tuple if needed. */
1591-
if (relentry->attrmap)
1592-
{
1593-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1594-
1595-
old_slot = execute_attr_map_slot(relentry->attrmap,
1596-
old_slot,
1597-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1598-
}
1599-
}
1600-
1601-
/* Check row filter */
1602-
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1603-
relentry, &action))
1604-
break;
1605-
1606-
/* Send BEGIN if we haven't yet */
1607-
if (txndata && !txndata->sent_begin_txn)
1608-
pgoutput_send_begin(ctx, txn);
1609-
1610-
maybe_send_schema(ctx, change, relation, relentry);
1611-
1612-
OutputPluginPrepareWrite(ctx, true);
1613-
logicalrep_write_delete(ctx->out, xid, targetrel,
1614-
old_slot, data->binary,
1615-
relentry->columns);
1616-
OutputPluginWrite(ctx, true);
1617-
}
1618-
else
1619-
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1539+
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1540+
data->binary, relentry->columns);
16201541
break;
16211542
default:
16221543
Assert(false);
16231544
}
16241545

1546+
OutputPluginWrite(ctx, true);
1547+
1548+
cleanup:
16251549
if (RelationIsValid(ancestor))
16261550
{
16271551
RelationClose(ancestor);
16281552
ancestor = NULL;
16291553
}
16301554

1631-
/* Cleanup */
16321555
MemoryContextSwitchTo(old);
16331556
MemoryContextReset(data->context);
16341557
}

0 commit comments

Comments
 (0)