Skip to content

Commit 3ab1668

Browse files
committed
Merge pull request ceph#6525 from dillaman/wip-13636-jewel
WorkQueue: new PointerWQ base class for ContextWQ Reviewed-by: Josh Durgin <[email protected]>
2 parents de95d20 + 3e78b18 commit 3ab1668

File tree

1 file changed

+90
-22
lines changed

1 file changed

+90
-22
lines changed

src/common/WorkQueue.h

Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Mutex.h"
1919
#include "Cond.h"
2020
#include "Thread.h"
21+
#include "include/unordered_map.h"
2122
#include "common/config_obs.h"
2223
#include "common/HeartbeatMap.h"
2324

@@ -349,6 +350,66 @@ class ThreadPool : public md_config_obs_t {
349350

350351
};
351352

353+
template<typename T>
354+
class PointerWQ : public WorkQueue_ {
355+
public:
356+
PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
357+
: WorkQueue_(n, ti, sti), m_pool(p) {
358+
m_pool->add_work_queue(this);
359+
}
360+
~PointerWQ() {
361+
m_pool->remove_work_queue(this);
362+
}
363+
void drain() {
364+
m_pool->drain(this);
365+
}
366+
void queue(T *item) {
367+
Mutex::Locker l(m_pool->_lock);
368+
m_items.push_back(item);
369+
m_pool->_cond.SignalOne();
370+
}
371+
protected:
372+
virtual void _clear() {
373+
assert(m_pool->_lock.is_locked());
374+
m_items.clear();
375+
}
376+
virtual bool _empty() {
377+
assert(m_pool->_lock.is_locked());
378+
return m_items.empty();
379+
}
380+
virtual void *_void_dequeue() {
381+
assert(m_pool->_lock.is_locked());
382+
if (m_items.empty()) {
383+
return NULL;
384+
}
385+
386+
T *item = m_items.front();
387+
m_items.pop_front();
388+
return item;
389+
}
390+
virtual void _void_process(void *item, ThreadPool::TPHandle &handle) {
391+
process(reinterpret_cast<T *>(item));
392+
}
393+
virtual void _void_process_finish(void *item) {
394+
}
395+
396+
virtual void process(T *item) = 0;
397+
398+
T *front() {
399+
assert(m_pool->_lock.is_locked());
400+
if (m_items.empty()) {
401+
return NULL;
402+
}
403+
return m_items.front();
404+
}
405+
void signal() {
406+
Mutex::Locker pool_locker(m_pool->_lock);
407+
m_pool->_cond.SignalOne();
408+
}
409+
private:
410+
ThreadPool *m_pool;
411+
std::list<T *> m_items;
412+
};
352413
private:
353414
vector<WorkQueue_*> work_queues;
354415
int last_work_queue;
@@ -488,37 +549,44 @@ class C_QueueInWQ : public Context {
488549

489550
/// Work queue that asynchronously completes contexts (executes callbacks).
490551
/// @see Finisher
491-
class ContextWQ : public ThreadPool::WorkQueueVal<std::pair<Context *, int> > {
552+
class ContextWQ : public ThreadPool::PointerWQ<Context> {
492553
public:
493554
ContextWQ(const string &name, time_t ti, ThreadPool *tp)
494-
: ThreadPool::WorkQueueVal<std::pair<Context *, int> >(name, ti, 0, tp) {}
555+
: ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
556+
m_lock("ContextWQ::m_lock") {
557+
}
495558

496559
void queue(Context *ctx, int result = 0) {
497-
ThreadPool::WorkQueueVal<std::pair<Context *, int> >::queue(
498-
std::make_pair(ctx, result));
560+
if (result != 0) {
561+
Mutex::Locker locker(m_lock);
562+
m_context_results[ctx] = result;
563+
}
564+
ThreadPool::PointerWQ<Context>::queue(ctx);
499565
}
500-
501566
protected:
502-
virtual void _enqueue(std::pair<Context *, int> item) {
503-
_queue.push_back(item);
504-
}
505-
virtual void _enqueue_front(std::pair<Context *, int> item) {
506-
_queue.push_front(item);
507-
}
508-
virtual bool _empty() {
509-
return _queue.empty();
510-
}
511-
virtual std::pair<Context *, int> _dequeue() {
512-
std::pair<Context *, int> item = _queue.front();
513-
_queue.pop_front();
514-
return item;
567+
virtual void _clear() {
568+
ThreadPool::PointerWQ<Context>::_clear();
569+
570+
Mutex::Locker locker(m_lock);
571+
m_context_results.clear();
515572
}
516-
virtual void _process(std::pair<Context *, int> item) {
517-
item.first->complete(item.second);
573+
574+
virtual void process(Context *ctx) {
575+
int result = 0;
576+
{
577+
Mutex::Locker locker(m_lock);
578+
ceph::unordered_map<Context *, int>::iterator it =
579+
m_context_results.find(ctx);
580+
if (it != m_context_results.end()) {
581+
result = it->second;
582+
m_context_results.erase(it);
583+
}
584+
}
585+
ctx->complete(result);
518586
}
519-
using ThreadPool::WorkQueueVal<std::pair<Context *, int> >::_process;
520587
private:
521-
list<std::pair<Context *, int> > _queue;
588+
Mutex m_lock;
589+
ceph::unordered_map<Context*, int> m_context_results;
522590
};
523591

524592
class ShardedThreadPool {

0 commit comments

Comments
 (0)