
Commit 463535d

Author: Ahmed Eldawy

Merged all spatial join algorithms into one

1 parent 18b2495 commit 463535d

File tree

1 file changed (+394 -0)

@@ -0,0 +1,394 @@
package edu.umn.cs.spatialHadoop.operations;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapred.spatial.BinaryRecordReader;
import org.apache.hadoop.mapred.spatial.BinarySpatialInputFormat;
import org.apache.hadoop.mapred.spatial.BlockFilter;
import org.apache.hadoop.mapred.spatial.DefaultBlockFilter;
import org.apache.hadoop.mapred.spatial.PairWritable;
import org.apache.hadoop.mapred.spatial.ShapeArrayRecordReader;
import org.apache.hadoop.spatial.CellInfo;
import org.apache.hadoop.spatial.Rectangle;
import org.apache.hadoop.spatial.ResultCollector2;
import org.apache.hadoop.spatial.Shape;
import org.apache.hadoop.spatial.SimpleSpatialIndex;
import org.apache.hadoop.spatial.SpatialAlgorithms;
import org.apache.hadoop.spatial.SpatialSite;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.QuickSort;

import edu.umn.cs.CommandLineArguments;
/**
 * Performs a spatial join between two or more files using the
 * redistribute-join algorithm.
 * @author eldawy
 *
 */
public class DistributedJoin {
  private static final Log LOG = LogFactory.getLog(DistributedJoin.class);

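  // A note on the filter below: it is registered as the job's BlockFilter
  // in joinStep and is consulted when input splits are created, so only
  // pairs of blocks whose MBRs overlap become splits; disjoint block pairs
  // are pruned before any map task starts.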
  public static class SpatialJoinFilter extends DefaultBlockFilter {
    @Override
    public void selectBlockPairs(SimpleSpatialIndex<BlockLocation> gIndex1,
        SimpleSpatialIndex<BlockLocation> gIndex2,
        ResultCollector2<BlockLocation, BlockLocation> output) {
      // Do a spatial join between the two global indexes
      SimpleSpatialIndex.spatialJoin(gIndex1, gIndex2, output);
    }
  }

  public static class RedistributeJoinMap extends MapReduceBase
      implements Mapper<PairWritable<CellInfo>, PairWritable<? extends Writable>, Shape, Shape> {
    public void map(
        final PairWritable<CellInfo> key,
        final PairWritable<? extends Writable> value,
        final OutputCollector<Shape, Shape> output,
        Reporter reporter) throws IOException {
      // The MBR covered by this map task: the intersection of the two cells
      final Rectangle mapperMBR = key.first.getIntersection(key.second);

      // Join the two arrays of shapes with a plane-sweep algorithm
      ArrayWritable ar1 = (ArrayWritable) value.first;
      ArrayWritable ar2 = (ArrayWritable) value.second;
      SpatialAlgorithms.SpatialJoin_planeSweep(
          (Shape[])ar1.get(), (Shape[])ar2.get(),
          new ResultCollector2<Shape, Shape>() {
            @Override
            public void collect(Shape x, Shape y) {
              Rectangle intersectionMBR = x.getMBR().getIntersection(y.getMBR());
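              // Reference-point technique: the same pair of shapes can be
              // replicated to several pairs of overlapping cells and would
              // otherwise be reported once per cell pair. Only the mapper
              // whose MBR contains the corner point
              // (intersectionMBR.x, intersectionMBR.y) of the pair's
              // intersection reports it, so each result is emitted exactly
              // once. With illustrative numbers: if two shapes intersect in
              // the MBR (5,5)-(8,8), only the mapper whose mapperMBR
              // contains the point (5,5) emits the pair.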
              // Employ the reference point duplicate avoidance technique
              if (mapperMBR.contains(intersectionMBR.x, intersectionMBR.y)) {
                try {
                  output.collect(x, y);
                } catch (IOException e) {
                  e.printStackTrace();
                }
              }
            }
          }
      );
    }
  }

  /**
   * Reads a pair of arrays of shapes
   * @author eldawy
   *
   */
  public static class DJRecordReaderArray extends BinaryRecordReader<CellInfo, ArrayWritable> {
    public DJRecordReaderArray(Configuration conf, CombineFileSplit fileSplits) throws IOException {
      super(conf, fileSplits);
    }

    @Override
    protected RecordReader<CellInfo, ArrayWritable> createRecordReader(
        Configuration conf, CombineFileSplit split, int i) throws IOException {
      FileSplit fsplit = new FileSplit(split.getPath(i),
          split.getStartOffsets()[i],
          split.getLength(i), split.getLocations());
      return new ShapeArrayRecordReader(conf, fsplit);
    }
  }

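  // Each split of this job pairs one block from each input file, so
  // createRecordReader above is called once per underlying file (i = 0 or
  // 1); the two cells and the two arrays of shapes then reach the map
  // function as PairWritable key and value.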
  /**
   * Input format that returns a record reader that reads a pair of arrays
   * of shapes
   * @author eldawy
   *
   */
  public static class DJInputFormatArray extends BinarySpatialInputFormat<CellInfo, ArrayWritable> {

    @Override
    public RecordReader<PairWritable<CellInfo>, PairWritable<ArrayWritable>> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      return new DJRecordReaderArray(job, (CombineFileSplit)split);
    }
  }

  /**
   * Repartitions the smaller (first) file to match the partitioning of the
   * larger file.
   * @param fs
   * @param files
   * @param stockShape
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  protected static void repartitionStep(FileSystem fs, final Path[] files,
      Shape stockShape) throws IOException {

    final FileStatus[] fStatus = new FileStatus[files.length];
    SimpleSpatialIndex<BlockLocation>[] gIndexes =
        new SimpleSpatialIndex[files.length];

    for (int i_file = 0; i_file < files.length; i_file++) {
      fStatus[i_file] = fs.getFileStatus(files[i_file]);
      gIndexes[i_file] = fs.getGlobalIndex(fStatus[i_file]);
    }

    // Do the repartition step
    long t1 = System.currentTimeMillis();

    // Get the partitions to use for partitioning the smaller file:
    // do a spatial join between the partitions of the two files to find
    // the overlapping partitions.
    final Set<CellInfo> cellSet = new HashSet<CellInfo>();
    final DoubleWritable matched_area = new DoubleWritable(0);
    SimpleSpatialIndex.spatialJoin(gIndexes[0], gIndexes[1],
        new ResultCollector2<BlockLocation, BlockLocation>() {
      @Override
      public void collect(BlockLocation x, BlockLocation y) {
        // Always use the cell of the larger file
        cellSet.add(y.getCellInfo());
        Rectangle intersection = x.getCellInfo().getIntersection(y.getCellInfo());
        matched_area.set(matched_area.get() +
            (double)intersection.width * intersection.height);
      }
    });

    // Estimate the output size of the repartition step based on the ratio
    // of the matched area to the smaller file's area
    Rectangle smaller_file_mbr = FileMBR.fileMBRLocal(fs, files[0], stockShape);
    long estimatedRepartitionedFileSize = (long) (fStatus[0].getLen() *
        matched_area.get() / (smaller_file_mbr.width * smaller_file_mbr.height));
    LOG.info("Estimated repartitioned file size: "+estimatedRepartitionedFileSize);
    // Choose a block size for the repartitioned file that makes every
    // partition fit in one block
    long blockSize = estimatedRepartitionedFileSize / cellSet.size();
    // Round blockSize up to a multiple of bytes per checksum
    int bytes_per_checksum =
        new Configuration().getInt("io.bytes.per.checksum", 512);
    blockSize = (long) (Math.ceil((double)blockSize / bytes_per_checksum) *
        bytes_per_checksum);
    LOG.info("Using a block size of "+blockSize);
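    // Illustrative numbers: if the smaller file is 1 GB and the matched
    // area is a quarter of its MBR's area, the estimate is ~256 MB; with
    // 100 overlapping cells that gives a raw block size of ~2.6 MB, which
    // is then rounded up to the next multiple of io.bytes.per.checksum
    // (512 bytes by default).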

    // Repartition the smaller file into a temporary path with a random name
    Path partitioned_file;
    do {
      partitioned_file = new Path("/"+files[0].getName()+
          ".repartitioned_"+(int)(Math.random() * 1000000));
    } while (fs.exists(partitioned_file));
    // Repartition the smaller file with no local index
    Repartition.repartitionMapReduce(files[0], partitioned_file,
        stockShape, blockSize, cellSet.toArray(new CellInfo[cellSet.size()]),
        null, true);
    long t2 = System.currentTimeMillis();
    System.out.println("Repartition time "+(t2-t1)+" millis");

    // Continue with the join step
    if (fs.exists(partitioned_file)) {
      // An output file might not exist if the two files are disjoint

      // Replace the smaller file with its repartitioned copy
      files[0] = partitioned_file;
      // Delete the temporary repartitioned file upon exit
      fs.deleteOnExit(partitioned_file);
    }
  }

  /**
   * Performs a redistribute join between the given files. Currently, only a
   * pair of files is supported.
   * @param fs
   * @param files
   * @param stockShape
   * @param output
   * @return the number of result pairs
   * @throws IOException
   */
  public static <S extends Shape> long joinStep(FileSystem fs, Path[] files,
      S stockShape, OutputCollector<S, S> output) throws IOException {
    long t1 = System.currentTimeMillis();

    JobConf job = new JobConf(DistributedJoin.class);

    // Create a temporary output path with a random name
    Path outputPath;
    FileSystem outFs = files[0].getFileSystem(job);
    do {
      outputPath = new Path("/"+files[0].getName()+
          ".dj_"+(int)(Math.random() * 1000000));
    } while (outFs.exists(outputPath));
    outFs.deleteOnExit(outputPath);

    job.setJobName("DistributedJoin");
    ClusterStatus clusterStatus = new JobClient(job).getClusterStatus();
    job.setMapperClass(RedistributeJoinMap.class);
    job.setMapOutputKeyClass(stockShape.getClass());
    job.setMapOutputValueClass(stockShape.getClass());
    job.setBoolean(SpatialSite.AutoCombineSplits, true);
    job.setNumMapTasks(10 * Math.max(1, clusterStatus.getMaxMapTasks()));
    job.setNumReduceTasks(0); // No reduce phase is needed for this job

    job.setInputFormat(DJInputFormatArray.class);
    job.setClass(SpatialSite.FilterClass, SpatialJoinFilter.class, BlockFilter.class);
    job.set(SpatialSite.SHAPE_CLASS, stockShape.getClass().getName());
    job.setOutputFormat(TextOutputFormat.class);
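    // Judging by its name, SpatialSite.AutoCombineSplits presumably asks
    // the framework to combine blocks of the two inputs into
    // CombineFileSplits automatically; together with SpatialJoinFilter
    // registered as the BlockFilter, each split should carry one pair of
    // overlapping blocks.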

    // Pass both input files as a comma-separated list
    String commaSeparatedFiles = "";
    for (int i = 0; i < files.length; i++) {
      if (i > 0)
        commaSeparatedFiles += ',';
      commaSeparatedFiles += files[i].toUri().toString();
    }
    DJInputFormatArray.addInputPaths(job, commaSeparatedFiles);
    TextOutputFormat.setOutputPath(job, outputPath);

    RunningJob runningJob = JobClient.runJob(job);
    Counters counters = runningJob.getCounters();
    Counter outputRecordCounter = counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS);
    final long resultCount = outputRecordCounter.getValue();

    // Output the number of launched map tasks
    Counter mapTaskCountCounter = counters
        .findCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS);
    System.out.println("Number of map tasks "+mapTaskCountCounter.getValue());
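    // Since the job runs with zero reduce tasks, MAP_OUTPUT_RECORDS counts
    // exactly the pairs emitted by the mappers, i.e., the join result size.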

    S s1 = stockShape;
    @SuppressWarnings("unchecked")
    S s2 = (S) stockShape.clone();
    // Read the job result back and report it to the given output collector
    FileStatus[] results = outFs.listStatus(outputPath);
    for (FileStatus fileStatus : results) {
      if (fileStatus.getLen() > 0 && fileStatus.getPath().getName().startsWith("part-")) {
        if (output != null) {
          // Report every single result as a pair of shapes
          LineReader lineReader = new LineReader(outFs.open(fileStatus.getPath()));
          Text text = new Text();
          while (lineReader.readLine(text) > 0) {
            // Each output line holds the two shapes separated by a tab
            String str = text.toString();
            String[] parts = str.split("\t", 2);
            s1.fromText(new Text(parts[0]));
            s2.fromText(new Text(parts[1]));
            output.collect(s1, s2);
          }
          lineReader.close();
        }
      }
    }

    outFs.delete(outputPath, true);
    long t2 = System.currentTimeMillis();
    System.out.println("Join time "+(t2-t1)+" millis");

    return resultCount;
  }

  /**
   * Spatially joins two files, choosing automatically whether to
   * repartition the smaller file first.
   * @param fs
   * @param files
   * @param stockShape
   * @param output
   * @return the number of result pairs
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  public static long distributedJoinSmart(FileSystem fs, final Path[] files,
      Shape stockShape,
      OutputCollector<Shape, Shape> output) throws IOException {
    // Decide whether to do a repartition step or not
    int cost_with_repartition, cost_without_repartition;
    final FileStatus[] fStatus = new FileStatus[files.length];
    for (int i_file = 0; i_file < files.length; i_file++) {
      fStatus[i_file] = fs.getFileStatus(files[i_file]);
    }

    // Sort files by length (size) so that the smaller file comes first
    IndexedSortable filesSortable = new IndexedSortable() {
      @Override
      public void swap(int i, int j) {
        Path tmp1 = files[i];
        files[i] = files[j];
        files[j] = tmp1;

        FileStatus tmp2 = fStatus[i];
        fStatus[i] = fStatus[j];
        fStatus[j] = tmp2;
      }

      @Override
      public int compare(int i, int j) {
        return fStatus[i].getLen() < fStatus[j].getLen() ? -1 : 1;
      }
    };

    new QuickSort().sort(filesSortable, 0, files.length);
    SimpleSpatialIndex<BlockLocation>[] gIndexes =
        new SimpleSpatialIndex[fStatus.length];
    for (int i_file = 0; i_file < fStatus.length; i_file++)
      gIndexes[i_file] = fs.getGlobalIndex(fStatus[i_file]);

    // Cost of a direct join, estimated by joining the two global indexes
    cost_without_repartition = SimpleSpatialIndex.spatialJoin(gIndexes[0],
        gIndexes[1], null);
    // Cost of repartition + cost of join
    cost_with_repartition = gIndexes[0].size() * 3 + gIndexes[1].size();
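    // The estimate above roughly counts block accesses: repartitioning
    // touches every block of the smaller file about three times (read it,
    // write the repartitioned copy, read the copy again during the join)
    // and each block of the larger file once, whereas a direct join costs
    // one block pair per overlapping pair found in the global indexes.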
LOG.info("Cost with repartition is estimated to "+cost_with_repartition);
359+
LOG.info("Cost without repartition is estimated to "+cost_without_repartition);
360+
boolean need_repartition = cost_with_repartition < cost_without_repartition;
361+
if (need_repartition) {
362+
repartitionStep(fs, files, stockShape);
363+
}
364+

    // Join the larger file with the (possibly repartitioned) smaller file
    long result_size = DistributedJoin.joinStep(fs, files, stockShape,
        output);

    return result_size;
  }

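  // Command-line entry point. The repartition parameter accepts "auto" (or
  // absent: pick the cheaper plan via distributedJoinSmart), "yes" (always
  // repartition the first file before joining), or "no" (join the files as
  // stored).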
  public static void main(String[] args) throws IOException {
    CommandLineArguments cla = new CommandLineArguments(args);
    Path[] files = cla.getPaths();
    JobConf conf = new JobConf(DistributedJoin.class);
    FileSystem fs = files[0].getFileSystem(conf);
    Shape stockShape = cla.getShape(true);
    String repartition = cla.getRepartition();

    long result_size;
    if (repartition == null || repartition.equals("auto")) {
      result_size = distributedJoinSmart(fs, files, stockShape, null);
    } else if (repartition.equals("yes")) {
      repartitionStep(fs, files, stockShape);
      result_size = joinStep(fs, files, stockShape, null);
    } else if (repartition.equals("no")) {
      result_size = joinStep(fs, files, stockShape, null);
    } else {
      throw new RuntimeException("Illegal parameter repartition: "+repartition);
    }

    System.out.println("Result size: "+result_size);
  }
}
