Skip to content

Commit 2c36085

Browse files
committed
QPID-4397: log more detail for expired messages when debug enabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1745448 13f79535-47bb-0310-9956-ffa450edef68
1 parent edee47f commit 2c36085

File tree

8 files changed

+107
-6
lines changed

8 files changed

+107
-6
lines changed

qpid/cpp/src/qpid/broker/Message.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,11 @@ qpid::types::Variant Message::getProperty(const std::string& key) const
301301
return r.getResult();
302302
}
303303

304+
std::string Message::printProperties() const
305+
{
306+
return sharedState->printProperties();
307+
}
308+
304309
boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
305310
{
306311
return persistentContext;

qpid/cpp/src/qpid/broker/Message.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class Message {
8181
virtual std::string getTo() const = 0;
8282
virtual std::string getSubject() const = 0;
8383
virtual std::string getReplyTo() const = 0;
84+
virtual std::string printProperties() const = 0;
8485
};
8586

8687
class SharedState : public Encoding
@@ -153,6 +154,7 @@ class Message {
153154
QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
154155
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
155156
void processProperties(qpid::amqp::MapHandler&) const;
157+
std::string printProperties() const;
156158

157159
QPID_BROKER_EXTERN uint64_t getMessageSize() const;
158160

qpid/cpp/src/qpid/broker/Queue.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
421421
QueueCursor cursor = c->getCursor(); // Save current position.
422422
Message* msg = messages->next(*c); // Advances c.
423423
if (msg) {
424-
if (msg->getExpiration() < sys::AbsTime::now()) {
424+
if (isExpired(name, *msg, sys::AbsTime::now())) {
425425
QPID_LOG(debug, "Message expired from queue '" << name << "'");
426426
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
427427
//ERROR: don't hold lock across call to store!!
@@ -627,11 +627,14 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons
627627
}
628628
}
629629

630-
namespace{
631-
bool hasExpired(const Message& m, AbsTime now)
630+
bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now)
632631
{
633-
return m.getExpiration() < now;
634-
}
632+
if (m.getExpiration() < now) {
633+
QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties());
634+
return true;
635+
} else {
636+
return false;
637+
}
635638
}
636639

637640
/**
@@ -646,7 +649,7 @@ void Queue::purgeExpired(sys::Duration lapse) {
646649
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
647650
if (seconds == 0 || count / seconds < 1) {
648651
sys::AbsTime time = sys::AbsTime::now();
649-
uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete);
652+
uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
650653
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
651654
//
652655
// Report the count of discarded-by-ttl messages

qpid/cpp/src/qpid/broker/Queue.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
530530

531531
//utility function
532532
static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
533+
static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
533534

534535
friend class QueueFactory;
535536
};

qpid/cpp/src/qpid/broker/amqp/Message.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,85 @@ std::string Message::getPropertyAsString(const std::string& key) const
151151
return sr.getValue();
152152
}
153153

154+
namespace {
155+
class PropertyPrinter : public MapHandler
156+
{
157+
public:
158+
std::stringstream out;
159+
160+
PropertyPrinter() : first(true) {}
161+
void handleVoid(const CharSequence&) {}
162+
void handleBool(const CharSequence& key, bool value) { handle(key, value); }
163+
void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
164+
void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
165+
void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
166+
void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
167+
void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
168+
void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
169+
void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
170+
void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
171+
void handleFloat(const CharSequence& key, float value) { handle(key, value); }
172+
void handleDouble(const CharSequence& key, double value) { handle(key, value); }
173+
void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
174+
{
175+
handle(key, value.str());
176+
}
177+
std::string str() { return out.str(); }
178+
bool print(const std::string& key, const std::string& value, bool prependComma) {
179+
if (prependComma) out << ", ";
180+
if (!value.empty()) {
181+
out << key << "=" << value;
182+
return true;
183+
} else {
184+
return false;
185+
}
186+
}
187+
template <typename T> bool print_(const std::string& key, T value, bool prependComma) {
188+
if (prependComma) out << ", ";
189+
if (value) {
190+
out << key << "=" << value;
191+
return true;
192+
} else {
193+
return false;
194+
}
195+
}
196+
197+
private:
198+
bool first;
199+
200+
template <typename T> void handle(const CharSequence& key, T value)
201+
{
202+
if (first) {
203+
first = false;
204+
} else {
205+
out << ", ";
206+
}
207+
out << key.str() << "=" << value;
208+
}
209+
};
210+
}
211+
212+
std::string Message::printProperties() const
213+
{
214+
PropertyPrinter r;
215+
bool comma = false;
216+
comma = r.print("subject", getSubject(), comma);
217+
comma = r.print("message-id", getMessageId().str(), comma);
218+
comma = r.print("correlation-id", getCorrelationId().str(), comma);
219+
comma = r.print("user-id", getUserId(), comma);
220+
comma = r.print("to", getTo(), comma);
221+
comma = r.print("reply-to", getReplyTo(), comma);
222+
comma = r.print_("priority", (uint32_t) getPriority(), comma);
223+
comma = r.print_("durable", isPersistent(), comma);
224+
uint64_t ttl(0);
225+
getTtl(ttl);
226+
comma = r.print_("ttl", ttl, comma);
227+
r.out << ", application-properties={";
228+
processProperties(r);
229+
r.out << "}";
230+
return r.str();
231+
}
232+
154233
namespace {
155234
class PropertyAdapter : public Reader {
156235
MapHandler& handler;

qpid/cpp/src/qpid/broker/amqp/Message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq
5151
bool getTtl(uint64_t&) const;
5252
std::string getContent() const;
5353
void processProperties(qpid::amqp::MapHandler&) const;
54+
std::string printProperties() const;
5455
std::string getUserId() const;
5556
uint64_t getTimestamp() const;
5657
std::string getTo() const;

qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,15 @@ bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, co
395395
return transfer && transfer->isLastQMFResponse(correlation);
396396
}
397397

398+
std::string MessageTransfer::printProperties() const
399+
{
400+
std::stringstream out;
401+
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
402+
if (mp) {
403+
out << *mp;
404+
}
405+
return out.str();
406+
}
398407

399408
void MessageTransfer::processProperties(qpid::amqp::MapHandler& handler) const
400409
{

qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp
5454
bool hasExpiration() const;
5555
std::string getExchangeName() const;
5656
void processProperties(qpid::amqp::MapHandler&) const;
57+
std::string printProperties() const;
5758
std::string getUserId() const;
5859
void setTimestamp();
5960
uint64_t getTimestamp() const;

0 commit comments

Comments
 (0)