From f8ed06cfee691d9ba9602a93113b9a4debe97b5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= Date: Mon, 8 Jun 2020 02:00:53 +0200 Subject: [PATCH 1/2] Initial work on COPY progress. --- src/backend/commands/copy.c | 13 ++++++++++--- src/backend/utils/adt/pgstatfuncs.c | 2 ++ src/include/commands/progress.h | 3 +++ src/include/pgstat.h | 3 ++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc463c18..6c66a1631a40 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -29,6 +29,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "commands/progress.h" #include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -45,6 +46,7 @@ #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -1752,6 +1754,9 @@ BeginCopy(ParseState *pstate, cstate->copy_dest = COPY_FILE; /* default */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, queryRelId); + pgstat_progress_update_param(PROGRESS_COPY_PROCESSED,0); + MemoryContextSwitchTo(oldcontext); return cstate; @@ -1811,6 +1816,8 @@ EndCopy(CopyState cstate) cstate->filename))); } + pgstat_progress_end_command(); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } @@ -2123,7 +2130,7 @@ CopyTo(CopyState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - processed++; + pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -3262,7 +3269,7 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed); } } @@ -5119,7 +5126,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - myState->processed++; + pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++myState->processed); return true; } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2aff739466ff..b740eef7c102 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) cmdtype = PROGRESS_COMMAND_CREATE_INDEX; else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0) cmdtype = PROGRESS_COMMAND_BASEBACKUP; + else if (pg_strcasecmp(cmd, "COPY") == 0) + cmdtype = PROGRESS_COMMAND_COPY; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 36b073e67757..2d72f12a75b5 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -133,4 +133,7 @@ #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 +/* Commands of PROGRESS_CLUSTER */ +#define PROGRESS_COPY_PROCESSED 0 + #endif diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c55dc1481ca5..45b8006ff2b8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -994,7 +994,8 @@ typedef enum ProgressCommandType PROGRESS_COMMAND_ANALYZE, PROGRESS_COMMAND_CLUSTER, PROGRESS_COMMAND_CREATE_INDEX, - PROGRESS_COMMAND_BASEBACKUP + PROGRESS_COMMAND_BASEBACKUP, + PROGRESS_COMMAND_COPY } ProgressCommandType; #define PGSTAT_NUM_PROGRESS_PARAM 20 From bb528a21dcf8d12b06961d8fd591d3ac3bbf2c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= Date: Sun, 14 Jun 2020 02:46:04 +0200 Subject: [PATCH 2/2] Enhance copy progress with more info. - add pg_stat_progress_copy system view --- src/backend/catalog/system_views.sql | 14 ++++++++++++++ src/backend/commands/copy.c | 16 +++++++++++----- src/include/commands/progress.h | 8 ++++++-- src/test/regress/expected/rules.out | 15 +++++++++++++++ 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 56420bbc9d6f..05f995b43937 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1093,6 +1093,20 @@ CREATE VIEW pg_stat_progress_basebackup AS S.param5 AS tablespaces_streamed FROM pg_stat_get_progress_info('BASEBACKUP') AS S; +CREATE VIEW pg_stat_progress_copy AS + SELECT + S.pid AS pid, S.datid AS datid, D.datname AS datname, + S.relid AS relid, + CASE S.param1 WHEN 0 THEN 'TO' + WHEN 1 THEN 'FROM' + END as direction, + CAST (S.param2::integer AS bool) AS file, + CAST (S.param3::integer AS bool) AS program, + S.param4 AS lines_processed, + S.param5 AS file_bytes_processed + FROM pg_stat_get_progress_info('COPY') AS S + LEFT JOIN pg_database D ON S.datid = D.oid; + CREATE VIEW pg_user_mappings AS SELECT U.oid AS umid, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6c66a1631a40..3462e4f414d1 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -566,6 +566,7 @@ CopySendEndOfRow(CopyState cstate) (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); } + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, ftell(cstate->copy_file)); break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ @@ -618,6 +619,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) { case COPY_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, ftell(cstate->copy_file)); if (ferror(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), @@ -1754,8 +1756,12 @@ BeginCopy(ParseState *pstate, cstate->copy_dest = COPY_FILE; /* default */ - pgstat_progress_start_command(PROGRESS_COMMAND_COPY, queryRelId); - pgstat_progress_update_param(PROGRESS_COPY_PROCESSED,0); + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, 0); + pgstat_progress_update_param(PROGRESS_COPY_IS_FROM, (int) cstate->is_copy_from); + pgstat_progress_update_param(PROGRESS_COPY_IS_FILE, (int) cstate->copy_dest == COPY_FILE); + pgstat_progress_update_param(PROGRESS_COPY_IS_PROGRAM, (int) cstate->is_program); MemoryContextSwitchTo(oldcontext); @@ -2130,7 +2136,7 @@ CopyTo(CopyState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed); + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -3269,7 +3275,7 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed); + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } } @@ -5126,7 +5132,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++myState->processed); + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed); return true; } diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 2d72f12a75b5..3947222e6f61 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -133,7 +133,11 @@ #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 -/* Commands of PROGRESS_CLUSTER */ -#define PROGRESS_COPY_PROCESSED 0 +/* Commands of PROGRESS_COPY */ +#define PROGRESS_COPY_IS_FROM 0 +#define PROGRESS_COPY_IS_FILE 1 +#define PROGRESS_COPY_IS_PROGRAM 2 +#define PROGRESS_COPY_LINES_PROCESSED 3 +#define PROGRESS_COPY_BYTES_PROCESSED 4 #endif diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index b813e322153d..dac5da4e6dd9 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1925,6 +1925,21 @@ pg_stat_progress_cluster| SELECT s.pid, s.param8 AS index_rebuild_count FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); +pg_stat_progress_copy| SELECT s.pid, + s.datid, + d.datname, + s.relid, + CASE s.param1 + WHEN 0 THEN 'TO'::text + WHEN 1 THEN 'FROM'::text + ELSE NULL::text + END AS direction, + ((s.param2)::integer)::boolean AS file, + ((s.param3)::integer)::boolean AS program, + s.param4 AS lines_processed, + s.param5 AS file_bytes_processed + FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) + LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_create_index| SELECT s.pid, s.datid, d.datname,