From 15d57fc08ff343394b7f0c2a1251d5ec12a518ed Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sat, 2 May 2020 11:41:59 +0530 Subject: [PATCH v21 11/12] Provide new api to get the streaming changes --- .gitignore | 1 + src/backend/catalog/system_views.sql | 8 +++++++ .../replication/logical/logicalfuncs.c | 23 +++++++++++++++---- src/include/catalog/pg_proc.dat | 9 ++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 794e35b73c..6083744c07 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ lib*.pc /Debug/ /Release/ /tmp_install/ +/build/ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8f34ce8deb..dd488cb2f8 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1242,6 +1242,14 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_get_changes'; +CREATE OR REPLACE FUNCTION pg_logical_slot_get_streaming_changes( + IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', + OUT lsn pg_lsn, OUT xid xid, OUT data text) +RETURNS SETOF RECORD +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 +AS 'pg_logical_slot_get_streaming_changes'; + CREATE OR REPLACE FUNCTION pg_logical_slot_peek_changes( IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', OUT lsn pg_lsn, OUT xid xid, OUT data text) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index b99c94e848..70c28ffa91 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -108,7 +108,8 @@ check_permissions(void) * Helper function for the various SQL callable logical decoding functions. */ static Datum -pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) +pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, + bool binary, bool streaming) { Name name; XLogRecPtr upto_lsn; @@ -252,6 +253,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(*name)), errdetail("This slot has never previously reserved WAL, or has been invalidated."))); + /* If called has not asked for streaming changes then disable it. */ + ctx->streaming &= streaming; + MemoryContextSwitchTo(oldcontext); /* @@ -362,7 +366,16 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, false); + return pg_logical_slot_get_changes_guts(fcinfo, true, false, false); +} + +/* + * SQL function to get the streaming changes as text, consuming the data. + */ +Datum +pg_logical_slot_get_streaming_changes(PG_FUNCTION_ARGS) +{ + return pg_logical_slot_get_changes_guts(fcinfo, true, false, true); } /* @@ -371,7 +384,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, false); + return pg_logical_slot_get_changes_guts(fcinfo, false, false, false); } /* @@ -380,7 +393,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, true); + return pg_logical_slot_get_changes_guts(fcinfo, true, true, false); } /* @@ -389,7 +402,7 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, true); + return pg_logical_slot_get_changes_guts(fcinfo, false, true, false); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 5a8826cc67..586c9621e2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10115,6 +10115,15 @@ proargmodes => '{i,i,i,v,o,o,o}', proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', prosrc => 'pg_logical_slot_get_binary_changes' }, +{ oid => '6150', descr => 'get streaming changes from replication slot', + proname => 'pg_logical_slot_get_streaming_changes', procost => '1000', + prorows => '1000', provariadic => 'text', proisstrict => 'f', + proretset => 't', provolatile => 'v', proparallel => 'u', + prorettype => 'record', proargtypes => 'name pg_lsn int4 _text', + proallargtypes => '{name,pg_lsn,int4,_text,pg_lsn,xid,text}', + proargmodes => '{i,i,i,v,o,o,o}', + proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', + prosrc => 'pg_logical_slot_get_streaming_changes' }, { oid => '3784', descr => 'peek at changes from replication slot', proname => 'pg_logical_slot_peek_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', -- 2.23.0