From b433eb1312b86bd15ce5a0727db7d571c20b3b4f Mon Sep 17 00:00:00 2001 From: dilipkumar Date: Thu, 10 Sep 2020 11:39:27 +0530 Subject: [PATCH v1] Skip printing empty stream in test decoding --- contrib/test_decoding/test_decoding.c | 34 +++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 34745150e9..39e29e13d6 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_stream_start(LogicalDecodingContext *ctx, + TestDecodingData *data, + ReorderBufferTXN *txn, + bool last_write); static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pg_decode_stream_abort(LogicalDecodingContext *ctx, @@ -593,6 +597,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + data->xact_wrote_changes = false; + if (data->skip_empty_xacts) + return; + pg_output_stream_start(ctx, data, txn, true); +} + +static void +pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +{ OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); @@ -611,6 +624,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); @@ -630,6 +646,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); @@ -649,6 +668,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -676,6 +698,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + /* output stream start if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); @@ -722,6 +751,11 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); -- 2.23.0