#include <queue>
#include <deferred/deferred_manager_api.h>
#include <deferred/thread_manager_api.h>
#ifdef BOOST_MSVC
# pragma warning(push)
# pragma warning(disable:4251 4275 4231 4660)
#endif
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/tss.hpp>
#include <boost/thread/xtime.hpp>
#ifdef BOOST_MSVC
# pragma warning(pop)
#endif
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <list>
namespace deferred
{
// The following three functions are defined in thread_manger.cpp
// there aren't enough to justify a full header so we just toss them in here
//
void add_dm( DeferredManager * dm );
void remove_dm( DeferredManager * dm );
void rename_dm( const std::string & old_name, const std::string & new_name );
namespace
{// mili micro nano
const int NANOSECONDS_PER_SECOND = 1 * 1000 * 1000 * 1000;
}
struct QCommand
{
SafePtr<Deferred> deferred;
DeferredCommand command;
};
class DeferredManagerImpl : public DeferredManager
{
std::string thread_name;
boost::mutex mutex;
boost::condition condition;
std::queue< QCommand > processing_queue;
bool shutdown_requested;
SafePtr<ThreadManager> thr_mgr; // Hold a safe ref so the manager doesn't get deleted too soon.
static boost::mutex static_mutex;
static int static_instance_count; // used to generate initial thread-names
boost::function0<void> enqueue_hook;
typedef std::list< boost::function1<void, DeferredManager *> > CleanupCallbackList;
CleanupCallbackList cleanup_callbacks;
typedef boost::mutex::scoped_lock Lock;
SafePtr<CallbackData> _req_shutdown(); // helper method
public:
DeferredManagerImpl();
virtual ~DeferredManagerImpl();
virtual std::string & getThreadName()
{
return thread_name;
}
virtual void setThreadName( const std::string & new_name );
virtual void queueCommand( const boost::function0< void > & cmd );
virtual SafePtr<Deferred> queueDeferredCommand( const DeferredCommand & dc );
virtual void queueCompletedDeferred( SafePtr<Deferred> d, SafePtr<CallbackData> result );
virtual void addEnqueueHook( const boost::function0<void> & cmd );
virtual int queueLength();
virtual void executeQueuedOperations(bool throw_on_failure = false);
virtual void executeNextOperation(bool throw_on_failure = false);
// Returns false if the timeout occurs prior to executing any operations
//
virtual bool waitForAnyOperation( long sec=0, long usec=0 );
virtual bool waitForCompletionOf( SafePtr<Deferred> d,
long sec=0, long usec=0 );
// Adds a function throwing ShutdownRequested to the end of the processing queue
//
virtual void requestShutdown();
void wake( SafePtr<Deferred> d );
void addDestructorCallback( const boost::function1<void, DeferredManager *> & cmd );
// returns true if a shutdown has been requested
//
virtual bool shutdownRequested() { return shutdown_requested; }
void execute_one( bool throw_on_failure );
};
namespace
{
boost::thread_specific_ptr< SafePtr<DeferredManagerImpl> > ts_mgr;
void do_nothing()
{}
};
TSDM_Singleton g_deferred_manager;
SafePtr<DeferredManager> TSDM_Singleton::operator ->()
{
return *this;
}
TSDM_Singleton::operator SafePtr<DeferredManager> ()
{
SafePtr<DeferredManagerImpl> * p = ts_mgr.get();
if (!p)
{
p = new SafePtr<DeferredManagerImpl>();
*p = new DeferredManagerImpl();
ts_mgr.reset(p);
}
return *p;
}
//----------------------------------------------------------------------------------------------
// define storage for static vars
boost::mutex DeferredManagerImpl::static_mutex;
int DeferredManagerImpl::static_instance_count;
DeferredManagerImpl::DeferredManagerImpl() : shutdown_requested(false)
{
Lock guard( static_mutex );
thread_name = "DeferredThread:" + boost::lexical_cast<std::string>( ++static_instance_count );
enqueue_hook = do_nothing;
thr_mgr = &(*thread_manager);
add_dm( this );
}
DeferredManagerImpl::~DeferredManagerImpl()
{
try
{
CleanupCallbackList::iterator i;
for( i = cleanup_callbacks.begin(); i != cleanup_callbacks.end(); i++ )
i->operator()( this );
}
catch(...)
{}
remove_dm( this );
}
void DeferredManagerImpl::setThreadName( const std::string & new_name )
{
Lock guard( mutex );
rename_dm( thread_name, new_name );
thread_name = new_name;
}
namespace
{
SafePtr<CallbackData> execute_command( const boost::function0< void > & cmd )
{
cmd();
return 0;
}
}
void DeferredManagerImpl::queueCommand( const boost::function0< void > & cmd )
{
{
Lock guard( mutex );
processing_queue.push( QCommand() );
processing_queue.back().command = boost::bind( execute_command, cmd );
condition.notify_one();
}
enqueue_hook();
}
SafePtr<Deferred> DeferredManagerImpl::queueDeferredCommand( const DeferredCommand & dc )
{
SafePtr<Deferred> ret;
{
Lock guard( mutex );
processing_queue.push( QCommand() );
ret = createDeferred();
processing_queue.back().deferred = ret;
processing_queue.back().command = dc;
condition.notify_one();
}
enqueue_hook();
return ret;
}
namespace
{
SafePtr<CallbackData> execute_deferred( SafePtr<Deferred> & d, SafePtr<CallbackData> & result )
{
d->execute( result );
return d;
}
}
void DeferredManagerImpl::queueCompletedDeferred( SafePtr<Deferred> d, SafePtr<CallbackData> result )
{
{
Lock guard( mutex );
processing_queue.push( QCommand() );
processing_queue.back().command = boost::bind( execute_deferred, d, result );
condition.notify_one();
}
enqueue_hook();
}
void DeferredManagerImpl::addEnqueueHook( const boost::function0<void> & cmd )
{
enqueue_hook = cmd;
}
int DeferredManagerImpl::queueLength()
{
Lock guard( mutex );
return (int) processing_queue.size();
}
void DeferredManagerImpl::execute_one( bool throw_on_failure )
{
SafePtr<Deferred> deferred;
DeferredCommand command;
SafePtr<CallbackData> result;
{ // isolate the lock here so that we can execute the command without holding the lock
Lock guard(mutex);
if( processing_queue.size() == 0 )
return;
deferred = processing_queue.front().deferred;
command = processing_queue.front().command;
processing_queue.pop();
}
if( deferred )
{
try
{
result = command();
}
catch( Failure & f )
{
result = f.clone();
}
deferred->execute( result );
}
else
{
// the result with either be 0 (for case of executing a normal command)
// or will be a valid SafePtr<Deferred> ( for a deferred command )
//
try
{
result = command();
if( result )
{
deferred = static_pointer_cast<Deferred>( result );
if( deferred->status() == Deferred::FAILURE )
result = static_pointer_cast<Failure>( deferred->endResult() );
}
}
catch( Failure & f )
{
result = f.clone();
}
}
SafePtr<Failure> err = dynamic_pointer_cast<Failure>(result);
if( throw_on_failure && err )
err->throw_self();
}
void DeferredManagerImpl::executeQueuedOperations( bool throw_on_failure )
{
while( processing_queue.size() != 0 )
execute_one( throw_on_failure );
}
void DeferredManagerImpl::executeNextOperation( bool throw_on_failure )
{
if( processing_queue.size() != 0 )
execute_one( throw_on_failure );
}
bool DeferredManagerImpl::waitForAnyOperation( long sec, long usec )
{
Lock guard( mutex );
if (processing_queue.size() > 0)
return true;
if (sec == 0 && usec == 0)
{
condition.wait( guard );
return false;
}
else
{
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
xt.sec += sec;
xt.nsec += usec * 1000;
while(xt.nsec >= NANOSECONDS_PER_SECOND)
{
++xt.sec;
xt.nsec -= NANOSECONDS_PER_SECOND;
}
return condition.timed_wait( guard, xt );
}
}
bool DeferredManagerImpl::waitForCompletionOf( SafePtr<Deferred> d,
long sec, long usec )
{
boost::xtime xt;
bool use_xt = false;
if (d->isComplete())
return true;
if ( sec != 0 || usec != 0 )
{
use_xt = true;
boost::xtime_get(&xt, boost::TIME_UTC);
xt.sec += sec;
xt.nsec += usec * 1000;
while(xt.nsec >= NANOSECONDS_PER_SECOND)
{
++xt.sec;
xt.nsec -= NANOSECONDS_PER_SECOND;
}
}
d->addCompleteCallback( boost::bind( &DeferredManagerImpl::wake, this, _1 ) );
while( true )
{
if( d->isComplete() )
return true;
while( processing_queue.size() != 0 )
{
execute_one( false );
if( d->isComplete() )
return true;
}
if( use_xt )
{
Lock guard( mutex );
if (!condition.timed_wait( guard, xt ) )
return false;
}
else
{
Lock guard( mutex );
condition.wait( guard );
}
}
}
SafePtr<CallbackData> DeferredManagerImpl::_req_shutdown()
{
shutdown_requested = true;
throw ShutdownRequested();
}
void DeferredManagerImpl::requestShutdown()
{
queueCommand( boost::bind(&DeferredManagerImpl::_req_shutdown, this) );
}
void DeferredManagerImpl::wake( SafePtr<Deferred> d )
{
Lock guard( mutex );
condition.notify_one();
}
void DeferredManagerImpl::addDestructorCallback( const boost::function1<void, DeferredManager *> & cmd )
{
Lock guard( mutex );
cleanup_callbacks.push_back( cmd );
}
}// end namespace deferred