diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index a4cfe96..2c176cd 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -578,6 +578,7 @@ tqueueShutdownReceiver(DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + empty_queue(tqueue->queue); shm_mq_detach(shm_mq_get_queue(tqueue->queue)); } diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f45a67c..0ad3ddb 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -79,6 +79,24 @@ struct shm_mq char mq_ring[FLEXIBLE_ARRAY_MEMBER]; }; +/* This is the structure for local queue where a worker can write + * tuples when it's shared queue is full. + * + * Each worker has it's own local queue where it can store tuples + * when master is busy and worker's shared queue gets full. Tuples + * are copied into shared queue via single memcpy equal to the space + * available in shared queue. Since, local queue is never shared with + * the master, we do not require any locking mechanism to write tuples + * in it, hence writing in local queue is a cheap operation. + */ +struct local_mq +{ + uint64 mq_bytes_read; + uint64 mq_bytes_written; + Size mq_ring_size; + uint8 mq_ring_offset; + char mq_ring[FLEXIBLE_ARRAY_MEMBER]; +}; /* * This structure is a backend-private handle for access to a queue. 
* @@ -126,7 +144,9 @@ struct shm_mq */ struct shm_mq_handle { + bool mqh_local; shm_mq *mqh_queue; + local_mq *mqh_local_queue; dsm_segment *mqh_segment; BackgroundWorkerHandle *mqh_handle; char *mqh_buffer; @@ -147,12 +167,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle); static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle); +static bool shm_mq_is_detached(volatile shm_mq *mq); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n); static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached); static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n); static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); +static uint64 space_in_shm(shm_mq *mq); + +/* Routines required for local queue */ +static local_mq *local_mq_create(void *address, Size size); +static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh); +static uint64 space_in_local(local_mq *lq, Size tuple_size); +static bool read_local_queue(local_mq *lq, bool shm_mq_full); +static shm_mq_result write_in_local_queue(local_mq *mq, shm_mq_iovec *iov); +static void local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written); +static shm_mq_result copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool read_anyway); /* Minimum queue size is enough for header and at least one chunk of data. 
*/ const Size shm_mq_minimum_size = @@ -286,6 +317,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); + mqh->mqh_local = false; + mqh->mqh_local_queue = NULL; mqh->mqh_queue = mq; mqh->mqh_segment = seg; mqh->mqh_buffer = NULL; @@ -315,17 +348,129 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) } /* - * Write a message into a shared message queue. + * Write a message into a shared or local message queue, as per the space + * availability in these queues. If space is available in shared queue then + * we simply write the message there and return. Else we write it in local + * queue. Once both the queues are full, we wait till some of the data in + * shared queue is read and then copy the data from local to shared queue + * and continue writing in local queue. After writing in local queue we + * check if there is space available in shared queue and we copy the data + * from local to shared queue then itself. */ shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) { shm_mq_iovec iov; + local_mq *lq; + shm_mq_result res; + Size tuple_size; + uint64 local_space, shm_space; iov.data = data; iov.len = nbytes; + /* this is actual size for this tuple which will be written in queue */ + tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len); + + /* create and attach a local queue, if it is not yet created */ + if (mqh->mqh_local_queue == NULL) + mqh = local_mq_attach(mqh); + + lq = mqh->mqh_local_queue; + + local_space = space_in_local(lq, tuple_size); + + /* + * if never been to local_queue yet or local_queue is empty, then check + * space in shared queue + */ + if (!mqh->mqh_local || local_space == 0) + shm_space = space_in_shm(mqh->mqh_queue); + + /* + * if there is enough space in shared_queue then write the tuple in it. 
+ */ + if (!mqh->mqh_local && shm_space > tuple_size) + res = shm_mq_sendv(mqh, &iov, 1, nowait); + else + { + /* + * once started with local queue, the tuples will flow from local to + * shared queue until local queue is empty + */ + mqh->mqh_local = true; + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + local_space = space_in_local(lq, tuple_size); + + /* write in local queue if there is enough space */ + if (local_space > tuple_size) + { + res = write_in_local_queue(lq, &iov); + + /* check if there is required space in shared queue */ + shm_space = space_in_shm(mqh->mqh_queue); - return shm_mq_sendv(mqh, &iov, 1, nowait); + /* + * if we have some data in local queue and enough space + * is available in shared queue then copy it to shared queue + */ + if (read_local_queue(lq, false) && shm_space > 0) + copy_local_to_shared(lq, mqh, false); + + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + } + else + { + /* + * if local queue is full, then copy some data to shared queue till enough + * space becomes available in local queue + */ + do + { + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + shm_space = space_in_shm(mqh->mqh_queue); + + /* + * cannot send data to shared queue, unless there is required + * space, so keep on trying till we get some space, since we + * cannot write anymore in local queue as of now + */ + while(shm_space <= 0) + { + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + shm_space = space_in_shm(mqh->mqh_queue); + } + if (read_local_queue(lq, true) && shm_space > 0) + copy_local_to_shared(lq, mqh, false); + + local_space = space_in_local(lq, tuple_size); + + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + }while (local_space <= tuple_size); + + /* + * once space is available in local queue, write the tuple appropriately. + * Write tuple in local queue unless it has become empty, then write in + * shared queue itself. 
+ */ + if (local_space > 0) + res = write_in_local_queue(lq, &iov); + else + { + mqh->mqh_local = false; + res = shm_mq_sendv(mqh, &iov, 1, nowait); + } + } + } + return res; } /* @@ -1102,7 +1247,18 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, return result; } - +/* + * Return true if the shm_mq has been detached. + */ +static bool +shm_mq_is_detached(volatile shm_mq *mq) +{ + bool ret; + SpinLockAcquire(&mq->mq_mutex); + ret = mq->mq_detached; + SpinLockRelease(&mq->mq_mutex); + return ret; +} /* * Get the number of bytes read. The receiver need not use this to access * the count of bytes read, but the sender must. @@ -1195,3 +1351,265 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg) shm_mq_detach(mq); } + +/* Routines required for local queue */ + +/* + * Initialize a new local message queue, this is kept quite similar to shm_mq_create. + */ +static local_mq * +local_mq_create(void *address, Size size) +{ + local_mq *mq = address; + Size data_offset = MAXALIGN(offsetof(local_mq, mq_ring)); + + /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ + size = MAXALIGN_DOWN(size); + + /* Queue size must be large enough to hold some data. */ + Assert(size > data_offset); + + /* Initialize queue header. 
*/ + mq->mq_bytes_read = 0; + mq->mq_bytes_written = 0; + mq->mq_ring_size = size - data_offset; + mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring); + return mq; +} +/* routine to create and attach local_mq to the shm_mq_handle */ +static shm_mq_handle * +local_mq_attach(shm_mq_handle *mqh) +{ + /* + * create a local queue, the size of this queue should be way higher + * than PARALLEL_TUPLE_QUEUE_SIZE + */ + char *mq; + Size len; + + len = 6553600; + mq = palloc0(len); + mqh->mqh_local_queue = local_mq_create(mq, len); + + return mqh; +} + +/* check the space availability in local queue */ +static uint64 +space_in_local(local_mq *lq, Size tuple_size) +{ + uint64 read, written, used, available, ringsize, writer_offset, reader_offset; + + ringsize = lq->mq_ring_size; + read = lq->mq_bytes_read; + written = lq->mq_bytes_written; + used = written - read; + available = ringsize - used; + + ringsize = lq->mq_ring_size; + writer_offset = lq->mq_bytes_written % ringsize; + reader_offset = lq->mq_bytes_read % ringsize; + + if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset) + available = (ringsize - writer_offset); + + return available; +} +/* routine to check if there is enough space in shared_queue */ +static uint64 +space_in_shm(shm_mq *mq) +{ + uint64 read, written, used, available, ringsize; + bool detached; + + detached = false; + ringsize = mq->mq_ring_size; + read = shm_mq_get_bytes_read(mq, &detached); + written = shm_mq_get_bytes_written(mq, &detached); + + used = written - read; + available = ringsize - used; + + return available; +} + +/* + * Routine to check if reading from local queue is possible. 
 If local + * queue is at least 5% used or shm_mq is full then we allow reading + * from local queue + */ +static bool +read_local_queue(local_mq *lq, bool shm_mq_full) +{ + uint64 written, read; + + written = lq->mq_bytes_written; + read = lq->mq_bytes_read; + + if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size) + return true; + + else + return false; +} + +/* Routine to write tuple in local queue. */ +static shm_mq_result +write_in_local_queue(local_mq *lq, shm_mq_iovec *iov) +{ + uint64 bytes_written, nbytes, tuple_size; + Size chunksize; + int i; + + tuple_size = sizeof(Size) + iov->len; + nbytes = 0; + bytes_written = 0; + + /* Compute total size of write. */ + for (i = 0; i < 1; ++i) + nbytes += iov[i].len; + + local_mq_send_bytes(lq, sizeof(Size), ((char *) &nbytes), &bytes_written); + + chunksize = iov[0].len; + local_mq_send_bytes(lq, chunksize, &iov[0].data[0], &bytes_written); + + Assert(bytes_written > 0); + Assert(bytes_written == tuple_size); + return SHM_MQ_SUCCESS; +} + +/* Routine to pass a batch of tuples from local to shared queue in one go */ +static shm_mq_result +copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool nowait) +{ + uint64 to_read, bytes_read, read_offset, available, used; + char *data; + shm_mq_result res; + + bytes_read = 0; + + used = lq->mq_bytes_written - lq->mq_bytes_read; + Assert(used <= lq->mq_ring_size); + Assert(lq->mq_bytes_read <= lq->mq_bytes_written); + read_offset = lq->mq_bytes_read % lq->mq_ring_size; + available = space_in_shm(mqh->mqh_queue); + + /* always read data in the aligned form */ + to_read = MAXALIGN_DOWN(Min(used, available)); + + /* + * if the amount of data to be sent from local queue involves wrapping of + * local queue, then send only the data till the end of queue right now + * and rest later. 
+ */ + if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size) + to_read = lq->mq_ring_size - (lq->mq_bytes_read % lq->mq_ring_size); + + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + Assert(to_read > 0); + + data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]); + + res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read); + + if(res == SHM_MQ_DETACHED) + return SHM_MQ_DETACHED; + + if (res != SHM_MQ_SUCCESS) + return res; + + Assert(bytes_read == to_read); + lq->mq_bytes_read += bytes_read; + shm_mq_notify_receiver(mqh->mqh_queue); + + return res; +} +/* + * This is the function which actually writes the tuple in the local_queue, + * it is same as shm_mq_send_bytes is for shm_mq. + */ +static void +local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written) +{ + uint64 used; + Size ringsize = mq->mq_ring_size; + Size available, sent = 0, sendnow; + + uint64 rb; + + while(sent < nbytes) + { + /* Compute number of ring buffer bytes used and available. */ + rb = mq->mq_bytes_read; + Assert (mq->mq_bytes_written >= rb); + used = mq->mq_bytes_written - rb; + Assert(used <= ringsize); + available = Min(ringsize - used, nbytes - sent); + + if(available == 0) + elog(ERROR,"local queue full, this should never be reached"); + + else + { + Size offset = mq->mq_bytes_written % (uint64) ringsize; + sendnow = Min(available, ringsize - offset); + + /* Write as much data as we can via a single memcpy(). */ + memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow); + sent += sendnow; + /* + * Update count of bytes written, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a run of bytes, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read is as well. 
+ */ + Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); + mq->mq_bytes_written += MAXALIGN(sendnow); + } + } + *bytes_written += sent; +} +/* + * Empty the local queue by copying all the data from local to shared queue. + * This is required before shutdown of worker. + */ +void +empty_queue(shm_mq_handle *mqh) +{ + shm_mq_result res = SHM_MQ_SUCCESS; + local_mq *lq; + uint64 shm_space; + + lq = mqh->mqh_local_queue; + + if (lq == NULL || lq->mq_bytes_written == 0 || shm_mq_is_detached(mqh->mqh_queue)) + return; + + while(lq->mq_bytes_written > lq->mq_bytes_read) + { + shm_space = space_in_shm(mqh->mqh_queue); + /* + * cannot send data to shared queue, unless there is required + * space, so keep on trying till we get some space + */ + while(shm_space <= 0) + { + if (shm_mq_is_detached(mqh->mqh_queue)) + return; + + shm_space = space_in_shm(mqh->mqh_queue); + } + + if (read_local_queue(lq, true)) + res = copy_local_to_shared(lq, mqh, false); + + if (res == SHM_MQ_DETACHED) + return; + } + /* this local queue is not required anymore, hence free the space. */ + pfree(mqh->mqh_local_queue); mqh->mqh_local_queue = NULL; + return; +} diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 02a93e0..6ba3cb3 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -37,7 +37,7 @@ typedef enum { SHM_MQ_SUCCESS, /* Sent or received a message. */ SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */ - SHM_MQ_DETACHED /* Other process has detached queue. */ + SHM_MQ_DETACHED /* Other process has detached queue. */ } shm_mq_result; /* @@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); /* Smallest possible queue. */ extern PGDLLIMPORT const Size shm_mq_minimum_size; +/* Routines and structures required for local and shared queue type architecture */ +extern void empty_queue(shm_mq_handle *mqh); +struct local_mq; +typedef struct local_mq local_mq; #endif /* SHM_MQ_H */