From 28c5b37c2271b623f6bc4653d17f92dedb8722be Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 22 Sep 2020 15:12:27 +0530 Subject: [PATCH v2] Parallel Copy Exec Time Capture A testing patch for capturing various timings such as total copy time in leader and worker, index insertion time, leader and worker waiting time. --- src/backend/commands/copy.c | 74 ++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5b1884acd8..cb72949e0e 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -65,6 +65,14 @@ #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') +/* Global variables for capturing parallel copy execution times. */ +double totalcopytime; +double totalcopytimeworker; +double totalcopyleaderwaitingtime; +double totalcopyworkerwaitingtime; +double totaltableinsertiontime; +double totalindexinsertiontime; + /* * Represents the different source/dest cases we need to worry about at * the bottom level @@ -1332,9 +1340,16 @@ CacheLineInfo(CopyState cstate, uint32 buff_count) uint32 offset; int dataSize; int copiedSize = 0; + struct timespec before, after; + struct timespec before1, after1; resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); + INSTR_TIME_SET_CURRENT(before); write_pos = GetLinePosition(cstate); + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totalcopyworkerwaitingtime += INSTR_TIME_GET_MILLISEC(after); + if (-1 == write_pos) return true; @@ -1436,6 +1451,7 @@ CacheLineInfo(CopyState cstate, uint32 buff_count) data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; } + INSTR_TIME_SET_CURRENT(before1); for (;;) { /* Get the size of this line */ @@ -1455,6 +1471,9 @@ CacheLineInfo(CopyState cstate, uint32 buff_count) COPY_WAIT_TO_PROCESS() } + INSTR_TIME_SET_CURRENT(after1); + INSTR_TIME_SUBTRACT(after1, before1); + totalcopyworkerwaitingtime += INSTR_TIME_GET_MILLISEC(after1); } empty_data_line_update: @@ -1538,6 +1557,11 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) char *convertListStr = NULL; WalUsage *walusage; BufferUsage *bufferusage; + struct timespec before, after; + totalcopytimeworker = 0; + totalcopyworkerwaitingtime = 0; + totaltableinsertiontime = 0; + totalindexinsertiontime = 0; /* Allocate workspace and zero all fields. */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); @@ -1606,7 +1630,15 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) cstate->rel = rel; InitializeParallelCopyInfo(shared_cstate, cstate, attlist); + INSTR_TIME_SET_CURRENT(before); CopyFrom(cstate); + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totalcopytimeworker += INSTR_TIME_GET_MILLISEC(after); + ereport(LOG, (errmsg("totalcopyworkerwaitingtime = %.3f ms", totalcopyworkerwaitingtime), errhidestmt(true))); + ereport(LOG, (errmsg("totaltableinsertiontime = %.3f ms", totaltableinsertiontime), errhidestmt(true))); + ereport(LOG, (errmsg("totalindexinsertiontime = %.3f ms", totalindexinsertiontime), errhidestmt(true))); + ereport(LOG, (errmsg("totalcopytimeworker = %.3f ms", totalcopytimeworker), errhidestmt(true))); if (rel != NULL) table_close(rel, RowExclusiveLock); @@ -1633,11 +1665,16 @@ UpdateBlockInLineInfo(CopyState cstate, uint32 blk_pos, ParallelCopyLineBoundaries *lineBoundaryPtr = &pcshared_info->line_boundaries; ParallelCopyLineBoundary *lineInfo; int line_pos = lineBoundaryPtr->pos; + struct timespec before, after; /* Update the line information for the worker to pick and process. */ lineInfo = &lineBoundaryPtr->ring[line_pos]; + INSTR_TIME_SET_CURRENT(before); while (pg_atomic_read_u32(&lineInfo->line_size) != -1) COPY_WAIT_TO_PROCESS() + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totalcopyleaderwaitingtime += INSTR_TIME_GET_MILLISEC(after); lineInfo->first_block = blk_pos; lineInfo->start_offset = offset; @@ -2203,6 +2240,8 @@ static uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) { uint32 new_free_pos = -1; + struct timespec before, after; + INSTR_TIME_SET_CURRENT(before); for (;;) { new_free_pos = GetFreeCopyBlock(pcshared_info); @@ -2211,7 +2250,9 @@ WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) COPY_WAIT_TO_PROCESS() } - + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totalcopyleaderwaitingtime += INSTR_TIME_GET_MILLISEC(after); return new_free_pos; } @@ -3083,12 +3124,21 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { ParallelContext *pcxt = NULL; + struct timespec before, after; Assert(rel); + totalcopytime = 0; + totalcopytimeworker = 0; + totalcopyleaderwaitingtime = 0; + totalcopyworkerwaitingtime = 0; + totaltableinsertiontime = 0; + totalindexinsertiontime = 0; /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); + INSTR_TIME_SET_CURRENT(before); + cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; @@ -3119,6 +3169,18 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, } EndCopyFrom(cstate); + + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totalcopytime += INSTR_TIME_GET_MILLISEC(after); + if (pcxt != NULL) + ereport(LOG, (errmsg("totalcopyleaderwaitingtime = %.3f ms", totalcopyleaderwaitingtime), errhidestmt(true))); + if (pcxt == NULL) + { + ereport(LOG, (errmsg("totaltableinsertiontime = %.3f ms", totaltableinsertiontime), errhidestmt(true))); + ereport(LOG, (errmsg("totalindexinsertiontime = %.3f ms", totalindexinsertiontime), errhidestmt(true))); + } + ereport(LOG, (errmsg("totalcopytime = %.3f ms", totalcopytime), errhidestmt(true))); } else { @@ -4527,6 +4589,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int nused = buffer->nused; ResultRelInfo *resultRelInfo = buffer->resultRelInfo; TupleTableSlot **slots = buffer->slots; + struct timespec before, after; + struct timespec before1, after1; /* Set es_result_relation_info to the ResultRelInfo we're flushing. */ estate->es_result_relation_info = resultRelInfo; @@ -4543,14 +4607,19 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, * context before calling it. */ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + INSTR_TIME_SET_CURRENT(before); table_multi_insert(resultRelInfo->ri_RelationDesc, slots, nused, mycid, ti_options, buffer->bistate); + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + totaltableinsertiontime += INSTR_TIME_GET_MILLISEC(after); MemoryContextSwitchTo(oldcontext); + INSTR_TIME_SET_CURRENT(before1); for (i = 0; i < nused; i++) { /* @@ -4586,6 +4655,9 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, ExecClearTuple(slots[i]); } + INSTR_TIME_SET_CURRENT(after1); + INSTR_TIME_SUBTRACT(after1, before1); + totalindexinsertiontime += INSTR_TIME_GET_MILLISEC(after1); /* Mark that all slots are free */ buffer->nused = 0; -- 2.25.1