Safe Haskell: None
LogicGrowsOnTrees.Parallel.Common.Worker
Description
The Worker module contains the workhorse code of the parallelization infrastructure in the form of the forkWorkerThread function, which explores a tree step by step while continuously polling for requests; for more details see forkWorkerThread.
- data ProgressUpdate progress = ProgressUpdate {
- progressUpdateProgress :: progress
- progressUpdateRemainingWorkload :: Workload
- type ProgressUpdateFor exploration_mode = ProgressUpdate (ProgressFor exploration_mode)
- data StolenWorkload progress = StolenWorkload {
- stolenWorkloadProgressUpdate :: ProgressUpdate progress
- stolenWorkload :: Workload
- type StolenWorkloadFor exploration_mode = StolenWorkload (ProgressFor exploration_mode)
- type WorkerRequestQueue progress = IORef [WorkerRequest progress]
- type WorkerRequestQueueFor exploration_mode = WorkerRequestQueue (ProgressFor exploration_mode)
- data WorkerEnvironment progress = WorkerEnvironment {
- workerInitialPath :: Path
- workerThreadId :: ThreadId
- workerPendingRequests :: WorkerRequestQueue progress
- workerTerminationFlag :: IVar ()
- type WorkerEnvironmentFor exploration_mode = WorkerEnvironment (ProgressFor exploration_mode)
- data WorkerTerminationReason worker_final_progress
- = WorkerFinished worker_final_progress
- | WorkerFailed String
- | WorkerAborted
- type WorkerTerminationReasonFor exploration_mode = WorkerTerminationReason (WorkerFinishedProgressFor exploration_mode)
- type family WorkerPushActionFor exploration_mode :: *
- forkWorkerThread :: ExplorationMode exploration_mode -> Purity m n -> (WorkerTerminationReasonFor exploration_mode -> IO ()) -> TreeT m (ResultFor exploration_mode) -> Workload -> WorkerPushActionFor exploration_mode -> IO (WorkerEnvironmentFor exploration_mode)
- sendAbortRequest :: WorkerRequestQueue progress -> IO ()
- sendProgressUpdateRequest :: WorkerRequestQueue progress -> (ProgressUpdate progress -> IO ()) -> IO ()
- sendWorkloadStealRequest :: WorkerRequestQueue progress -> (Maybe (StolenWorkload progress) -> IO ()) -> IO ()
- exploreTreeGeneric :: (WorkerPushActionFor exploration_mode ~ (Void -> ()), ResultFor exploration_mode ~ α) => ExplorationMode exploration_mode -> Purity m n -> TreeT m α -> IO (WorkerTerminationReason (FinalResultFor exploration_mode))
Types
Worker interaction
data ProgressUpdate progress
A progress update sent to the supervisor; it has a component which contains information about how much of the tree has been explored and what results have been found so far, as well as the remaining Workload to be completed by this worker.
Constructors
ProgressUpdate
Fields
progressUpdateProgress :: progress
progressUpdateRemainingWorkload :: Workload
Instances
Eq progress => Eq (ProgressUpdate progress) | |
Show progress => Show (ProgressUpdate progress) | |
Serialize progress => Serialize (ProgressUpdate progress) |
type ProgressUpdateFor exploration_mode = ProgressUpdate (ProgressFor exploration_mode)
A convenient type alias for the type of ProgressUpdate associated with the given exploration mode.
data StolenWorkload progress
A stolen workload sent to the supervisor; in addition to a component with the stolen Workload itself, it also has a ProgressUpdate component, which is required in order to maintain the invariant that all of the Workloads that the supervisor has on file (both assigned to workers and unassigned) plus the current progress equals the full tree.
Constructors
StolenWorkload
Fields
stolenWorkloadProgressUpdate :: ProgressUpdate progress
stolenWorkload :: Workload
Instances
Eq progress => Eq (StolenWorkload progress) | |
Show progress => Show (StolenWorkload progress) | |
Serialize progress => Serialize (StolenWorkload progress) |
type StolenWorkloadFor exploration_mode = StolenWorkload (ProgressFor exploration_mode)
A convenient type alias for the type of StolenWorkload associated with the given exploration mode.
type WorkerRequestQueue progress = IORef [WorkerRequest progress]
A queue of worker requests.
NOTE: Although the type is a list, and requests are added by prepending them to the list, it still acts as a queue because the worker will reverse the list before processing the requests.
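The prepend-then-reverse behaviour described in the note can be illustrated with a minimal sketch that uses only base. The Request type and the enqueue and drain helpers are hypothetical stand-ins invented for this example; they are not the library's WorkerRequest type or its internal functions.

```haskell
import Data.IORef (IORef, newIORef, atomicModifyIORef')

-- Hypothetical stand-in for the library's WorkerRequest type.
data Request = Abort | ProgressUpdateRequested | StealRequested
  deriving (Eq, Show)

-- Same shape as WorkerRequestQueue: a mutable reference to a plain list.
type RequestQueue = IORef [Request]

-- Enqueue by prepending, so the newest request sits at the head of the list.
enqueue :: RequestQueue -> Request -> IO ()
enqueue queue request = atomicModifyIORef' queue (\rs -> (request : rs, ()))

-- Drain by atomically swapping in the empty list, then reversing what was
-- taken so that the oldest request is processed first.
drain :: RequestQueue -> IO [Request]
drain queue = reverse <$> atomicModifyIORef' queue (\rs -> ([], rs))

main :: IO ()
main = do
  queue <- newIORef []
  mapM_ (enqueue queue) [ProgressUpdateRequested, StealRequested, Abort]
  drain queue >>= print  -- prints [ProgressUpdateRequested,StealRequested,Abort]
  drain queue >>= print  -- prints []
```

Because the reversal happens once per drain rather than once per enqueue, senders pay only for a cons, and the worker pays for the reversal only when the list is non-empty.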
type WorkerRequestQueueFor exploration_mode = WorkerRequestQueue (ProgressFor exploration_mode)
A convenient type alias for the type of WorkerRequestQueue associated with the given exploration mode.
data WorkerEnvironment progress
The environment of a running worker.
Constructors
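WorkerEnvironment
Fields
workerInitialPath :: Path
workerThreadId :: ThreadId
workerPendingRequests :: WorkerRequestQueue progress
workerTerminationFlag :: IVar ()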
type WorkerEnvironmentFor exploration_mode = WorkerEnvironment (ProgressFor exploration_mode)
A convenient type alias for the type of WorkerEnvironment associated with the given exploration mode.
data WorkerTerminationReason worker_final_progress
The reason why a worker terminated.
Constructors
WorkerFinished worker_final_progress | worker completed normally without error; included is the final result |
WorkerFailed String | worker failed; included is the message of the failure (this would have
been a value of type |
WorkerAborted | worker was aborted by either an external request or the |
Instances
Functor WorkerTerminationReason | The |
Eq worker_final_progress => Eq (WorkerTerminationReason worker_final_progress) | |
Show worker_final_progress => Show (WorkerTerminationReason worker_final_progress) |
type WorkerTerminationReasonFor exploration_mode = WorkerTerminationReason (WorkerFinishedProgressFor exploration_mode)
A convenient type alias for the type of WorkerTerminationReason associated with the given exploration mode.
type family WorkerPushActionFor exploration_mode :: *
The action that a worker can take to push a result to the supervisor; this type is effectively null (with the exact value absurd) for all modes except FoundModeUsingPush.
Functions
forkWorkerThread
Arguments
:: ExplorationMode exploration_mode | the mode in which to explore the tree |
-> Purity m n | the purity of the tree |
-> (WorkerTerminationReasonFor exploration_mode -> IO ()) | the action to run when the worker has terminated |
-> TreeT m (ResultFor exploration_mode) | the tree |
-> Workload | the workload for the worker |
-> WorkerPushActionFor exploration_mode | the action to push a result to the supervisor; this should be equal
to |
-> IO (WorkerEnvironmentFor exploration_mode) | the environment for the worker |
The forkWorkerThread function is the workhorse of the parallelization infrastructure; it explores a tree in a separate thread while polling for requests. Specifically, the worker alternates between stepping through the tree and checking to see if there are any new requests in the queue.
The worker is optimized around the observation that the vast majority of its time is spent exploring the tree rather than responding to requests, and so the overhead of checking whether any requests are present needs to be minimized, at the cost of possibly delaying the response to an incoming request. For this reason, it uses an IORef for the queue rather than an MVar or some other thread synchronization variable, which minimizes the cost of peeking at it; the trade-off is that if a request is added to the queue by a different processor then it might not be noticed until the caches get synchronized. Likewise, the request queue uses the list type rather than something like Data.Sequence for simplicity; the vast majority of the time the worker will encounter an empty list, and on the rare occasion when the list is non-empty it will be short enough that reversing it will not pose a significant cost.
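The alternation between stepping and polling can be sketched as follows. This is a simplified model under stated assumptions, not the library's implementation: the "tree" is reduced to a fixed number of steps, and Request, workerLoop and respondTo are hypothetical names introduced only for this example. For determinism the requests are placed in the queue before the loop starts instead of being sent from another thread.

```haskell
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')

-- Hypothetical request type: report progress through a callback, or abort.
data Request = ReportProgress (Int -> IO ()) | Abort

-- Explore a "tree" modelled as a fixed number of steps.
-- Returns Just the step count on completion, or Nothing if aborted.
workerLoop :: IORef [Request] -> Int -> IO (Maybe Int)
workerLoop queue total = go 0
  where
    go done
      | done == total = pure (Just done)
      | otherwise = do
          pending <- readIORef queue          -- cheap, non-blocking peek
          case pending of
            [] -> go (done + 1)               -- common case: keep exploring
            _  -> do
              -- Rare case: take the whole list and restore arrival order.
              requests <- reverse <$> atomicModifyIORef' queue (\rs -> ([], rs))
              continue <- respondTo done requests
              if continue then go (done + 1) else pure Nothing

    -- Handle requests oldest-first; stop at the first abort.
    respondTo _ [] = pure True
    respondTo done (ReportProgress respond : rest) =
      respond done >> respondTo done rest
    respondTo _ (Abort : _) = pure False

main :: IO ()
main = do
  -- The list is stored newest-first, so the progress request arrived first.
  queue <- newIORef
    [Abort, ReportProgress (\n -> putStrLn ("steps so far: " ++ show n))]
  result <- workerLoop queue 1000
  print result  -- prints "steps so far: 0" and then "Nothing"
```

The point of the design is visible in the common branch: when the queue is empty, the only cost per step is a single readIORef.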
At any given point in the exploration, there is an initial path which locates the subtree that was given as the original workload, a cursor which indicates the subtree within this subtree that makes up the current workload, and the context which indicates the current location in the subtree that is being explored. All workers start with an empty cursor. When a workload is stolen, decisions made early on in the context are frozen and moved into the cursor; if they were not, then when the worker backtracked it would explore a workload that it had just given away, resulting in some results being observed twice.
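The relationship between the initial path, the cursor and the context can be made concrete with a small model. Everything here is an assumption made for illustration: the Branch, Step and Worker types and the steal function are hypothetical, binary choices are the only kind of step, and the library's actual Path, cursor and context representations are richer than this.

```haskell
data Branch = L | R deriving (Eq, Show)

-- One step of the context: the branch taken, and whether the sibling branch
-- is still waiting to be explored when the worker backtracks.
data Step = Step { taken :: Branch, siblingPending :: Bool }
  deriving (Eq, Show)

type Path = [Branch]

data Worker = Worker
  { initialPath :: Path    -- locates the original workload
  , cursor      :: Path    -- frozen decisions; never backtracked over
  , context     :: [Step]  -- root-first steps that may still be backtracked
  } deriving (Eq, Show)

other :: Branch -> Branch
other L = R
other R = L

-- Steal the pending sibling closest to the root. The decisions up to and
-- including that choice point are frozen into the cursor, so backtracking
-- can no longer reach the branch that was given away.
steal :: Worker -> Maybe (Path, Worker)
steal w =
  case break siblingPending (context w) of
    (_, []) -> Nothing  -- nothing left to give away
    (before, s : after) ->
      let frozen = map taken before
          prefix = initialPath w ++ cursor w ++ frozen
          stolen = prefix ++ [other (taken s)]
          w'     = w { cursor  = cursor w ++ frozen ++ [taken s]
                     , context = after }
      in Just (stolen, w')

main :: IO ()
main = do
  let w = Worker { initialPath = [R]
                 , cursor      = []
                 , context     = [Step L True, Step L True, Step R False] }
  print (steal w)
```

In this run the stolen workload is the path [R,R], and the worker continues with cursor [L] and a context of two steps. If the first step had stayed in the context with its sibling still marked as pending, backtracking would eventually visit [R,R] a second time, which is the duplication the paragraph above warns about.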
The worker terminates if it finishes exploring all the nodes in its (current) workload, if an error occurs, or if it is aborted, either via the ThreadKilled and UserInterrupt exceptions or by an abort request placed in the request queue.
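How these outcomes might map onto the three termination reasons can be sketched with base alone. The TerminationReason type mirrors the shape of WorkerTerminationReason, but classify and runWorker are hypothetical helpers written for this example and are not part of the library; abort requests arriving through the queue are not modelled here.

```haskell
import Control.Exception
  ( AsyncException (ThreadKilled, UserInterrupt)
  , SomeException
  , fromException
  , throwIO
  , try
  )

-- Local stand-in with the same shape as WorkerTerminationReason.
data TerminationReason a = Finished a | Failed String | Aborted
  deriving (Eq, Show)

-- ThreadKilled and UserInterrupt count as an abort; every other exception is
-- a failure, reported as a String so that it could be sent over a channel.
classify :: Either SomeException a -> TerminationReason a
classify (Right result) = Finished result
classify (Left e) =
  case fromException e of
    Just ThreadKilled  -> Aborted
    Just UserInterrupt -> Aborted
    _                  -> Failed (show e)

-- Run an exploration and report why it stopped.
runWorker :: IO a -> IO (TerminationReason a)
runWorker exploration = classify <$> try exploration

main :: IO ()
main = do
  runWorker (pure (42 :: Int)) >>= print                 -- prints Finished 42
  runWorker (throwIO ThreadKilled :: IO Int) >>= print   -- prints Aborted
  runWorker (ioError (userError "boom") :: IO Int) >>= print
```

The third call yields a Failed value carrying the rendered exception message.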
sendAbortRequest :: WorkerRequestQueue progress -> IO ()
Sends a request to abort.
sendProgressUpdateRequest
Arguments
:: WorkerRequestQueue progress | the request queue |
-> (ProgressUpdate progress -> IO ()) | the action to perform when the update is available |
-> IO () |
Sends a request for a progress update along with a response action to perform when the progress update is available.
sendWorkloadStealRequest
Arguments
:: WorkerRequestQueue progress | the request queue |
-> (Maybe (StolenWorkload progress) -> IO ()) | the action to perform when the response is available |
-> IO () |
Sends a request to steal a workload along with a response action to perform when the response to the steal request is available.
exploreTreeGeneric :: (WorkerPushActionFor exploration_mode ~ (Void -> ()), ResultFor exploration_mode ~ α) => ExplorationMode exploration_mode -> Purity m n -> TreeT m α -> IO (WorkerTerminationReason (FinalResultFor exploration_mode))
Explores a tree with the specified purity using the given mode by forking a worker thread and waiting for it to finish. It exists to facilitate testing and benchmarking; you are unlikely to need it yourself.