From 05bc4ad08edde37125199e0648f6d3baf5988ad7 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sat, 2 May 2020 11:41:59 +0530 Subject: [PATCH v20 11/12] Provide new api to get the streaming changes --- .gitignore | 1 + src/backend/catalog/system_views.sql | 8 +++++++ .../replication/logical/logicalfuncs.c | 23 +++++++++++++++---- src/include/catalog/pg_proc.dat | 9 ++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 794e35b73c..6083744c07 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ lib*.pc /Debug/ /Release/ /tmp_install/ +/build/ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8f34ce8deb..dd488cb2f8 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1242,6 +1242,14 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_get_changes'; +CREATE OR REPLACE FUNCTION pg_logical_slot_get_streaming_changes( + IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', + OUT lsn pg_lsn, OUT xid xid, OUT data text) +RETURNS SETOF RECORD +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 +AS 'pg_logical_slot_get_streaming_changes'; + CREATE OR REPLACE FUNCTION pg_logical_slot_peek_changes( IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', OUT lsn pg_lsn, OUT xid xid, OUT data text) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fded8e8290..debb91b457 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -108,7 +108,8 @@ check_permissions(void) * Helper function for the various SQL callable logical decoding functions. */ static Datum -pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) +pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, + bool binary, bool streaming) { Name name; XLogRecPtr upto_lsn; @@ -250,6 +251,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(*name)), errdetail("This slot has never previously reserved WAL, or has been invalidated."))); + /* If called has not asked for streaming changes then disable it. */ + ctx->streaming &= streaming; + MemoryContextSwitchTo(oldcontext); /* @@ -360,7 +364,16 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, false); + return pg_logical_slot_get_changes_guts(fcinfo, true, false, false); +} + +/* + * SQL function to get the streaming changes as text, consuming the data. + */ +Datum +pg_logical_slot_get_streaming_changes(PG_FUNCTION_ARGS) +{ + return pg_logical_slot_get_changes_guts(fcinfo, true, false, true); } /* @@ -369,7 +382,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, false); + return pg_logical_slot_get_changes_guts(fcinfo, false, false, false); } /* @@ -378,7 +391,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, true); + return pg_logical_slot_get_changes_guts(fcinfo, true, true, false); } /* @@ -387,7 +400,7 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, true); + return pg_logical_slot_get_changes_guts(fcinfo, false, true, false); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9fb1ffe2c8..3dfc5c10fc 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10117,6 +10117,15 @@ proargmodes => '{i,i,i,v,o,o,o}', proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', prosrc => 'pg_logical_slot_get_binary_changes' }, +{ oid => '6150', descr => 'get streaming changes from replication slot', + proname => 'pg_logical_slot_get_streaming_changes', procost => '1000', + prorows => '1000', provariadic => 'text', proisstrict => 'f', + proretset => 't', provolatile => 'v', proparallel => 'u', + prorettype => 'record', proargtypes => 'name pg_lsn int4 _text', + proallargtypes => '{name,pg_lsn,int4,_text,pg_lsn,xid,text}', + proargmodes => '{i,i,i,v,o,o,o}', + proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', + prosrc => 'pg_logical_slot_get_streaming_changes' }, { oid => '3784', descr => 'peek at changes from replication slot', proname => 'pg_logical_slot_peek_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', -- 2.23.0