Skip to content

Commit 45fdc97

Browse files
author
Amit Kapila
committed
Extend the logical decoding output plugin API with stream methods.
This adds seven methods to the output plugin API, adding support for streaming changes of large in-progress transactions. * stream_start * stream_stop * stream_abort * stream_commit * stream_change * stream_message * stream_truncate Most of this is a simple extension of the existing methods, with the semantic difference that the transaction (or subtransaction) is incomplete and may be aborted later (which is something the regular API does not really need to deal with). This also extends the 'test_decoding' plugin, implementing these new stream methods. The stream_start/start_stop are used to demarcate a chunk of changes streamed for a particular toplevel transaction. This commit simply adds these new APIs and the upcoming patch to "allow the streaming mode in ReorderBuffer" will use these APIs. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent 1383874 commit 45fdc97

File tree

6 files changed

+878
-0
lines changed

6 files changed

+878
-0
lines changed

contrib/test_decoding/test_decoding.c

+176
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6262
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
6363
bool transactional, const char *prefix,
6464
Size sz, const char *message);
65+
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
66+
ReorderBufferTXN *txn);
67+
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
68+
ReorderBufferTXN *txn);
69+
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
70+
ReorderBufferTXN *txn,
71+
XLogRecPtr abort_lsn);
72+
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
73+
ReorderBufferTXN *txn,
74+
XLogRecPtr commit_lsn);
75+
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
76+
ReorderBufferTXN *txn,
77+
Relation relation,
78+
ReorderBufferChange *change);
79+
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
80+
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
81+
bool transactional, const char *prefix,
82+
Size sz, const char *message);
83+
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
84+
ReorderBufferTXN *txn,
85+
int nrelations, Relation relations[],
86+
ReorderBufferChange *change);
6587

6688
void
6789
_PG_init(void)
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
83105
cb->filter_by_origin_cb = pg_decode_filter;
84106
cb->shutdown_cb = pg_decode_shutdown;
85107
cb->message_cb = pg_decode_message;
108+
cb->stream_start_cb = pg_decode_stream_start;
109+
cb->stream_stop_cb = pg_decode_stream_stop;
110+
cb->stream_abort_cb = pg_decode_stream_abort;
111+
cb->stream_commit_cb = pg_decode_stream_commit;
112+
cb->stream_change_cb = pg_decode_stream_change;
113+
cb->stream_message_cb = pg_decode_stream_message;
114+
cb->stream_truncate_cb = pg_decode_stream_truncate;
86115
}
87116

88117

@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
540569
appendBinaryStringInfo(ctx->out, message, sz);
541570
OutputPluginWrite(ctx, true);
542571
}
572+
573+
/*
574+
* We never try to stream any empty xact so we don't need any special handling
575+
* for skip_empty_xacts in streaming mode APIs.
576+
*/
577+
static void
578+
pg_decode_stream_start(LogicalDecodingContext *ctx,
579+
ReorderBufferTXN *txn)
580+
{
581+
TestDecodingData *data = ctx->output_plugin_private;
582+
583+
OutputPluginPrepareWrite(ctx, true);
584+
if (data->include_xids)
585+
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
586+
else
587+
appendStringInfo(ctx->out, "opening a streamed block for transaction");
588+
OutputPluginWrite(ctx, true);
589+
}
590+
591+
/*
592+
* We never try to stream any empty xact so we don't need any special handling
593+
* for skip_empty_xacts in streaming mode APIs.
594+
*/
595+
static void
596+
pg_decode_stream_stop(LogicalDecodingContext *ctx,
597+
ReorderBufferTXN *txn)
598+
{
599+
TestDecodingData *data = ctx->output_plugin_private;
600+
601+
OutputPluginPrepareWrite(ctx, true);
602+
if (data->include_xids)
603+
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
604+
else
605+
appendStringInfo(ctx->out, "closing a streamed block for transaction");
606+
OutputPluginWrite(ctx, true);
607+
}
608+
609+
/*
610+
* We never try to stream any empty xact so we don't need any special handling
611+
* for skip_empty_xacts in streaming mode APIs.
612+
*/
613+
static void
614+
pg_decode_stream_abort(LogicalDecodingContext *ctx,
615+
ReorderBufferTXN *txn,
616+
XLogRecPtr abort_lsn)
617+
{
618+
TestDecodingData *data = ctx->output_plugin_private;
619+
620+
OutputPluginPrepareWrite(ctx, true);
621+
if (data->include_xids)
622+
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
623+
else
624+
appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
625+
OutputPluginWrite(ctx, true);
626+
}
627+
628+
/*
629+
* We never try to stream any empty xact so we don't need any special handling
630+
* for skip_empty_xacts in streaming mode APIs.
631+
*/
632+
static void
633+
pg_decode_stream_commit(LogicalDecodingContext *ctx,
634+
ReorderBufferTXN *txn,
635+
XLogRecPtr commit_lsn)
636+
{
637+
TestDecodingData *data = ctx->output_plugin_private;
638+
639+
OutputPluginPrepareWrite(ctx, true);
640+
641+
if (data->include_xids)
642+
appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
643+
else
644+
appendStringInfo(ctx->out, "committing streamed transaction");
645+
646+
if (data->include_timestamp)
647+
appendStringInfo(ctx->out, " (at %s)",
648+
timestamptz_to_str(txn->commit_time));
649+
650+
OutputPluginWrite(ctx, true);
651+
}
652+
653+
/*
654+
* In streaming mode, we don't display the changes as the transaction can abort
655+
* at a later point in time. We don't want users to see the changes until the
656+
* transaction is committed.
657+
*/
658+
static void
659+
pg_decode_stream_change(LogicalDecodingContext *ctx,
660+
ReorderBufferTXN *txn,
661+
Relation relation,
662+
ReorderBufferChange *change)
663+
{
664+
TestDecodingData *data = ctx->output_plugin_private;
665+
666+
OutputPluginPrepareWrite(ctx, true);
667+
if (data->include_xids)
668+
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
669+
else
670+
appendStringInfo(ctx->out, "streaming change for transaction");
671+
OutputPluginWrite(ctx, true);
672+
}
673+
674+
/*
675+
* In streaming mode, we don't display the contents for transactional messages
676+
* as the transaction can abort at a later point in time. We don't want users to
677+
* see the message contents until the transaction is committed.
678+
*/
679+
static void
680+
pg_decode_stream_message(LogicalDecodingContext *ctx,
681+
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
682+
const char *prefix, Size sz, const char *message)
683+
{
684+
OutputPluginPrepareWrite(ctx, true);
685+
686+
if (transactional)
687+
{
688+
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
689+
transactional, prefix, sz);
690+
}
691+
else
692+
{
693+
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
694+
transactional, prefix, sz);
695+
appendBinaryStringInfo(ctx->out, message, sz);
696+
}
697+
698+
OutputPluginWrite(ctx, true);
699+
}
700+
701+
/*
702+
* In streaming mode, we don't display the detailed information of Truncate.
703+
* See pg_decode_stream_change.
704+
*/
705+
static void
706+
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
707+
int nrelations, Relation relations[],
708+
ReorderBufferChange *change)
709+
{
710+
TestDecodingData *data = ctx->output_plugin_private;
711+
712+
OutputPluginPrepareWrite(ctx, true);
713+
if (data->include_xids)
714+
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
715+
else
716+
appendStringInfo(ctx->out, "streaming truncate for transaction");
717+
OutputPluginWrite(ctx, true);
718+
}

0 commit comments

Comments
 (0)