diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 8d7e711b3b..e3405b255d 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -59,6 +59,7 @@ */ #define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */ #define TUPLE_QUEUE_MODE_DATA 'd' +#define LOCAL_TUPLE_QUEUE_SIZE 32768 /* * Both the sender and receiver build trees of TupleRemapInfo nodes to help @@ -145,6 +146,10 @@ typedef struct TQueueDestReceiver char mode; /* current message mode */ TupleDesc tupledesc; /* current top-level tuple descriptor */ TupleRemapInfo **field_remapinfo; /* current top-level remap info */ + char *iovec; + int length; + int count; + } TQueueDestReceiver; /* @@ -213,6 +218,7 @@ static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext); static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext); +static void empty_tqueue(TQueueDestReceiver *tqueue); /* @@ -304,10 +310,53 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) } } - /* Send the tuple itself. */ + /* Store tuples in the local queue. */ tuple = ExecMaterializeSlot(slot); - result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false); - + if(TupIsNull(slot)) + empty_tqueue(tqueue); + else + { + if (tqueue->length + tuple->t_len < LOCAL_TUPLE_QUEUE_SIZE) + { + shm_mq_iovec *iov = tqueue->iovec + tqueue->length; + iov->len = tuple->t_len; + tqueue->length += sizeof (shm_mq_iovec); + iov->data = tqueue->iovec + tqueue->length; + + /*store the tuple */ + memcpy(iov->data, tuple->t_data, tuple->t_len); + tqueue->length += tuple->t_len; + tqueue->count++; + return true; + } + /* once local tuple queue is full, pass them to the shared queue */ + else + { + int byte = 0; + tqueue->length = 0; + while(tqueue->count-- > 0) + { + shm_mq_iovec *iov = tqueue->iovec + byte; + /* notify the receiver only when all the tuples are sent to share queue */ + if (tqueue->count == 0) + result = local_mq_send(tqueue->queue, iov->len, iov->data, false, true); + else + result = local_mq_send(tqueue->queue, iov->len, iov->data, false, false); + byte += sizeof(shm_mq_iovec) + iov->len; + } + tqueue->count = 0; + shm_mq_iovec *iov = tqueue->iovec + tqueue->length; + iov->len = tuple->t_len; + tqueue->length += sizeof (shm_mq_iovec); + iov->data = tqueue->iovec + tqueue->length; + + /*store the tuple */ + memcpy(iov->data, tuple->t_data, tuple->t_len); + tqueue->length += tuple->t_len; + tqueue->count++; + return true; + } + } /* Check for failure. */ if (result == SHM_MQ_DETACHED) return false; @@ -318,6 +367,22 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) return true; } +/* Empty the slot, if there is any content left in it */ +static void +empty_tqueue(TQueueDestReceiver *tqueue) +{ + int byte = 0; + shm_mq_result result; + while(tqueue->count-- > 0) + { + shm_mq_iovec *iov = tqueue->iovec + byte; + if (tqueue->count == 0) + local_mq_send(tqueue->queue, iov->len, iov->data, false, true); + else + local_mq_send(tqueue->queue, iov->len, iov->data, false, false); + byte += sizeof(shm_mq_iovec) + iov->len; + } +} /* * Examine the given datum and send any necessary control messages for @@ -577,7 +642,7 @@ static void tqueueShutdownReceiver(DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; - + empty_tqueue(tqueue); shm_mq_detach(shm_mq_get_queue(tqueue->queue)); } @@ -622,6 +687,8 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle) /* Top-level tupledesc is not known yet */ self->tupledesc = NULL; self->field_remapinfo = NULL; + self->iovec = palloc0(LOCAL_TUPLE_QUEUE_SIZE); + self->length = 0; return (DestReceiver *) self; } diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f5bf807cd6..3e5f17756e 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -141,6 +141,8 @@ struct shm_mq_handle static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written); +static shm_mq_result local_mq_send_bytes(shm_mq_handle *mq, Size nbytes, + const void *data, bool nowait, Size *bytes_written); static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap); static bool shm_mq_counterparty_gone(volatile shm_mq *mq, @@ -327,6 +329,16 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) return shm_mq_sendv(mqh, &iov, 1, nowait); } +shm_mq_result +local_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool notify) +{ + shm_mq_iovec iov; + + iov.data = data; + iov.len = nbytes; + + return local_mq_sendv(mqh, &iov, 1, nowait, notify); +} /* * Write a message into a shared message queue, gathered from multiple @@ -491,6 +503,158 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) /* Notify receiver of the newly-written data, and return. */ return shm_mq_notify_receiver(mq); } +shm_mq_result +local_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool notify) +{ + shm_mq_result res; + shm_mq *mq = mqh->mqh_queue; + Size nbytes = 0; + Size bytes_written; + int i; + int which_iov = 0; + Size offset; + + Assert(mq->mq_sender == MyProc); + + /* Compute total size of write. */ + for (i = 0; i < iovcnt; ++i) + nbytes += iov[i].len; + + /* Try to write, or finish writing, the length word into the buffer. */ + while (!mqh->mqh_length_word_complete) + { + Assert(mqh->mqh_partial_bytes < sizeof(Size)); + res = local_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, + ((char *) &nbytes) +mqh->mqh_partial_bytes, + nowait, &bytes_written); + + if (res == SHM_MQ_DETACHED) + { + /* Reset state in case caller tries to send another message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + return res; + } + mqh->mqh_partial_bytes += bytes_written; + + if (mqh->mqh_partial_bytes >= sizeof(Size)) + { + Assert(mqh->mqh_partial_bytes == sizeof(Size)); + + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = true; + } + + if (res != SHM_MQ_SUCCESS) + return res; + + /* Length word can't be split unless bigger than required alignment. */ + Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF); + } + + /* Write the actual data bytes into the buffer. */ + Assert(mqh->mqh_partial_bytes <= nbytes); + offset = mqh->mqh_partial_bytes; + do + { + Size chunksize; + + /* Figure out which bytes need to be sent next. */ + if (offset >= iov[which_iov].len) + { + offset -= iov[which_iov].len; + ++which_iov; + if (which_iov >= iovcnt) + break; + continue; + } + + /* + * We want to avoid copying the data if at all possible, but every + * chunk of bytes we write into the queue has to be MAXALIGN'd, except + * the last. Thus, if a chunk other than the last one ends on a + * non-MAXALIGN'd boundary, we have to combine the tail end of its + * data with data from one or more following chunks until we either + * reach the last chunk or accumulate a number of bytes which is + * MAXALIGN'd. + */ + if (which_iov + 1 < iovcnt && + offset + MAXIMUM_ALIGNOF > iov[which_iov].len) + { + char tmpbuf[MAXIMUM_ALIGNOF]; + int j = 0; + + for (;;) + { + if (offset < iov[which_iov].len) + { + tmpbuf[j] = iov[which_iov].data[offset]; + j++; + offset++; + if (j == MAXIMUM_ALIGNOF) + break; + } + else + { + offset -= iov[which_iov].len; + which_iov++; + if (which_iov >= iovcnt) + break; + } + } + + res = local_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written); + + if (res == SHM_MQ_DETACHED) + { + /* Reset state in case caller tries to send another message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + return res; + } + + mqh->mqh_partial_bytes += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + continue; + } + + /* + * If this is the last chunk, we can write all the data, even if it + * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to + * MAXALIGN_DOWN the write size. + */ + chunksize = iov[which_iov].len - offset; + if (which_iov + 1 < iovcnt) + chunksize = MAXALIGN_DOWN(chunksize); + res = local_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset], + nowait, &bytes_written); + + if (res == SHM_MQ_DETACHED) + { + /* Reset state in case caller tries to send another message. */ + mqh->mqh_length_word_complete = false; + mqh->mqh_partial_bytes = 0; + return res; + } + + mqh->mqh_partial_bytes += bytes_written; + offset += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + } while (mqh->mqh_partial_bytes < nbytes); + + /* Reset for next message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + + /* Notify receiver of the newly-written data, and return. */ + if(notify) + { + return shm_mq_notify_receiver(mq); + } + else return res; +} /* * Receive a message from a shared message queue. @@ -933,6 +1097,125 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, *bytes_written = sent; return SHM_MQ_SUCCESS; } +static shm_mq_result +local_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, + bool nowait, Size *bytes_written) +{ + shm_mq *mq = mqh->mqh_queue; + Size sent = 0; + uint64 used; + Size ringsize = mq->mq_ring_size; + Size available; + + while (sent < nbytes) + { + bool detached = false; + uint64 rb; + + /* Compute number of ring buffer bytes used and available. */ + rb = mq->mq_bytes_read; + Assert(mq->mq_bytes_written >= rb); + used = mq->mq_bytes_written - rb; + Assert(used <= ringsize); + available = Min(ringsize - used, nbytes - sent); + + if (available == 0 && !mqh->mqh_counterparty_attached) + { + /* + * The queue is full, so if the receiver isn't yet known to be + * attached, we must wait for that to happen. + */ + if (nowait) + { + if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) + { + *bytes_written = sent; + return SHM_MQ_DETACHED; + } + if (shm_mq_get_receiver(mq) == NULL) + { + *bytes_written = sent; + return SHM_MQ_WOULD_BLOCK; + } + } + else if (!shm_mq_wait_internal(mq, &mq->mq_receiver, + mqh->mqh_handle)) + { + mq->mq_detached = true; + *bytes_written = sent; + return SHM_MQ_DETACHED; + } + + mqh->mqh_counterparty_attached = true; + + /* + * The receiver may have read some data after attaching, so we + * must not wait without rechecking the queue state. + */ + } + else if (available == 0) + { + shm_mq_result res; + /* Let the receiver know that we need them to read some data. */ + res = shm_mq_notify_receiver(mq); + + if (res != SHM_MQ_SUCCESS) + { + *bytes_written = sent; + return res; + } + + /* Skip manipulation of our latch if nowait = true. */ + if (nowait) + { + *bytes_written = sent; + return SHM_MQ_WOULD_BLOCK; + } + + /* + * Wait for our latch to be set. It might already be set for some + * unrelated reason, but that'll just result in one extra trip + * through the loop. It's worth it to avoid resetting the latch + * at top of loop, because setting an already-set latch is much + * cheaper than setting one that has been reset. + */ + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND); + + /* Reset the latch so we don't spin. */ + ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } + else + { + Size offset = mq->mq_bytes_written % (uint64) ringsize; + Size sendnow = Min(available, ringsize - offset); + + /* Write as much data as we can via a single memcpy(). */ + memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], + (char *) data + sent, sendnow); + sent += sendnow; + + /* + * Update count of bytes written, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a run of bytes, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read is as well. + */ + Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); + mq->mq_bytes_written += MAXALIGN(sendnow); + + /* + * For efficiency, we don't set the reader's latch here. We'll do + * that only when the buffer fills up or after writing an entire + * message. + */ + } + } + *bytes_written = sent; + return SHM_MQ_SUCCESS; +} /* * Wait until at least *nbytesp bytes are available to be read from the diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 7a37535ab3..74bb681717 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -76,6 +76,11 @@ extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait); +extern shm_mq_result local_mq_send(shm_mq_handle *mqh, + Size nbytes, const void *data, bool nowait, bool notify); +extern shm_mq_result local_mq_sendv(shm_mq_handle *mqh, + shm_mq_iovec *iov, int iovcnt, bool nowait, bool notify); + /* Wait for our counterparty to attach to the queue. */ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);