MapReduce & ForkJoin
Execute MapReduce and ForkJoin tasks in memory.
This is a legacy Apache Ignite documentation
The new documentation is hosted here: https://ignite.apache.org/docs/latest/
Overview
ComputeTask is the Ignite abstraction for simplified in-memory MapReduce, which is also very close to the ForkJoin paradigm. Pure MapReduce was never built for performance and works well only for offline, batch-oriented processing (e.g. Hadoop MapReduce). When computing on data that resides in memory, however, real-time low latencies and high throughput usually take the highest priority, and simplicity of the API becomes just as important. With that in mind, Ignite introduced the ComputeTask API, which is a light-weight MapReduce (or ForkJoin) implementation.
Use ComputeTask only when you need fine-grained control over the job-to-node mapping or custom fail-over logic. For all other cases, use the simple closure executions on the cluster documented in the Distributed Closures section.
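For comparison, here is a minimal sketch of the closure-based alternative recommended above, assuming a started Ignite instance is available as `ignite` (the class and method names below are illustrative, not part of the Ignite API):

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.lang.IgniteCallable;

public class ClosureCharacterCount {
    // Distributed version: one callable per word, executed as simple closures,
    // with no custom job-to-node mapping or fail-over logic.
    public static int countCharacters(Ignite ignite, String phrase) {
        IgniteCompute compute = ignite.compute();

        int sum = 0;

        for (final String word : phrase.split(" "))
            sum += compute.call((IgniteCallable<Integer>) word::length);

        return sum;
    }

    // Pure equivalent of what the closures compute, useful for local checks.
    public static int sumWordLengths(String phrase) {
        int sum = 0;

        for (String word : phrase.split(" "))
            sum += word.length();

        return sum;
    }
}
```

When this is enough, there is no need to implement ComputeTask at all; Ignite balances the closures across the cluster automatically.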
ComputeTask
ComputeTask defines jobs to execute on the cluster and the mapping of those jobs to nodes. It also defines how to process (reduce) the job results. All IgniteCompute.execute(...) methods execute the given task on the grid. User applications should implement the map(...) and reduce(...) methods of the ComputeTask interface.
Tasks are defined by implementing 2 (or 3) methods of the ComputeTask interface.
Map Method
map(...) instantiates the jobs and maps them to worker nodes. The method receives the collection of cluster nodes on which the task is run, along with the task argument. It returns a map with jobs as keys and mapped worker nodes as values. The jobs are then sent to the mapped nodes and executed there.
Refer to ComputeTaskSplitAdapter for a simplified implementation of the map(...) method.
Result Method
result(...) is called each time a job completes on some cluster node. It receives the result returned by the completed job, as well as the list of all the job results received so far. The method should return a ComputeJobResultPolicy instance, indicating what to do next:
WAIT - wait for all remaining jobs to complete (if any)
REDUCE - immediately move to the reduce step, discarding all the remaining jobs and not-yet-received results
FAILOVER - failover the job to another node (see Fault Tolerance)
All the received job results will be available in the reduce(...) method as well.
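As a sketch of how the policy is typically used, consider a hypothetical task (not from the original docs) that queries several nodes for the same value: it can reduce as soon as the first successful result arrives and fail a job over on errors. The job mapping is elided here; see the full examples later in this page.

```java
import java.util.List;
import java.util.Map;

import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;

// Hypothetical task: first successful result wins.
public class FirstResultTask extends ComputeTaskAdapter<String, Object> {
    // Pure policy decision, factored out so it can be checked without a cluster.
    static ComputeJobResultPolicy policyFor(Throwable err) {
        // Retry failed jobs elsewhere; stop waiting on the first success.
        return err == null ? ComputeJobResultPolicy.REDUCE : ComputeJobResultPolicy.FAILOVER;
    }

    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
        // Job mapping omitted in this sketch.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
        return policyFor(res.getException());
    }

    @Override
    public Object reduce(List<ComputeJobResult> results) {
        // Only the results received before REDUCE are present here.
        return results.get(results.size() - 1).getData();
    }
}
```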
Reduce Method
reduce(...) is called during the reduce step, when all the jobs have completed (or the REDUCE result policy was returned from the result(...) method). The method receives a list of all the completed results and returns the final result of the computation.
Compute Task Adapters
It is not necessary to implement all 3 methods of the ComputeTask API each time you need to define a computation. There are a number of helper classes that let you describe only a particular piece of your logic, leaving the rest for Ignite to handle automatically.
ComputeTaskAdapter
ComputeTaskAdapter defines a default implementation of the result(...) method, which returns the FAILOVER policy if a job threw an exception and the WAIT policy otherwise, thus waiting for all jobs to finish with a result.
ComputeTaskSplitAdapter
ComputeTaskSplitAdapter extends ComputeTaskAdapter and adds the capability to automatically assign jobs to nodes. It hides the map(...) method and adds a new split(...) method, which returns a collection of the jobs to be executed (the mapping of those jobs to nodes is handled automatically by the adapter in a load-balanced fashion).
This adapter is especially useful in homogeneous environments where all nodes are equally suitable for executing jobs and the mapping step can be done implicitly.
ComputeJob
All jobs that are spawned by a task are implementations of the ComputeJob interface. The execute() method of this interface defines the job logic and returns the job result. The cancel() method defines the logic to run when the job is discarded (for example, when the task decides to reduce immediately or to cancel the computation).
ComputeJobAdapter
ComputeJobAdapter is a convenience adapter that provides a no-op implementation of the cancel() method.
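Since the adapter's cancel() is a no-op, a long-running job that should stop early when the task reduces or cancels can override it with a flag check. A minimal sketch (the class name and loop bound are illustrative):

```java
import org.apache.ignite.compute.ComputeJobAdapter;

// Illustrative long-running job that honors cancellation: when the task
// reduces early or cancels, Ignite calls cancel() and the loop below exits.
public class CancellableJob extends ComputeJobAdapter {
    private volatile boolean cancelled;

    @Override
    public void cancel() {
        cancelled = true;
    }

    @Override
    public Object execute() {
        long processed = 0;

        // Stand-in for real per-item work; checks the flag on every iteration.
        while (!cancelled && processed < 1_000_000)
            processed++;

        return processed;
    }
}
```

The flag must be volatile because cancel() is invoked from a different thread than execute().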
Example
Here is an example of ComputeTask and ComputeJob implementations.
IgniteCompute compute = ignite.compute();

// Execute the task on the cluster and wait for its completion.
int cnt = compute.execute(CharacterCountTask.class, "Hello Grid Enabled World!");

System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");

/**
 * Task to count non-white-space characters in a phrase.
 */
private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> {
    // 1. Splits the received string into words.
    // 2. Creates a child job for each word.
    // 3. Sends the created jobs to other nodes for processing.
    @Override
    public List<ComputeJob> split(int gridSize, String arg) {
        String[] words = arg.split(" ");

        List<ComputeJob> jobs = new ArrayList<>(words.length);

        for (final String word : words) {
            jobs.add(new ComputeJobAdapter() {
                @Override public Object execute() {
                    System.out.println(">>> Printing '" + word + "' from a compute job.");

                    // Return the number of letters in the word.
                    return word.length();
                }
            });
        }

        return jobs;
    }

    @Override
    public Integer reduce(List<ComputeJobResult> results) {
        int sum = 0;

        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }
}
IgniteCompute compute = ignite.compute();

// Execute the task on the cluster and wait for its completion.
int cnt = compute.execute(CharacterCountTask.class, "Hello Grid Enabled World!");

System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");

/**
 * Task to count non-white-space characters in a phrase.
 */
private static class CharacterCountTask extends ComputeTaskAdapter<String, Integer> {
    // 1. Splits the received string into words.
    // 2. Creates a child job for each word.
    // 3. Sends the created jobs to other nodes for processing.
    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
        String[] words = arg.split(" ");

        Map<ComputeJob, ClusterNode> map = new HashMap<>(words.length);

        Iterator<ClusterNode> it = subgrid.iterator();

        for (final String word : words) {
            // If we used all nodes, restart the iterator.
            if (!it.hasNext())
                it = subgrid.iterator();

            ClusterNode node = it.next();

            map.put(new ComputeJobAdapter() {
                @Override public Object execute() {
                    System.out.println(">>> Printing '" + word + "' on this node from a grid job.");

                    // Return the number of letters in the word.
                    return word.length();
                }
            }, node);
        }

        return map;
    }

    @Override
    public Integer reduce(List<ComputeJobResult> results) {
        int sum = 0;

        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }
}
Distributed Task Session
A distributed task session is created for every task execution. It is defined by the ComputeTaskSession interface. The task session is visible to the task and to all the jobs spawned by it, so attributes set on the task or on a job can be accessed by other jobs. The task session can also receive notifications when attributes are set, or wait for an attribute to be set.
The sequence in which session attributes are set is consistent across the task and all job siblings within it. There will never be a case when one job sees attribute A before attribute B, while another job sees attribute B before A.
In the example below, all jobs synchronize on STEP1 before moving on to STEP2.
@ComputeTaskSessionFullSupport annotation
Note that distributed task session attributes are disabled by default for performance reasons. To enable them, attach the @ComputeTaskSessionFullSupport annotation to the task class.
IgniteCompute compute = ignite.compute();

compute.execute(new TaskSessionAttributesTask(), null);

/**
 * Task demonstrating distributed task session attributes.
 * Note that task session attributes are enabled only if the
 * @ComputeTaskSessionFullSupport annotation is attached.
 */
@ComputeTaskSessionFullSupport
private static class TaskSessionAttributesTask extends ComputeTaskSplitAdapter<Object, Object> {
    @Override
    protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
        Collection<ComputeJob> jobs = new LinkedList<>();

        // Generate one job per node in the grid.
        for (int i = 0; i < gridSize; i++) {
            jobs.add(new ComputeJobAdapter(arg) {
                // Auto-injected task session.
                @TaskSessionResource
                private ComputeTaskSession ses;

                // Auto-injected job context.
                @JobContextResource
                private ComputeJobContext jobCtx;

                @Override
                public Object execute() {
                    // Perform STEP1.
                    ...

                    // Tell other jobs that STEP1 is complete.
                    ses.setAttribute(jobCtx.getJobId(), "STEP1");

                    // Wait for other jobs to complete STEP1.
                    for (ComputeJobSibling sibling : ses.getJobSiblings())
                        ses.waitForAttribute(sibling.getJobId(), "STEP1", 0);

                    // Move on to STEP2.
                    ...
                }
            });
        }

        return jobs;
    }

    @Override
    public Object reduce(List<ComputeJobResult> results) {
        // No-op.
        return null;
    }
}