diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index e9a5d5a1a5..b4f8386899 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -90,7 +90,10 @@ tqueueShutdownReceiver(DestReceiver *self) TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; if (tqueue->queue != NULL) + { + empty_queue(tqueue->queue); shm_mq_detach(tqueue->queue); + } tqueue->queue = NULL; } diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 770559a03e..75273ba51c 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -25,6 +25,8 @@ #include "storage/shm_mq.h" #include "storage/spin.h" +#define LOCAL_TUPLE_QUEUE_SIZE 6553600 + /* * This structure represents the actual queue, stored in shared memory. * @@ -79,6 +81,25 @@ struct shm_mq char mq_ring[FLEXIBLE_ARRAY_MEMBER]; }; +/* This is the structure for local queue where a worker can write + * tuples when it's shared queue is full. + * + * Each worker has it's own local queue where it can store tuples + * when master is busy and worker's shared queue gets full. Tuples + * are copied into shared queue via single memcpy equal to the space + * available in shared queue. Since, local queue is never shared with + * the master, we do not require any locking mechanism to write tuples + * in it, hence writing in local queue is a cheap operation. + */ +struct local_mq +{ + uint64 mq_bytes_read; + uint64 mq_bytes_written; + Size mq_ring_size; + uint8 mq_ring_offset; + char mq_ring[FLEXIBLE_ARRAY_MEMBER]; +}; + /* * This structure is a backend-private handle for access to a queue. * @@ -128,7 +149,9 @@ struct shm_mq */ struct shm_mq_handle { + bool mqh_local; shm_mq *mqh_queue; + local_mq *mqh_local_queue; dsm_segment *mqh_segment; BackgroundWorkerHandle *mqh_handle; char *mqh_buffer; @@ -150,12 +173,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle); static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, BackgroundWorkerHandle *handle); +static bool shm_mq_is_detached(volatile shm_mq *mq); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n); static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached); static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n); static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); +static uint64 space_in_shm(shm_mq *mq); + +/* Routines required for local queue */ +static local_mq * local_mq_create(void *address, Size size); +static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh); +static uint64 space_in_local(local_mq * lq, Size tuple_size); +static bool read_local_queue(local_mq * lq); +static shm_mq_result write_in_local_queue(local_mq * mq, shm_mq_iovec *iov); +static void local_mq_send_bytes(local_mq * mq, Size nbytes, const void *data, Size *bytes_written); +static shm_mq_result copy_local_to_shared(local_mq * lq, shm_mq_handle *mqh, bool read_anyway); /* Minimum queue size is enough for header and at least one chunk of data. */ const Size shm_mq_minimum_size = @@ -289,6 +323,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); + mqh->mqh_local = false; + mqh->mqh_local_queue = NULL; mqh->mqh_queue = mq; mqh->mqh_segment = seg; mqh->mqh_handle = handle; @@ -319,17 +355,117 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) } /* - * Write a message into a shared message queue. + * Write a message into a shared or local message queue, as per the space + * availability in these queues. If space is available in shared queue then + * we simply write the message there and return. Else we write it in local + * queue. Once both the queues are full, we wait till some of the data in + * shared queue is read and then copy the data from local to shared queue + * and continue writing in local queue. After writing in local queue we + * check if there is space available in shared queue and we copy the data + * from local to shared queue then itself. */ shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) { shm_mq_iovec iov; + local_mq *lq; + shm_mq_result res; + Size tuple_size; + uint64 local_space; iov.data = data; iov.len = nbytes; + /* this is actual size for this tuple which will be written in queue */ + tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len); + + /* + * if there is enough space in shared_queue and never been to local queue + * then write the tuple in shared queue. + */ + if (space_in_shm(mqh->mqh_queue) > tuple_size && !mqh->mqh_local) + res = shm_mq_sendv(mqh, &iov, 1, nowait); + + else + { + /* if queue is detached for some reason, nothing to do */ + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + /* + * once started with local queue, the tuples will flow from local to + * shared queue untill local queue is empty + */ + mqh->mqh_local = true; + + /* create and attach a local queue, if it is not yet created */ + if (mqh->mqh_local_queue == NULL) + mqh = local_mq_attach(mqh); - return shm_mq_sendv(mqh, &iov, 1, nowait); + lq = mqh->mqh_local_queue; + local_space = space_in_local(lq, tuple_size); + + /* write in local queue if there is enough space */ + if (local_space >= tuple_size) + { + res = write_in_local_queue(lq, &iov); + + /* + * if we have some data in local queue and some space in shared + * queue then copy it to shared queue + */ + if (read_local_queue(lq) && space_in_shm(mqh->mqh_queue) > 0) + copy_local_to_shared(lq, mqh, false); + } + else + { + /* + * if local queue is full, then copy some data to shared queue + * till enough space becomes available in local queue + */ + do + { + while (space_in_shm(mqh->mqh_queue) < tuple_size) + { + /* + * cannot send data to shared queue, unless there is + * required space, so wait till we get some space, since + * we cannot write anymore in local queue as of now + */ + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND); + + /* Reset the latch so we don't spin. */ + ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* if queue is detached then nothing to do */ + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + } + if (read_local_queue(lq)) + copy_local_to_shared(lq, mqh, false); + + local_space = space_in_local(lq, tuple_size); + + } while (local_space <= tuple_size); + + /* + * once space is available in local queue, write the tuple + * appropriately. If local queue has become empty, then write the + * tuple in shared queue itself, otherwise continue with local + * queue itself. + */ + if (local_space > 0) + res = write_in_local_queue(lq, &iov); + else + { + mqh->mqh_local = false; + res = shm_mq_sendv(mqh, &iov, 1, nowait); + } + } + } + return res; } /* @@ -1133,6 +1269,20 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr, } /* + * Get if the shm_mq is deatched. + */ +static bool +shm_mq_is_detached(volatile shm_mq *mq) +{ + bool ret; + + SpinLockAcquire(&mq->mq_mutex); + ret = mq->mq_detached; + SpinLockRelease(&mq->mq_mutex); + return ret; +} + +/* * Get the number of bytes read. The receiver need not use this to access * the count of bytes read, but the sender must. */ @@ -1224,3 +1374,268 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg) shm_mq_detach_internal(mq); } + +/* Routines required for local queue */ + +/* + * Initialize a new local message queue, this is kept quite similar to shm_mq_create. + */ +static local_mq * +local_mq_create(void *address, Size size) +{ + local_mq *mq = address; + Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); + + /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ + size = MAXALIGN_DOWN(size); + + /* Queue size must be large enough to hold some data. */ + Assert(size > data_offset); + + /* Initialize queue header. */ + mq->mq_bytes_read = 0; + mq->mq_bytes_written = 0; + mq->mq_ring_size = size - data_offset; + mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring); + return mq; +} + +/* routine to create and attach local_mq to the shm_mq_handle */ +static shm_mq_handle * +local_mq_attach(shm_mq_handle *mqh) +{ + /* + * create a local queue, the size of this queue should be way higher than + * PARALLEL_TUPLE_QUEUE_SIZE + */ + char *mq; + Size len; + + len = LOCAL_TUPLE_QUEUE_SIZE; + mq = palloc0(len); + mqh->mqh_local_queue = local_mq_create(mq, len); + + return mqh; +} + +/* check the space availability in local queue */ +static uint64 +space_in_local(local_mq * lq, Size tuple_size) +{ + uint64 read, + written, + used, + available, + ringsize, + writer_offset, + reader_offset; + + ringsize = lq->mq_ring_size; + read = lq->mq_bytes_read; + written = lq->mq_bytes_written; + used = written - read; + available = ringsize - used; + + ringsize = lq->mq_ring_size; + writer_offset = lq->mq_bytes_written % ringsize; + reader_offset = lq->mq_bytes_read % ringsize; + + if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset) + available = (ringsize - writer_offset); + + return available; +} + +/* routine to check if there is enough space in shared_queue */ +static uint64 +space_in_shm(shm_mq *mq) +{ + uint64 read, + written, + used, + available, + ringsize; + bool detached = false; + + ringsize = mq->mq_ring_size; + read = shm_mq_get_bytes_read(mq, &detached); + written = shm_mq_get_bytes_written(mq, &detached); + + used = written - read; + available = ringsize - used; + + return available; +} + +/* + * Routine to check if reading from local queue is possible. If local + * queue is atleast 5% used then we allow reading from local queue + */ +static bool +read_local_queue(local_mq * lq) +{ + uint64 written, + read; + + written = lq->mq_bytes_written; + read = lq->mq_bytes_read; + + if ((written - read) >= .05 * lq->mq_ring_size) + return true; + else + return false; +} + +/* Routine to write tuple in local queue. */ +static shm_mq_result +write_in_local_queue(local_mq * lq, shm_mq_iovec *iov) +{ + uint64 bytes_written, + nbytes, + tuple_size; + Size chunksize; + int i; + + tuple_size = sizeof(Size) + iov->len; + nbytes = 0; + bytes_written = 0; + + /* Compute total size of write. */ + for (i = 0; i < 1; ++i) + nbytes += iov[i].len; + + local_mq_send_bytes(lq, sizeof(Size), ((char *) &nbytes), &bytes_written); + + chunksize = iov[0].len; + local_mq_send_bytes(lq, chunksize, &iov[0].data[0], &bytes_written); + + Assert(bytes_written > 0); + Assert(bytes_written == tuple_size); + return SHM_MQ_SUCCESS; +} + +/* Routine to pass a batch of tuples from local to shared queue in one go */ +static shm_mq_result +copy_local_to_shared(local_mq * lq, shm_mq_handle *mqh, bool nowait) +{ + uint64 to_read, + bytes_read, + read_offset, + available, + used; + char *data; + shm_mq_result res; + + bytes_read = 0; + + if (shm_mq_is_detached(mqh->mqh_queue)) + return SHM_MQ_DETACHED; + + used = lq->mq_bytes_written - lq->mq_bytes_read; + Assert(used <= lq->mq_ring_size); + Assert(lq->mq_bytes_read <= lq->mq_bytes_written); + read_offset = lq->mq_bytes_read % lq->mq_ring_size; + available = space_in_shm(mqh->mqh_queue); + + /* always read data in aligned form */ + to_read = MAXALIGN_DOWN(Min(used, available)); + + /* + * if the amount of data to be send from local queue involves wrapping of + * local queue, then send only the data till the end of queue right now + * and rest later. + */ + if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size) + to_read = lq->mq_ring_size - (lq->mq_bytes_read % lq->mq_ring_size); + + data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]); + res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read); + + if (res != SHM_MQ_SUCCESS) + return res; + + Assert(bytes_read == to_read); + lq->mq_bytes_read += bytes_read; + shm_mq_notify_receiver(mqh->mqh_queue); + + return res; +} + +/* + * This is the function which actually writes the tuple in the local_queue, + * it is same as shm_mq_send_bytes is for shm_mq. + */ +static void +local_mq_send_bytes(local_mq * mq, Size nbytes, const void *data, Size *bytes_written) +{ + uint64 used; + Size ringsize = mq->mq_ring_size; + Size available, + sent = 0, + sendnow; + + uint64 rb; + + while (sent < nbytes) + { + /* Compute number of ring buffer bytes used and available. */ + rb = mq->mq_bytes_read; + Assert(mq->mq_bytes_written >= rb); + used = mq->mq_bytes_written - rb; + Assert(used <= ringsize); + available = Min(ringsize - used, nbytes - sent); + + if (available == 0) + elog(ERROR, "local queue full, this should never be reached"); + + else + { + Size offset = mq->mq_bytes_written % (uint64) ringsize; + + sendnow = Min(available, ringsize - offset); + + /* Write as much data as we can via a single memcpy(). */ + memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow); + sent += sendnow; + + /* + * Update count of bytes written, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a run of bytes, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read is as well. + */ + Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); + mq->mq_bytes_written += MAXALIGN(sendnow); + } + } + *bytes_written += sent; +} + +/* + * Empty the local queue by copying all the data from local to shared queue. + * This is required before shutdown of worker. + */ +void +empty_queue(shm_mq_handle *mqh) +{ + local_mq *lq; + uint64 read, + written; + + lq = mqh->mqh_local_queue; + + if (lq == NULL || lq->mq_bytes_written == 0) + return; + + read = lq->mq_bytes_read; + written = lq->mq_bytes_written; + + while (written > read && !shm_mq_is_detached(mqh->mqh_queue)) + { + copy_local_to_shared(lq, mqh, false); + read = lq->mq_bytes_read; + written = lq->mq_bytes_written; + } + /* this local queue is not required anymore, hence free the space. */ + pfree(mqh->mqh_local_queue); +} diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 7709efcc48..80afb435e0 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); /* Smallest possible queue. */ extern PGDLLIMPORT const Size shm_mq_minimum_size; +/* Routines and structures required for local and shared queue type architecture */ +extern void empty_queue(shm_mq_handle *mqh); +struct local_mq; +typedef struct local_mq local_mq; #endif /* SHM_MQ_H */