shm_mq: After a send fails with SHM_MQ_DETACHED, later ones should too.
authorRobert Haas <[email protected]>
Mon, 6 Jun 2016 18:35:30 +0000 (14:35 -0400)
committerRobert Haas <[email protected]>
Mon, 6 Jun 2016 18:35:30 +0000 (14:35 -0400)
Prior to this patch, it was occasionally possible, after shm_mq_sendv
had previously returned SHM_MQ_DETACHED, for a later shm_mq_sendv
operation to fail an assertion instead of just again returning
SHM_MQ_ATTACHED.  From the shm_mq code's point of view, it was
expecting to be called again with the same arguments, since the
previous operation had only partially completed.  However, a caller
who isn't using non-blocking mode won't be prepared to repeat the call
with the same arguments, and this code shouldn't expect that they
will.  Repair in such a way that we'll be OK whether the next call
uses the same arguments or not.

Found by Andreas Seltenreich.  Analysis and sketch of fix by Amit
Kapila.  Patch by me, reviewed by Amit Kapila.

src/backend/storage/ipc/shm_mq.c

index 7d1c9cdbd5a07e0b034a32a674191972866ff3fc..03ca79b5e3661dabe2d0cc8268eefcbfd4da2b24 100644 (file)
@@ -366,9 +366,15 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) +mqh->mqh_partial_bytes,
                                nowait, &bytes_written);
-       mqh->mqh_partial_bytes += bytes_written;
-       if (res != SHM_MQ_SUCCESS)
+
+       if (res == SHM_MQ_DETACHED)
+       {
+           /* Reset state in case caller tries to send another message. */
+           mqh->mqh_partial_bytes = 0;
+           mqh->mqh_length_word_complete = false;
            return res;
+       }
+       mqh->mqh_partial_bytes += bytes_written;
 
        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
@@ -378,6 +384,9 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
            mqh->mqh_length_word_complete = true;
        }
 
+       if (res != SHM_MQ_SUCCESS)
+           return res;
+
        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }
@@ -432,7 +441,17 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
                        break;
                }
            }
+
            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+
+           if (res == SHM_MQ_DETACHED)
+           {
+               /* Reset state in case caller tries to send another message. */
+               mqh->mqh_partial_bytes = 0;
+               mqh->mqh_length_word_complete = false;
+               return res;
+           }
+
            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
@@ -449,6 +468,15 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);
+
+       if (res == SHM_MQ_DETACHED)
+       {
+           /* Reset state in case caller tries to send another message. */
+           mqh->mqh_length_word_complete = false;
+           mqh->mqh_partial_bytes = 0;
+           return res;
+       }
+
        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)