Safe Haskell: None
LogicGrowsOnTrees.Parallel.Common.RequestQueue
Description
To understand the purpose of this module, it helps to know that there are two main loops running in the supervisor. The first loop runs inside the SupervisorMonad and is usually taken over by the adapter, which handles the communication between the supervisor and the workers. The second loop (referred to as the controller) is intended for the user to be able to submit requests to the supervisor, such as a global progress update, or possibly adapter-specific requests (such as changing the number of workers).
With this in mind, the purpose of this module is to create infrastructure for the second loop (the controller) to submit requests to the first loop. It provides this functionality through a class so that specific adapters can extend this to provide requests specific to that adapter (such as changing the number of workers).
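The relationship between the two loops can be sketched with a toy model that uses only the base library. This is an illustration rather than the module's implementation: `ToyState`, `ToyRequest`, `toySupervisorLoop`, `toyController` and `runToy` are hypothetical names, a plain `Chan` stands in for the request queue, and an `IORef Int` stands in for the supervisor state.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for the supervisor's state: just a worker count.
type ToyState = IORef Int

-- A request is either an action for the supervisor loop to run, or a stop signal.
data ToyRequest = Run (ToyState -> IO ()) | Stop

-- The first loop: owns the state and runs submitted requests one at a time.
toySupervisorLoop :: Chan ToyRequest -> ToyState -> IO ()
toySupervisorLoop queue state = do
    request <- readChan queue
    case request of
        Run action -> action state >> toySupervisorLoop queue state
        Stop       -> return ()

-- The second loop (the controller): it never touches the state directly,
-- it only submits requests and waits for answers.
toyController :: Chan ToyRequest -> IO Int
toyController queue = do
    -- An adapter-specific style request: add two workers.
    writeChan queue (Run (\state -> modifyIORef' state (+ 2)))
    -- A query: ask for the worker count and wait for the callback.
    answer <- newEmptyMVar
    writeChan queue (Run (\state -> readIORef state >>= putMVar answer))
    count <- takeMVar answer
    writeChan queue Stop
    return count

-- Runs the controller in its own thread and the supervisor loop on the caller's thread.
runToy :: Int -> IO Int
runToy initialWorkers = do
    queue <- newChan
    state <- newIORef initialWorkers
    result <- newEmptyMVar
    _ <- forkIO (toyController queue >>= putMVar result)
    toySupervisorLoop queue state
    takeMVar result
```

Because only the supervisor loop reads or writes the state, the controller needs no locking of its own; all coordination happens through the queue.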
- class (HasExplorationMode m, Functor m, MonadCatchIO m) => RequestQueueMonad m where
- abort :: m ()
- addWorkerCountListenerAsync :: (Int -> IO ()) -> IO () -> m ()
- fork :: m () -> m ThreadId
- getCurrentProgressAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()
- getCurrentStatisticsAsync :: (RunStatistics -> IO ()) -> m ()
- getNumberOfWorkersAsync :: (Int -> IO ()) -> m ()
- requestProgressUpdateAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()
- setWorkloadBufferSize :: Int -> m ()
- type Request exploration_mode worker_id m = SupervisorMonad exploration_mode worker_id m ()
- data RequestQueue exploration_mode worker_id m = RequestQueue {
- requests :: !(TChan (Request exploration_mode worker_id m))
- receivers :: !(IORef [ProgressFor exploration_mode -> IO ()])
- controllerThreads :: !(IORef [ThreadId])
- }
- type RequestQueueReader exploration_mode worker_id m = ReaderT (RequestQueue exploration_mode worker_id m) IO
- addWorkerCountListener :: RequestQueueMonad m => (Int -> IO ()) -> m ()
- getCurrentProgress :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))
- getCurrentStatistics :: RequestQueueMonad m => m RunStatistics
- getNumberOfWorkers :: RequestQueueMonad m => m Int
- requestProgressUpdate :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))
- syncAsync :: MonadIO m => ((α -> IO ()) -> m ()) -> m α
- addProgressReceiver :: MonadIO m' => (ProgressFor exploration_mode -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()
- enqueueRequest :: MonadIO m' => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()
- enqueueRequestAndWait :: (MonadIO m, MonadIO m') => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()
- newRequestQueue :: MonadIO m' => m' (RequestQueue exploration_mode worker_id m)
- tryDequeueRequest :: MonadIO m' => RequestQueue exploration_mode worker_id m -> m' (Maybe (Request exploration_mode worker_id m))
- processAllRequests :: MonadIO m => RequestQueue exploration_mode worker_id m -> SupervisorMonad exploration_mode worker_id m ()
- receiveProgress :: MonadIO m' => RequestQueue exploration_mode worker_id m -> ProgressFor exploration_mode -> m' ()
- requestQueueProgram :: MonadIO m => SupervisorMonad exploration_mode worker_id m () -> RequestQueue exploration_mode worker_id m -> SupervisorProgram exploration_mode worker_id m
- forkControllerThread :: MonadIO m' => RequestQueue exploration_mode worker_id m -> RequestQueueReader exploration_mode worker_id m () -> m' ()
- killControllerThreads :: MonadIO m' => RequestQueue exploration_mode worker_id m -> m' ()
- data CPUTimeTracker
- newCPUTimeTracker :: NominalDiffTime -> IO CPUTimeTracker
- startCPUTimeTracker :: RequestQueueMonad m => CPUTimeTracker -> m ()
- getCurrentCPUTime :: CPUTimeTracker -> IO NominalDiffTime
- getQuantityAsync :: (MonadIO m', SupervisorFullConstraint worker_id m) => SupervisorMonad exploration_mode worker_id m α -> (α -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()
Type-classes
class (HasExplorationMode m, Functor m, MonadCatchIO m) => RequestQueueMonad m where
This class provides the set of supervisor requests common to all adapters.
Methods
abort :: m ()
Abort the supervisor.
addWorkerCountListenerAsync :: (Int -> IO ()) -> IO () -> m ()
Submits a function to be called whenever the number of workers changes; the given function will also be called immediately with the current number of workers.
fork :: m () -> m ThreadId
Fork a new thread running in this monad; all controller threads are automatically killed when the run is finished.
getCurrentProgressAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()
Request the current progress, invoking the given callback with the result; see getCurrentProgress for the synchronous version.
getCurrentStatisticsAsync :: (RunStatistics -> IO ()) -> m ()
Request the current run statistics, invoking the given callback with the result; see getCurrentStatistics for the synchronous version.
getNumberOfWorkersAsync :: (Int -> IO ()) -> m ()
Request the number of workers, invoking the given callback with the result; see getNumberOfWorkers for the synchronous version.
requestProgressUpdateAsync :: (ProgressFor (ExplorationModeFor m) -> IO ()) -> m ()
Request that a global progress update be performed, invoking the given callback with the result; see requestProgressUpdate for the synchronous version.
setWorkloadBufferSize :: Int -> m ()
Sets the size of the workload buffer; for more information, see setWorkloadBufferSize (which links to the LogicGrowsOnTrees.Parallel.Common.Supervisor module).
Instances
RequestQueueMonad (ThreadsControllerMonad exploration_mode)
RequestQueueMonad (WorkgroupControllerMonad inner_state exploration_mode)
(SupervisorFullConstraint worker_id m, MonadCatchIO m) => RequestQueueMonad (RequestQueueReader exploration_mode worker_id m)
Types
type Request exploration_mode worker_id m = SupervisorMonad exploration_mode worker_id m ()
A supervisor request.
data RequestQueue exploration_mode worker_id m
A basic supervisor request queue.
Constructors
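RequestQueue
Fields
requests :: !(TChan (Request exploration_mode worker_id m))
receivers :: !(IORef [ProgressFor exploration_mode -> IO ()])
controllerThreads :: !(IORef [ThreadId])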
Instances
HasExplorationMode (RequestQueueReader exploration_mode worker_id m)
(SupervisorFullConstraint worker_id m, MonadCatchIO m) => RequestQueueMonad (RequestQueueReader exploration_mode worker_id m)
type RequestQueueReader exploration_mode worker_id m = ReaderT (RequestQueue exploration_mode worker_id m) IO
A basic supervisor request queue monad, which has an implicit RequestQueue object that it uses to communicate with the supervisor loop.
Functions
Synchronized requests
addWorkerCountListener :: RequestQueueMonad m => (Int -> IO ()) -> m ()
Like addWorkerCountListenerAsync, but blocks until the listener has been added.
getCurrentProgress :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))
Like getCurrentProgressAsync, but blocks until the result is ready.
getCurrentStatistics :: RequestQueueMonad m => m RunStatistics
Like getCurrentStatisticsAsync, but blocks until the result is ready.
getNumberOfWorkers :: RequestQueueMonad m => m Int
Like getNumberOfWorkersAsync, but blocks until the result is ready.
requestProgressUpdate :: RequestQueueMonad m => m (ProgressFor (ExplorationModeFor m))
Like requestProgressUpdateAsync, but blocks until the progress update has completed.
syncAsync :: MonadIO m => ((α -> IO ()) -> m ()) -> m α
General utility function for converting an asynchronous request to a synchronous request; it uses an MVar to hold the result of the request and blocks until the MVar has been filled.
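The MVar technique described above can be sketched as follows. This is a minimal version specialized to `IO` so that it depends only on base; `syncAsyncIO` and `toyAsyncRequest` are hypothetical names used for illustration, not part of this module.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Turns a callback-style request into a blocking call:
-- the callback fills an MVar, and the caller waits on that MVar.
syncAsyncIO :: ((a -> IO ()) -> IO ()) -> IO a
syncAsyncIO runAsync = do
    resultVar <- newEmptyMVar
    runAsync (putMVar resultVar)  -- the callback stores the result
    takeMVar resultVar            -- blocks until the callback has run

-- Hypothetical asynchronous request that delivers its answer from another thread.
toyAsyncRequest :: (Int -> IO ()) -> IO ()
toyAsyncRequest callback = do
    _ <- forkIO (callback 42)
    return ()
```

With these definitions, `syncAsyncIO toyAsyncRequest` blocks until the forked thread has invoked the callback and then returns the delivered value.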
Request queue management
addProgressReceiver :: MonadIO m' => (ProgressFor exploration_mode -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()
Adds a callback to the given RequestQueue that will be invoked when the current global progress update has completed.
enqueueRequest :: MonadIO m' => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()
Enqueues a supervisor request into the given queue.
enqueueRequestAndWait :: (MonadIO m, MonadIO m') => Request exploration_mode worker_id m -> RequestQueue exploration_mode worker_id m -> m' ()
Like enqueueRequest, but does not return until the request has been run.
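One plausible way to get "enqueue and wait" behaviour is to append a completion signal to the request before enqueueing it. The sketch below assumes a toy queue of plain `IO ()` actions held in a `Chan`; `enqueueAndWaitToy` and `demoEnqueueAndWait` are hypothetical names, and the module's own implementation may differ.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Enqueues the request followed by a completion signal, then blocks on that signal.
enqueueAndWaitToy :: IO () -> Chan (IO ()) -> IO ()
enqueueAndWaitToy request queue = do
    done <- newEmptyMVar
    writeChan queue (request >> putMVar done ())
    takeMVar done

-- Demonstration: a toy consumer thread runs exactly one request from the queue.
demoEnqueueAndWait :: IO Bool
demoEnqueueAndWait = do
    queue <- newChan
    flag <- newIORef False
    _ <- forkIO $ do
        request <- readChan queue
        request
    enqueueAndWaitToy (writeIORef flag True) queue
    -- The request has already run by the time we get here.
    readIORef flag
```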
newRequestQueue :: MonadIO m' => m' (RequestQueue exploration_mode worker_id m)
Constructs a new RequestQueue.
tryDequeueRequest :: MonadIO m' => RequestQueue exploration_mode worker_id m -> m' (Maybe (Request exploration_mode worker_id m))
Attempt to pop a request from the RequestQueue.
Request processing
processAllRequests :: MonadIO m => RequestQueue exploration_mode worker_id m -> SupervisorMonad exploration_mode worker_id m ()
Processes all of the requests in the given RequestQueue, and returns when the queue has been emptied.
receiveProgress :: MonadIO m' => RequestQueue exploration_mode worker_id m -> ProgressFor exploration_mode -> m' ()
Invokes all of the callbacks with the given progress and then clears the list of callbacks.
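The "invoke, then clear" behaviour can be sketched with an `IORef` holding the list of callbacks, mirroring the type of the receivers field. The names `receiveProgressToy` and `demoReceiveProgress` are hypothetical, and the atomic swap shown here is an assumption about how such a function might be written, not a statement about the module's source.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Atomically takes the current callbacks (leaving an empty list behind),
-- then invokes each of them with the given progress value.
receiveProgressToy :: IORef [p -> IO ()] -> p -> IO ()
receiveProgressToy receiversRef progress = do
    callbacks <- atomicModifyIORef' receiversRef (\current -> ([], current))
    mapM_ ($ progress) callbacks

-- Demonstration: two callbacks are registered, so the first delivery is seen
-- twice and the second delivery reaches nobody.
demoReceiveProgress :: IO ([Int], Int)
demoReceiveProgress = do
    receiversRef <- newIORef []
    seen <- newIORef []
    let record p = modifyIORef' seen (p :)
    modifyIORef' receiversRef (record :)
    modifyIORef' receiversRef (record :)
    receiveProgressToy receiversRef 7
    receiveProgressToy receiversRef 8
    values <- readIORef seen
    remaining <- length <$> readIORef receiversRef
    return (values, remaining)
```

Swapping the list out in one atomic step means a callback registered while the old ones are being invoked is kept for the next progress update rather than lost.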
requestQueueProgram
Arguments
:: MonadIO m
=> SupervisorMonad exploration_mode worker_id m () | initialization code to run before the loop is started
-> RequestQueue exploration_mode worker_id m | the request queue
-> SupervisorProgram exploration_mode worker_id m
Creates a supervisor program that loops forever processing requests from the given queue.
Controller threads
forkControllerThread
Arguments
:: MonadIO m'
=> RequestQueue exploration_mode worker_id m | the request queue
-> RequestQueueReader exploration_mode worker_id m () | the controller thread
-> m' ()
Forks a controller thread; its ThreadId is added to the list in the request queue. We deliberately do not return the ThreadId from this function because you must always call killControllerThreads to kill the controller thread, as this makes sure that all child threads also get killed.
killControllerThreads
Arguments
:: MonadIO m'
=> RequestQueue exploration_mode worker_id m | the request queue
-> m' ()
Kill all the controller threads and their children.
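The bookkeeping behind this pair of functions can be sketched as a list of `ThreadId`s in an `IORef`, matching the type of the controllerThreads field. This is a simplified illustration under stated assumptions: `forkTrackedToy`, `killTrackedToy` and `demoKillTracked` are hypothetical names, and the sketch only records the threads it forks itself, whereas the real module also arranges for child threads created with fork to be killed.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Control.Monad (forever)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Forks a thread and records its ThreadId; the id is deliberately not returned.
forkTrackedToy :: IORef [ThreadId] -> IO () -> IO ()
forkTrackedToy threadsRef body = do
    threadId <- forkIO body
    atomicModifyIORef' threadsRef (\ids -> (threadId : ids, ()))

-- Takes every recorded ThreadId (leaving the list empty) and kills each thread.
killTrackedToy :: IORef [ThreadId] -> IO ()
killTrackedToy threadsRef = do
    ids <- atomicModifyIORef' threadsRef (\current -> ([], current))
    mapM_ killThread ids

-- Demonstration: start a thread that would run forever, kill it through the
-- tracker, and wait for its cleanup handler to confirm that it stopped.
demoKillTracked :: IO Int
demoKillTracked = do
    threadsRef <- newIORef []
    started <- newEmptyMVar
    stopped <- newEmptyMVar
    forkTrackedToy threadsRef $
        (putMVar started () >> forever (threadDelay 1000))
            `finally` putMVar stopped ()
    takeMVar started          -- make sure the cleanup handler is installed
    killTrackedToy threadsRef
    takeMVar stopped          -- returns only once the thread has been killed
    length <$> readIORef threadsRef
```

Keeping the ids private to the tracker is what forces callers through a single kill function, which is the design rationale given in the description of forkControllerThread.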
CPU time tracking
data CPUTimeTracker
A data structure that tracks the amount of CPU time that has been used.
newCPUTimeTracker :: NominalDiffTime -> IO CPUTimeTracker
Creates a new CPU time tracker; the argument should be equal to the total amount of time used so far if we are continuing a previous run, and zero otherwise.
startCPUTimeTracker :: RequestQueueMonad m => CPUTimeTracker -> m ()
Starts the CPU time tracker. It detects when it has already been started, so any subsequent attempts to start it are ignored.
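The documentation does not say how the tracker detects a repeated start. One common way to obtain this behaviour, shown here purely as an assumption, is an atomically updated flag; `startOnceToy` and `demoStartOnce` are hypothetical names.

```haskell
import Control.Monad (unless)
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Sets the flag to True and reports its previous value in one atomic step;
-- the start action runs only for the caller that saw False.
startOnceToy :: IORef Bool -> IO () -> IO ()
startOnceToy startedRef startAction = do
    alreadyStarted <- atomicModifyIORef' startedRef (\started -> (True, started))
    unless alreadyStarted startAction

-- Demonstration: three start attempts, counting how often the action really runs.
demoStartOnce :: IO Int
demoStartOnce = do
    startedRef <- newIORef False
    startCount <- newIORef (0 :: Int)
    let start = modifyIORef' startCount (+ 1)
    startOnceToy startedRef start
    startOnceToy startedRef start
    startOnceToy startedRef start
    readIORef startCount
```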
getCurrentCPUTime :: CPUTimeTracker -> IO NominalDiffTime
Gets the current CPU time.
Miscellaneous
getQuantityAsync :: (MonadIO m', SupervisorFullConstraint worker_id m) => SupervisorMonad exploration_mode worker_id m α -> (α -> IO ()) -> RequestQueue exploration_mode worker_id m -> m' ()
Submits a Request to the supervisor and invokes the given callback with the result when it is available. (This function is used by getCurrentProgressAsync and getNumberOfWorkersAsync.)