Skip to content

Commit ebb276c

Browse files
committed
QPID-7329: Merge branch 'github/pr/10' into trunk
Closes #10 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1750587 13f79535-47bb-0310-9956-ffa450edef68
1 parent 1352571 commit ebb276c

File tree

4 files changed

+37
-1
lines changed

4 files changed

+37
-1
lines changed

qpid/cpp/src/qpid/broker/Queue.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,19 @@ void Queue::process(Message& msg)
345345
}
346346
}
347347

348+
void Queue::mergeMessageAnnotations(const QueueCursor& position,
349+
const qpid::types::Variant::Map& messageAnnotations)
350+
{
351+
Mutex::ScopedLock locker(messageLock);
352+
Message *message = messages->find(position);
353+
if (!message) return;
354+
355+
qpid::types::Variant::Map::const_iterator it;
356+
for (it = messageAnnotations.begin(); it != messageAnnotations.end(); ++it) {
357+
message->addAnnotation(it->first, it->second);
358+
}
359+
}
360+
348361
void Queue::release(const QueueCursor& position, bool markRedelivered)
349362
{
350363
QueueListeners::NotificationSet copy;

qpid/cpp/src/qpid/broker/Queue.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
331331
private:
332332
QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0);
333333
public:
334+
/**
335+
* Merges message annotations for an in-memory message as a result of
336+
* a modified disposition outcome
337+
*/
338+
QPID_BROKER_EXTERN void mergeMessageAnnotations(const QueueCursor& msg,
339+
const qpid::types::Variant::Map& annotations);
340+
334341
/**
335342
* Returns a message to the in-memory queue (due to lack
336343
* of acknowledegement from a receiver). If a consumer is

qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* under the License.
1919
*
2020
*/
21+
#include "qpid/broker/amqp/DataReader.h"
2122
#include "qpid/broker/amqp/Outgoing.h"
2223
#include "qpid/broker/amqp/Exception.h"
2324
#include "qpid/broker/amqp/Header.h"
@@ -108,6 +109,19 @@ void OutgoingFromQueue::write(const char* data, size_t size)
108109
pn_link_send(link, data, size);
109110
}
110111

112+
void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r)
113+
{
114+
pn_data_t *remoteAnnotationsRaw =
115+
pn_disposition_annotations(pn_delivery_remote(r.delivery));
116+
if (remoteAnnotationsRaw == 0) {
117+
return;
118+
}
119+
120+
qpid::types::Variant::Map remoteMessageAnnotations;
121+
DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations);
122+
queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations);
123+
}
124+
111125
void OutgoingFromQueue::handle(pn_delivery_t* delivery)
112126
{
113127
size_t i = Record::getIndex(pn_delivery_tag(delivery));
@@ -141,7 +155,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
141155
break;
142156
case PN_MODIFIED:
143157
if (preAcquires()) {
144-
//TODO: handle message-annotations
158+
mergeMessageAnnotationsIfRequired(r);
145159
if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
146160
if (!trackingUndeliverableMessages) {
147161
// observe queue for changes to track undeliverable messages

qpid/cpp/src/qpid/broker/amqp/Outgoing.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer,
152152
static size_t getIndex(pn_delivery_tag_t);
153153
};
154154

155+
void mergeMessageAnnotationsIfRequired(const Record &r);
156+
155157
const bool exclusive;
156158
const bool isControllingUser;
157159
boost::shared_ptr<Queue> queue;

0 commit comments

Comments
 (0)