Remote
Contents
Description
Cloud Haskell (previously Remote Haskell) is a distributed computing framework for Haskell. We can describe its interface as roughly two levels: the process layer, consisting of processes, messages, and fault monitoring; and the task layer, consisting of tasks, promises, and fault recovery. This summary module provides the most common interface functions for both layers, although advanced users might want to import names from the other constituent modules, as well.
- remoteInit :: Maybe FilePath -> [RemoteCallMetaData] -> (String -> ProcessM ()) -> IO ()
- data ProcessM a
- data NodeId
- data ProcessId
- data MatchM q a
- getSelfPid :: ProcessM ProcessId
- getSelfNode :: ProcessM NodeId
- send :: Serializable a => ProcessId -> a -> ProcessM ()
- sendQuiet :: Serializable a => ProcessId -> a -> ProcessM TransmitStatus
- spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
- spawnLocal :: ProcessM () -> ProcessM ProcessId
- spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessId
- spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
- callRemote :: Serializable a => NodeId -> Closure (ProcessM a) -> ProcessM a
- callRemotePure :: Serializable a => NodeId -> Closure a -> ProcessM a
- callRemoteIO :: Serializable a => NodeId -> Closure (IO a) -> ProcessM a
- data AmSpawnOptions = AmSpawnOptions {}
- defaultSpawnOptions :: AmSpawnOptions
- terminate :: ProcessM a
- expect :: Serializable a => ProcessM a
- receive :: [MatchM q ()] -> ProcessM (Maybe q)
- receiveWait :: [MatchM q ()] -> ProcessM q
- receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)
- match :: Serializable a => (a -> ProcessM q) -> MatchM q ()
- matchIf :: Serializable a => (a -> Bool) -> (a -> ProcessM q) -> MatchM q ()
- matchUnknown :: ProcessM q -> MatchM q ()
- matchUnknownThrow :: MatchM q ()
- matchProcessDown :: ProcessId -> ProcessM q -> MatchM q ()
- logS :: LogSphere -> LogLevel -> String -> ProcessM ()
- say :: String -> ProcessM ()
- type LogSphere = String
- data LogTarget
- data LogFilter
- data LogConfig = LogConfig {}
- data LogLevel
- = LoSay
- | LoFatal
- | LoCritical
- | LoImportant
- | LoStandard
- | LoInformation
- | LoTrivial
- setLogConfig :: LogConfig -> ProcessM ()
- setNodeLogConfig :: LogConfig -> ProcessM ()
- getLogConfig :: ProcessM LogConfig
- defaultLogConfig :: LogConfig
- getCfgArgs :: ProcessM [String]
- data UnknownMessageException = UnknownMessageException String
- data ServiceException = ServiceException String
- data TransmitException = TransmitException TransmitStatus
- data TransmitStatus
- nameSet :: String -> ProcessM ()
- nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId)
- nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessId
- linkProcess :: ProcessId -> ProcessM ()
- monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()
- unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()
- withMonitor :: ProcessId -> ProcessM a -> ProcessM a
- data MonitorAction
- = MaMonitor
- | MaLink
- | MaLinkError
- data ProcessMonitorException = ProcessMonitorException ProcessId SignalReason
- getPeers :: ProcessM PeerInfo
- findPeerByRole :: PeerInfo -> String -> [NodeId]
- type PeerInfo = Map String [NodeId]
- remotable :: [Name] -> Q [Dec]
- type RemoteCallMetaData = Lookup -> Lookup
- data Lookup
- data Closure a
- makeClosure :: (Typeable a, Serializable v) => String -> v -> ProcessM (Closure a)
- invokeClosure :: Typeable a => Closure a -> ProcessM (Maybe a)
- data Payload
- genericPut :: Data a => a -> Put
- genericGet :: Data a => Get a
- class (Binary a, Typeable a) => Serializable a
- data SendPort a
- data ReceivePort a
- newChannel :: Serializable a => ProcessM (SendPort a, ReceivePort a)
- sendChannel :: Serializable a => SendPort a -> a -> ProcessM ()
- receiveChannel :: Serializable a => ReceivePort a -> ProcessM a
- data CombinedChannelAction b
- combinedChannelAction :: Serializable a => ReceivePort a -> (a -> b) -> CombinedChannelAction b
- combinePortsBiased :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)
- combinePortsRR :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)
- mergePortsBiased :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)
- terminateChannel :: Serializable a => ReceivePort a -> ProcessM ()
- data TaskM a
- runTask :: TaskM a -> ProcessM a
- data Promise a
- newPromise :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)
- newPromiseHere :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)
- newPromiseAtRole :: Serializable a => String -> Closure (TaskM a) -> TaskM (Promise a)
- newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)
- toPromise :: Serializable a => a -> TaskM (Promise a)
- toPromiseNear :: (Serializable a, Serializable b) => Promise b -> a -> TaskM (Promise a)
- toPromiseImm :: Serializable a => a -> TaskM (Promise a)
- readPromise :: Serializable a => Promise a -> TaskM a
- tlogS :: LogSphere -> LogLevel -> String -> TaskM ()
- tsay :: String -> TaskM ()
- data TaskException = TaskException String
- data MapReduce rawinput input middle1 middle2 result = MapReduce {}
- mapReduce :: (Serializable i, Serializable k, Serializable m, Serializable r) => MapReduce ri i k m r -> ri -> TaskM [r]
- chunkify :: Int -> [a] -> [[a]]
- shuffle :: Ord a => [(a, b)] -> [(a, [b])]
The process layer
remoteInit :: Maybe FilePath -> [RemoteCallMetaData] -> (String -> ProcessM ()) -> IO ()Source
This is the usual way to create a single node of a distributed program.
The intent is that remoteInit
be called in your program's Main.main
function. A typical call takes this form:
main = remoteInit (Just "config") [Main.__remoteCallMetaData] initialProcess
This will:
- Read the configuration file
config
in the current directory or, if specified, from the file whose path is given by the environment variable RH_CONFIG
. If the given file does not exist or is invalid, an exception will be thrown. - Use the configuration given in the file as well as on the command-line to create a new node. The usual system processes will be started, including logging, discovery, and spawning.
- Compile-time metadata, generated by
Remote.Call.remotable
, will be used for invoking closures. Metadata from each module must be explicitly mentioned. - The function initialProcess will be called, given as a parameter a string indicating the value of the cfgRole setting of this node. initialProcess is provided by the user and provides an entrypoint for controlling node behavior on startup.
The monad ProcessM is the core of the process layer. Functions
in the ProcessM monad may participate in messaging and create
additional concurrent processes. You can create
a ProcessM context from an IO
context with the remoteInit
function.
Identifies a node somewhere on the network. These
can be queried from getPeers
. See also getSelfNode
Identifies a process somewhere on the network. These
are produced by the spawn
family of functions and
consumed by send
. When a process ends, its process ID
ceases to be valid. See also getSelfPid
getSelfPid :: ProcessM ProcessIdSource
Returns the process ID of the current process.
getSelfNode :: ProcessM NodeIdSource
Returns the node ID of the node that the current process is running on.
send :: Serializable a => ProcessId -> a -> ProcessM ()Source
Sends a message to the given process. If the
process isn't running or can't be accessed,
this function will throw a TransmitException
.
The message must implement the Serializable
interface.
sendQuiet :: Serializable a => ProcessId -> a -> ProcessM TransmitStatusSource
Like send
, but in case of error returns a value rather than throwing
an exception.
spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessIdSource
Start a process running the code, given as a closure, on the specified node.
If successful, returns the process ID of the new process. If unsuccessful,
throw a TransmitException
.
spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessIdSource
A variant of spawn
that allows greater control over how the remote process is started.
spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessIdSource
A variant of spawn
that starts the remote process with
bidirectional monitoring, as in linkProcess
callRemote :: Serializable a => NodeId -> Closure (ProcessM a) -> ProcessM aSource
Invokes a function on a remote node. The function must be given by a closure. This function will block until the called function completes or the connection is broken.
callRemotePure :: Serializable a => NodeId -> Closure a -> ProcessM aSource
callRemoteIO :: Serializable a => NodeId -> Closure (IO a) -> ProcessM aSource
data AmSpawnOptions Source
Constructors
AmSpawnOptions | |
Fields
|
Instances
expect :: Serializable a => ProcessM aSource
A simple way to receive messages.
This will return the first message received
of the specified type; if no such message
is available, the function will block.
Unlike the receive
family of functions,
this function does not allow the notion
of choice in message extraction.
receive :: [MatchM q ()] -> ProcessM (Maybe q)Source
Examines the message queue of the current process, matching each message against each of the
provided message pattern clauses (typically provided by a function from the match
family). If
a message matches, the corresponding handler is invoked and its result is returned. If no
message matches, Nothing is returned.
receiveWait :: [MatchM q ()] -> ProcessM qSource
Examines the message queue of the current process, matching each message against each of the
provided message pattern clauses (typically provided by a function from the match
family). If
a message matches, the corresponding handler is invoked and its result is returned. If no
message matches, the function blocks until a matching message is received.
receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)Source
Examines the message queue of the current process, matching each message against each of the
provided message pattern clauses (typically provided by a function from the match
family). If
a message matches, the corresponding handler is invoked and its result is returned. If no
message matches, the function blocks until a matching message is received, or until the
specified time in microseconds has elapsed, at which point it will return Nothing.
If the specified time is 0, this function is equivalent to receive
.
match :: Serializable a => (a -> ProcessM q) -> MatchM q ()Source
Used to specify a message pattern in receiveWait
and related functions.
Only messages containing data of type a, where a is the argument to the user-provided
function in the first parameter of match
, will be removed from the queue, at which point
the user-provided function will be invoked.
matchUnknown :: ProcessM q -> MatchM q ()Source
A catch-all variant of match
that invokes user-provided code and
will extract any message from the queue. This is useful for matching
against messages that are not recognized. Since message matching patterns
are evaluated in order, this function, if used, should be the last element
in the list of matchers given to receiveWait
and similar functions.
matchUnknownThrow :: MatchM q ()Source
A variant of matchUnknown
that throws an UnknownMessageException
if the process receives a message that isn't extracted by another message matcher.
Equivalent to:
matchUnknown (throw (UnknownMessageException "..."))
matchProcessDown :: ProcessId -> ProcessM q -> MatchM q ()Source
A specialized version of match
(for use with receive
, receiveWait
and friends) for catching process down
messages. This way processes can avoid waiting forever for a response from another process that has crashed.
Intended to be used within a withMonitor
block, e.g.:
withMonitor apid $ do send apid QueryMsg receiveWait [ match (\AnswerMsg -> return "ok"), matchProcessDown apid (return "aborted") ]
logS :: LogSphere -> LogLevel -> String -> ProcessM ()Source
Generates a log entry, using the process's current logging configuration.
-
LogSphere
indicates the subsystem generating this message. SYS in the case of components of the framework. -
LogLevel
indicates the importance of the message. - The third parameter is the log message.
Both of the first two parameters may be used to filter log output.
say :: String -> ProcessM ()Source
Uses the logging facility to produce non-filterable, programmatic output. Shouldn't be used for informational logging, but rather for application-level output.
Specifies the subsystem or region that is responsible for
generating a given log entry. This is useful in conjunction
with LogFilter
to limit displayed log output to the
particular area of your program that you are currently debugging.
The SYS, TSK, and SAY spheres are used by the framework
for messages relating to the Process layer, the Task layer,
and the say
function.
The remainder of values are free for use at the application level.
A preference as to what is done with log messages
Specifies which log messages will be output. All log messages of importance below the current log level or not among the criteria given here will be suppressed. This type lets you limit displayed log messages to certain components.
Expresses a current configuration of the logging
subsystem, which determines which log messages are
output and where they are sent.
Both processes and nodes have log configurations,
set with setLogConfig
and setNodeLogConfig
respectively. The node log configuration is
used for all processes that have not explicitly
set their log configuration. Otherwise, the
process log configuration takes priority.
Constructors
LogConfig | |
Specifies the importance of a particular log entry. Can also be used to filter log output.
Constructors
LoSay | Non-suppressible application-level emission |
LoFatal | |
LoCritical | |
LoImportant | |
LoStandard | The default log level |
LoInformation | |
LoTrivial |
setLogConfig :: LogConfig -> ProcessM ()Source
Set the process's log configuration. This overrides any node-level log configuration
setNodeLogConfig :: LogConfig -> ProcessM ()Source
Sets the node's log configuration
getLogConfig :: ProcessM LogConfigSource
Gets the currently active log configuration for the current process; if the current process doesn't have a log configuration set, the node's log configuration will be returned.
defaultLogConfig :: LogConfigSource
The default log configuration represents a starting point for setting your own configuration. It is:
logLevel = LoStandard logTarget = LtStdout logFilter = LfAll
getCfgArgs :: ProcessM [String]Source
Returns command-line arguments provided to the executable, excluding any command line arguments that were processed by the framework.
data UnknownMessageException Source
Thrown by matchUnknownThrow
in response to a message
of a wrong type being received by a process
Constructors
UnknownMessageException String |
data ServiceException Source
Thrown by Remote.Process system services in response to some problem
Constructors
ServiceException String |
data TransmitException Source
Thrown by various network-related functions when communication with a host has failed
Constructors
TransmitException TransmitStatus |
data TransmitStatus Source
nameSet :: String -> ProcessM ()Source
Assigns a name to the current process. The name is local to the
node. On each node, each process may have only one name, and each
name may be given to only one process. If this function is called
more than once by the same process, or called more than once
with the same name on a single node, it will throw a ServiceException
.
The PID of a named process can be queried later with nameQuery
. When the
named process ends, its name will again become available.
One reason to use named processes is to create node-local state.
This example lets each node have its own favorite color, which can
be changed and queried.
nodeFavoriteColor :: ProcessM () nodeFavoriteColor = do nameSet "favorite_color" loop Blue where loop color = receiveWait [ match (\newcolor -> return newcolor), match (\pid -> send pid color >> return color) ] >>= loop setFavoriteColor :: NodeId -> Color -> ProcessM () setFavoriteColor nid color = do (Just pid) <- nameQuery nid "favorite_color" send pid color getFavoriteColor :: NodeId -> ProcessM Color getFavoriteColor nid = do (Just pid) <- nameQuery nid "favorite_color" mypid <- getSelfPid send pid mypid expect
nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId)Source
Query the PID of a named process on a particular node. If no process of that name exists, or if that process has ended, this function returns Nothing.
nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessIdSource
Similar to nameQuery
but if the named process doesn't exist,
it will be started from the given closure. If the process is
already running, the closure will be ignored.
linkProcess :: ProcessId -> ProcessM ()Source
Establishes bidirectional abnormal termination monitoring between the current
process and another. Monitoring established with linkProcess
is bidirectional and signals only in the event of abnormal termination.
In other words, linkProcess a
is equivalent to:
monitorProcess mypid a MaLinkError monitorProcess a mypid MaLinkError
monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()Source
Establishes unidirectional monitoring of another process. The format is:
monitorProcess monitor monitee action
Here,
- monitor is the process that will be notified if the monitee goes down
- monitee is the process that will be monitored
- action determines how the monitor will be notified
Monitoring will remain in place until one of the processes ends or until
unmonitorProcess
is called. Calls to monitorProcess
are cumulative,
such that calling monitorProcess
three times on the same pair of processes
will ensure that monitoring will stay in place until unmonitorProcess
is called
three times on the same pair of processes.
If the monitee is not currently running, the monitor will be signalled immediately.
See also MonitorAction
.
unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()Source
Removes monitoring established by monitorProcess
. Note that the type of
monitoring, given in the third parameter, must match in order for monitoring
to be removed. If monitoring has not already been established between these
two processes, this function takes no action.
withMonitor :: ProcessId -> ProcessM a -> ProcessM aSource
Establishes temporary monitoring of another process. The process to be monitored is given in the
first parameter, and the code to run in the second. If the given process goes down while the code
in the second parameter is running, a process down message will be sent to the current process,
which can be handled by matchProcessDown
.
data MonitorAction Source
The different kinds of monitoring available between processes.
Constructors
MaMonitor | MaMonitor means that the monitor process will be sent a ProcessMonitorException message when the monitee terminates for any reason. |
MaLink | MaLink means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates for any reason |
MaLinkError | MaLinkError means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates abnormally |
data ProcessMonitorException Source
The main form of notification to a monitoring process that a monitored process has terminated.
This data structure can be delivered to the monitor either as a message (if the monitor is
of type MaMonitor
) or as an asynchronous exception (if the monitor is of type MaLink
or MaLinkError
).
It contains the PID of the monitored process and the reason for its notification.
Constructors
ProcessMonitorException ProcessId SignalReason |
getPeers :: ProcessM PeerInfoSource
Returns information about all nodes on the current network
that this node knows about. This function combines dynamic
and static mechanisms. See documentation on getPeersStatic
and getPeersDynamic
for more info. This function depends
on the configuration values cfgKnownHosts
and cfgPeerDiscoveryPort
.
findPeerByRole :: PeerInfo -> String -> [NodeId]Source
Given a PeerInfo returned by getPeersDynamic or getPeersStatic, returns a list of nodes registered as a particular role. If no nodes of that role are found, the empty list is returned.
type PeerInfo = Map String [NodeId]Source
Created by Remote.Peer.getPeers
, this maps
each role to a list of nodes that have that role.
It can be examined directly or queried with
findPeerByRole
.
remotable :: [Name] -> Q [Dec]Source
A compile-time macro to provide easy invocation of closures. To use this, follow these steps:
- First, enable Template Haskell in the module:
{-# LANGUAGE TemplateHaskell #-} module Main where import Remote.Call (remotable) ...
- Define your functions normally. Restrictions: function's type signature must be explicitly declared; no polymorphism; all parameters must implement Serializable; return value must be pure, or in one of the
ProcessM
, TaskM
, or IO
monads; probably other restrictions as well.
greet :: String -> ProcessM () greet name = say ("Hello, "++name) badFib :: Integer -> Integer badFib 0 = 1 badFib 1 = 1 badFib n = badFib (n-1) + badFib (n-2)
- Use the
remotable
function to automagically generate stubs and closure generators for your functions:
$( remotable ['greet, 'badFib] )
remotable
may be used only once per module.
- When you call
remoteInit
(usually the first thing in your program), be sure to give it the automagically generated function lookup tables from all modules that use remotable
:
main = remoteInit (Just "config") [Main.__remoteCallMetaData, OtherModule.__remoteCallMetaData] initialProcess
- Now you can invoke your functions remotely. When a function expects a closure, give it the name
of the generated closure, rather than the name of the original function. If the function takes parameters,
so will the closure. To start the
greet
function on someNode
:
spawn someNode (greet__closure "John Baptist")
Note that we say greet__closure
rather than just greet
. If you prefer, you can use mkClosure
instead, i.e. $(mkClosure 'greet)
, which will expand to greet__closure
. To calculate a Fibonacci number remotely:
val <- callRemotePure someNode (badFib__closure 5)
type RemoteCallMetaData = Lookup -> LookupSource
Data of this type is generated at compile-time
by remotable
and can be used with registerCalls
and remoteInit
to create a metadata lookup table, Lookup
.
The name __remoteCallMetaData
will be present
in any module that uses remotable
.
A data type representing a closure, that is, a function with its environment. In spirit, this is actually:
data Closure a where Closure :: Serializable v => Static (v -> a) -> v -> Closure a
where the Static type wraps a function with no non-static free variables. We simulate this behavior by identifying top-level functions as strings. See the paper for clarification.
makeClosure :: (Typeable a, Serializable v) => String -> v -> ProcessM (Closure a)Source
genericPut :: Data a => a -> PutSource
Data types that can be used in messaging must
be serializable, which means that they must implement
the get
and put
methods from Binary
. If you
are too lazy to write these functions yourself,
you can delegate responsibility to this function.
It's usually sufficient to do something like this:
import Data.Data (Data) import Data.Typeable (Typeable) import Data.Binary (Binary, get, put) data MyType = MkMyType Foobar Int [(String, Waddle Baz)] | MkSpatula deriving (Data, Typeable) instance Binary MyType where put = genericPut get = genericGet
genericGet :: Data a => Get aSource
This is the counterpart genericPut
class (Binary a, Typeable a) => Serializable a Source
Data that can be sent as a message must implement
this class. The class has no functions of its own,
but instead simply requires that the type implement
both Typeable
and Binary
. Typeable can usually
be derived automatically. Binary requires the put and get
functions, which can be easily implemented by hand,
or you can use the genericGet
and genericPut
flavors,
which will work automatically for types implementing
Data
.
Instances
(Binary a, Typeable a) => Serializable a |
Channels
A channel is a unidirectional communication pipeline
with two ends: a sending port, and a receiving port.
This is the sending port. A process holding this
value can insert messages into the channel. SendPorts
themselves can also be sent to other processes.
The other side of the channel is the ReceivePort
.
data ReceivePort a Source
A process holding a ReceivePort can extract messages
from the channel, which were inserted by
the holder(s) of the corresponding SendPort
.
Critically, ReceivePorts, unlike SendPorts, are not serializable.
This means that you can only receive messages through a channel
on the node on which the channel was created.
newChannel :: Serializable a => ProcessM (SendPort a, ReceivePort a)Source
Creates a new channel and returns both the SendPort
and ReceivePort
thereof.
sendChannel :: Serializable a => SendPort a -> a -> ProcessM ()Source
Inserts a new value into the channel.
receiveChannel :: Serializable a => ReceivePort a -> ProcessM aSource
Extract a value from the channel, in FIFO order.
data CombinedChannelAction b Source
combinedChannelAction :: Serializable a => ReceivePort a -> (a -> b) -> CombinedChannelAction bSource
Specifies a port and an adapter for combining ports via combinePortsBiased
and
combinePortsRR
.
combinePortsBiased :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)Source
This function lets us respond to messages on multiple channels
by combining several ReceivePort
s into one. The resulting port
is the sum of the input ports, and will extract messages from all
of them in FIFO order. The input ports are specified by
combinedChannelAction
, which also gives a converter function.
After combining the underlying receive ports can still
be used independently, as well.
We provide two ways to combine ports, which differ in the bias
they demonstrate in returning messages when more than one
underlying channel is nonempty. combinePortsBiased will
check ports in the order given by its argument, and so
if the first channel always has a message waiting, it will
starve the other channels. The alternative is combinePortsRR
.
combinePortsRR :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)Source
See combinePortsBiased
. This function differs from that one
in that the order that the underlying ports are checked is rotated
with each invocation, guaranteeing that, given enough invocations,
every channel will have a chance to contribute a message.
mergePortsBiased :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)Source
Similar to combinePortsBiased
, with the difference that
the underlying ports must be of the same type, and you don't
have the opportunity to provide an adapter function.
mergePortsRR :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)Source
Similar to combinePortsRR
, with the difference that
the underlying ports must be of the same type, and you don't
have the opportunity to provide an adapter function.
terminateChannel :: Serializable a => ReceivePort a -> ProcessM ()Source
Terminate a channel. After calling this function, receiveChannel
on that port (or on any combined port based on it) will either
fail or block indefinitely, and sendChannel
on the corresponding
SendPort
will fail. Any unread messages remaining in the channel
will be lost.
The task layer
runTask :: TaskM a -> ProcessM aSource
Starts a new context for executing a TaskM
environment.
The node on which this function is run becomes a new master
in a Task application; as a result, the application should
only call this function once. The master will attempt to
control all nodes that it can find; if you are going to be
running more than one CH application on a single network,
be sure to give each application a different network
magic (via cfgNetworkMagic). The master TaskM environment
created by this function can then spawn other threads,
locally or remotely, using newPromise
and friends.
The basic data type for expressing data dependence
in the TaskM
monad. A Promise represents a value that
may or may not have been computed yet; thus, it's like
a distributed thunk (in the sense of a non-strict unit
of evaluation). These are created by newPromise
and friends,
and the underlying value can be gotten with readPromise
.
Instances
Typeable1 Promise | |
Serializable a => Binary (Promise a) |
newPromise :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)Source
Given a function (expressed here as a closure, see Remote.Call)
that computes a value, returns a token identifying that value.
This token, a Promise
can be moved about even if the
value hasn't been computed yet. The computing function
will be started somewhere among the nodes visible to the
current master, preferring those nodes that correspond
to the defaultLocality
. Afterwards, attempts to
redeem the promise with readPromise
will contact the node
where the function is executing.
newPromiseHere :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)Source
A variant of newPromise
that prefers to start
the computing function on the same node as the caller.
Useful if you plan to use the resulting value
locally.
newPromiseAtRole :: Serializable a => String -> Closure (TaskM a) -> TaskM (Promise a)Source
A variant of newPromise
that prefers to start
the computing function on some set of nodes that
have a given role (assigned by the cfgRole configuration
option).
newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)Source
A variant of newPromise
that prefers to start
the computing function on the same node where some
other promise lives. The other promise is not
evaluated.
toPromise :: Serializable a => a -> TaskM (Promise a)Source
Like newPromise
, but creates a promise whose
value is already known. In other words, it puts
a given, already-calculated value in a promise.
Conceptually (but not syntactically, due to closures),
you can consider it like this:
toPromise a = newPromise (return a)
toPromiseNear :: (Serializable a, Serializable b) => Promise b -> a -> TaskM (Promise a)Source
Similar to toPromiseAt
and newPromiseNear
toPromiseImm :: Serializable a => a -> TaskM (Promise a)Source
Creates an immediate promise, which is to say, a promise
in name only. Unlike a regular promise (created by toPromise
),
this kind of promise contains the value directly. The
advantage is that promise redemption is very fast, requiring
no network communication. The downside is that the
underlying data will be copied along with the promise.
Useful only for small data.
readPromise :: Serializable a => Promise a -> TaskM aSource
Given a promise, gets the value that is being
calculated. If the calculation has finished,
the owning node will be contacted and the data
moved to the current node. If the calculation
has not finished, this function will block
until it has. If the calculation failed
by throwing an exception (e.g. divide by zero),
then this function will throw an exception as well
(a TaskException
). If the node owning the
promise is not accessible, the calculation
will be restarted.
tlogS :: LogSphere -> LogLevel -> String -> TaskM ()Source
Writes various kinds of messages to the Remote.Process log.
data TaskException Source
Constructors
TaskException String |
data MapReduce rawinput input middle1 middle2 result Source
A data structure that stores the important
user-provided functions that are the namesakes
of the MapReduce algorithm.
The number of mapper processes can be controlled
by the user by controlling the length of the list
returned by mtChunkify. The number of reducer
promises is controlled by the number of values
returned by shuffler.
The user must provide their own mapper and reducer.
For many cases, the default chunkifier (chunkify
)
and shuffler (shuffle
) are adequate.
mapReduce :: (Serializable i, Serializable k, Serializable m, Serializable r) => MapReduce ri i k m r -> ri -> TaskM [r]Source
The MapReduce algorithm, implemented in a very simple form on top of the Task layer. Its use depends on four user-determined data types:
- input -- The data type provided as the input to the algorithm as a whole and given to the mapper.
- middle1 -- The output of the mapper. This may include some key which is used by the shuffler to allocate data to reducers.
If you use the default shuffler,
shuffle
, this type must have the form Ord a => (a,b)
. - middle2 -- The output of the shuffler. The default shuffler emits a type in the form
Ord a => (a,[b])
. Each middle2 output by shuffler is given to a separate reducer. - result -- The output of the reducer, upon being given a bunch of middles.
chunkify :: Int -> [a] -> [[a]]Source
A convenient way to provide the mtChunkify
function
as part of mapReduce
.