Continuous Mapping

Run MapReduce with a dynamic job set.

❗️

This is legacy Apache Ignite documentation

The new documentation is hosted here: https://ignite.apache.org/docs/latest/

Overview

In the classic MapReduce paradigm we have a well-defined, finite set of jobs, which is produced during the Map step and doesn't change throughout the entire computation. But what if we have a stream of jobs instead? In this case we can still run MapReduce using Ignite's continuous mapping facility. With continuous mapping, jobs can be generated on the fly while the computation is already running. The newly generated jobs are processed by worker nodes as usual, and the reducer receives their results just like in normal MapReduce.

Running Continuously Mapped Tasks

To use continuous mapping within your task, inject a TaskContinuousMapper instance into the task via the @TaskContinuousMapperResource annotation:

@TaskContinuousMapperResource
private TaskContinuousMapper mapper;

After this, new jobs can be generated asynchronously and added to the currently running computation using the send() method of the TaskContinuousMapper interface:

mapper.send(new ComputeJobAdapter() {
    @Override public Object execute() {
        System.out.println("I'm a continuously-mapped job!");
 
        return null;
    }
});

For continuous mapping, there are several constraints that you need to be aware of (the sketch after this list illustrates them):

  • If you initially return null from the ComputeTask.map() method, you should send at least one job with the continuous mapper before returning.
  • The continuous mapper cannot be used after the ComputeTask.result() method returns the REDUCE policy.
  • If the ComputeTask.result() method returns the WAIT policy and all jobs have finished, the task proceeds to the Reduce step, and the continuous mapper can no longer be used.
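
Putting these rules together, here is a minimal sketch of a continuously mapped task (the WordCountTask class and its word-length jobs are hypothetical, used only for illustration): map() sends jobs through the mapper before returning null, and once all jobs complete, the task moves on to reduce(), after which the mapper must not be used:

class WordCountTask extends ComputeTaskAdapter<String, Integer> {
    // Interface for continuous mapping (injected on task instantiation).
    @TaskContinuousMapperResource
    private TaskContinuousMapper mapper;

    @Nullable @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, @Nullable final String phrase) throws IgniteException {
        assert phrase != null;

        // Send at least one job before returning null (first constraint).
        for (final String word : phrase.split(" ")) {
            mapper.send(new ComputeJobAdapter() {
                @Override public Object execute() {
                    return word.length();
                }
            });
        }

        // Returning an empty mapping is allowed because jobs were
        // already sent through the continuous mapper.
        return null;
    }

    @Nullable @Override
    public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
        // The Reduce step has started, so the mapper can no longer be used.
        int total = 0;

        for (ComputeJobResult res : results)
            total += res.<Integer>getData();

        return total;
    }
}

Such a task would be executed as usual, e.g. ignite.compute().execute(new WordCountTask(), "some input phrase").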

In other respects, the computation logic is the same as in normal MapReduce, described in the MapReduce chapter.

Example

The following example performs Web site analysis based on the images the site contains. The Web search engine scans through all images available on the site, and the MapReduce engine, implemented with Ignite, determines the category ("meaning") of each image using some image-analysis algorithm. The image analysis results are then reduced to determine the site category. This example is a good fit for continuous mapping because Web search is itself a continuous process: MapReduce can run in parallel with it and analyze new search results (new image files) as they arrive. This can save a significant amount of time compared to traditional MapReduce, where we would first wait for all Web search results (a complete set of images) and only then map them to nodes, analyze them, and reduce the results.

Suppose we have a Crawler interface from some library for scanning site images, a CrawlerListener interface for receiving the results of asynchronous background scanning, and an Image interface representing a single image file (the names are intentionally kept short for readability):

interface Crawler {
    ...
    public Image findNext();
 
    public void findNextAsync(CrawlerListener listener);
 
    ...
}
 
interface CrawlerListener {
    public void onImage(Crawler c, Image img) throws Exception;
}
 
interface Image {
    ...
 
    public byte[] getBytes();
 
    ...
}

Suppose we also have an ImageAnalysisJob class that implements ComputeJob and contains the logic for analyzing an image:

class ImageAnalysisJob implements ComputeJob, Externalizable {
    ...
 
    public ImageAnalysisJob(byte[] imgBytes) {
        ...
    }
 
    @Nullable @Override public Object execute() throws IgniteException {
        // Image analysis logic (returns some information 
        // about the image content: category, etc.).
        ...
    }
}
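
Note that the Externalizable contract requires a public no-arg constructor along with writeExternal() and readExternal() implementations, since the job's state travels over the wire to worker nodes. A possible completion of the elided parts might look like the sketch below, which assumes imgBytes is the job's only state and stubs out the actual analysis logic:

class ImageAnalysisJob implements ComputeJob, Externalizable {
    private byte[] imgBytes;

    // Required by Externalizable for deserialization.
    public ImageAnalysisJob() {
        // No-op.
    }

    public ImageAnalysisJob(byte[] imgBytes) {
        this.imgBytes = imgBytes;
    }

    @Nullable @Override public Object execute() throws IgniteException {
        // Image analysis logic (stubbed out here) would return some
        // information about the image content: category, etc.
        return null;
    }

    @Override public void cancel() {
        // No-op: this job is not cancellable.
    }

    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(imgBytes.length);
        out.write(imgBytes);
    }

    @Override public void readExternal(ObjectInput in) throws IOException {
        imgBytes = new byte[in.readInt()];
        in.readFully(imgBytes);
    }
}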

Given all the above, here is how we could run Web search and analysis in parallel using continuous mapping:

enum SiteCategory {
    ...
}
 
// Instantiate a Web search engine for a particular site.
Crawler crawler = CrawlerFactory.newCrawler(siteUrl);
 
// Execute a continuously-mapped task.
SiteCategory result = ignite.compute().execute(new ComputeTaskAdapter<Crawler, SiteCategory>() {
    // Interface for continuous mapping (injected on task instantiation).
    @TaskContinuousMapperResource
    private TaskContinuousMapper mapper;
 
    // Map step.
    @Nullable @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, @Nullable Crawler c) throws IgniteException {
        assert c != null;
 
        // Find the first image synchronously to submit an initial job.
        Image img = c.findNext();
 
        if (img == null)
            throw new IgniteException("No images found on the site.");
 
        // Submit an initial job.
        mapper.send(new ImageAnalysisJob(img.getBytes()));
 
        // Now start an asynchronous background Web search and
        // submit new jobs as search results come in. This call
        // returns immediately.
        c.findNextAsync(new CrawlerListener() {
            @Override public void onImage(Crawler c, Image img) throws Exception {
                if (img != null) {
                    // Submit a new job to analyze the image file.
                    mapper.send(new ImageAnalysisJob(img.getBytes()));
 
                    // Move on with search.
                    c.findNextAsync(this);
                }
            }
        });
 
        // The initial job was submitted, so we can return
        // an empty mapping.
        return null;
    }
 
    // Reduce step.
    @Nullable @Override public SiteCategory reduce(List<ComputeJobResult> results) throws IgniteException {
        // At this point the Web search is finished and all image
        // files have been analyzed. Here we execute some logic for
        // determining the site category based on the image content
        // information.
        return defineSiteCategory(results);
    }
}, crawler);

In the example above, for simplicity, we assume that analyzing an image takes much longer than finding the next image on the site; in other words, new Web search results arrive faster than image files are analyzed. In real life, however, that's not necessarily true, and we can easily end up finishing the analysis prematurely because all current jobs have completed while new search results haven't arrived yet (due to network latency, for example). In this case Ignite switches to the Reduce step, and continuous mapping is no longer possible.

To avoid such cases, consider submitting a special job that finishes only upon receiving some message, or that uses one of the available distributed synchronization primitives, such as CountDownLatch. This way you can ensure that the Reduce step doesn't start before the Web search is finished.
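
For illustration, here is a minimal sketch of such a guard job based on Ignite's distributed IgniteCountDownLatch (the latch name "crawl-finished" and the wiring around it are hypothetical). The job blocks until the latch is counted down, which keeps the task in the Map phase until the Web search is done:

class CrawlGuardJob extends ComputeJobAdapter {
    // Auto-injected Ignite instance.
    @IgniteInstanceResource
    private Ignite ignite;

    @Nullable @Override public Object execute() throws IgniteException {
        // Get or create a distributed latch with an initial count of 1.
        IgniteCountDownLatch latch = ignite.countDownLatch("crawl-finished", 1, true, true);

        // Block until the crawler signals that the Web search is over.
        latch.await();

        return null;
    }
}

In this scheme, the guard job would be sent from map() via mapper.send(new CrawlGuardJob()), and the CrawlerListener would count the latch down (ignite.countDownLatch("crawl-finished", 1, true, true).countDown()) once the search reports that no more images are left.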