From d3b3ef86b5a91cea404c555d7a029bf2b21fb4f4 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 15 Nov 2016 16:06:16 +0800 Subject: [PATCH 3/3] Add a pg_recvlogical wrapper to PostgresNode --- src/test/perl/PostgresNode.pm | 75 ++++++++++++++++++++++++++++- src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 2f009d4..5197e80 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -1124,7 +1124,7 @@ sub psql # IPC::Run::run threw an exception. re-throw unless it's a # timeout, which we'll handle by testing is_expired die $exc_save - if (blessed($exc_save) || $exc_save ne $timeout_exception); + if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/); $ret = undef; @@ -1493,6 +1493,79 @@ sub slot return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns); } +=pod $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...) + +Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which +corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero). + +Disallows pg_recvlogial from internally retrying on error by passing --no-loop. + +Plugin options are passed as additional keyword arguments. + +If called in scalar context, returns stdout, and die()s on timeout or nonzero return. + +If called in array context, returns a tuple of (retval, stdout, stderr, timeout). +timeout is the IPC::Run::Timeout object whose is_expired method can be tested +to check for timeout. retval is undef on timeout. + +=cut + +sub pg_recvlogical_upto +{ + my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_; + my ($stdout, $stderr); + + my $timeout_exception = 'pg_recvlogical timed out'; + + my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname)); + push @cmd, '--endpos', $endpos if ($endpos); + push @cmd, '-f', '-', '--no-loop', '--start'; + + while (my ($k, $v) = each %plugin_options) + { + die "= is not permitted to appear in replication option name" if ($k =~ qr/=/); + push @cmd, "-o", "$k=$v"; + } + + my $timeout; + $timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs; + my $ret = 0; + + do { + local $@; + eval { + IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout); + $ret = $?; + }; + my $exc_save = $@; + if ($exc_save) + { + # IPC::Run::run threw an exception. re-throw unless it's a + # timeout, which we'll handle by testing is_expired + die $exc_save + if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/); + + $ret = undef; + + die "Got timeout exception '$exc_save' but timer not expired?!" + unless $timeout->is_expired; + + die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'" + unless wantarray; + } + }; + + if (wantarray) + { + return ($ret, $stdout, $stderr, $timeout); + } + else + { + die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret; + return $stdout; + } +} + =pod =back diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index b80a9a9..d8cc8d3 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -1,9 +1,13 @@ # Testing of logical decoding using SQL interface and/or pg_recvlogical +# +# Most logical decoding tests are in contrib/test_decoding. This module +# is for work that doesn't fit well there, like where server restarts +# are required. use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 2; +use Test::More tests => 5; # Initialize master node my $node_master = get_new_node('master'); @@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan chomp($result); is($result, '', 'Decoding after fast restart repeats no rows'); +# Insert some rows and verify that we get the same results from pg_recvlogical +# and the SQL interface. +$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]); + +my $expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT}; + +my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]); +is($stdout_sql, $expected, 'got expected output from SQL decoding session'); + +my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"); +diag "waiting to replay $endpos"; + +my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session'); + +$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); + # done with the node $node_master->stop; -- 2.5.5