
Commit d596f02

tledkov-gridgain authored and devozerov committed
IGNITE-4507: Hadoop: added direct output support for combiner. This closes apache#1434.
1 parent d6d42c2 commit d596f02

File tree: 12 files changed (+145, -95 lines)

modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java

Lines changed: 10 additions & 0 deletions

@@ -207,4 +207,14 @@ public void output(HadoopTaskOutput out) {
      * @throws IgniteCheckedException On any error in callable.
      */
     public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
+
+    /**
+     * Callback invoked from mapper thread when map is finished.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onMapperFinished() throws IgniteCheckedException {
+        if (output instanceof HadoopMapperAwareTaskOutput)
+            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+    }
 }

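Note: the new onMapperFinished() hook dispatches to the task output only when that output implements HadoopMapperAwareTaskOutput, so outputs that do not track mapper boundaries are unaffected. A minimal sketch of the pattern in plain Java (everything except the HadoopMapperAwareTaskOutput/onMapperFinished names is hypothetical):

// Marker-style interface for outputs that need a per-mapper flush point.
interface MapperAwareOutput {
    void onMapperFinished() throws Exception;
}

// Hypothetical output that buffers records per mapper and flushes on the callback.
class BufferingOutput implements MapperAwareOutput {
    private final StringBuilder buf = new StringBuilder();

    void write(String rec) { buf.append(rec).append('\n'); }

    @Override public void onMapperFinished() {
        System.out.print(buf); // flush everything buffered for this mapper downstream

        buf.setLength(0);
    }
}

class TaskContextSketch {
    private Object output = new BufferingOutput();

    // Mirrors the added HadoopTaskContext.onMapperFinished(): dispatch only
    // if the configured output opted in via the marker interface.
    public void onMapperFinished() throws Exception {
        if (output instanceof MapperAwareOutput)
            ((MapperAwareOutput)output).onMapperFinished();
    }
}

Putting the hook on the base HadoopTaskContext (rather than on HadoopV2Context, where it lived before, see the HadoopV2Context.java diff below) is what lets both v1 and v2 tasks, map and combine alike, share one flush point.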
modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java

Lines changed: 51 additions & 38 deletions

@@ -30,6 +30,7 @@
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
     /**
      * Constructor.
      *
-     * @param taskInfo
+     * @param taskInfo Taks info.
      */
     public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
         super(taskInfo);
@@ -56,67 +57,79 @@ public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
+        if (taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
-        InputFormat inFormat = jobConf.getInputFormat();
+        try {
+            JobConf jobConf = taskCtx0.jobConf();
 
-        HadoopInputSplit split = info().inputSplit();
+            InputFormat inFormat = jobConf.getInputFormat();
 
-        InputSplit nativeSplit;
+            HadoopInputSplit split = info().inputSplit();
 
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock block = (HadoopFileBlock)split;
+            InputSplit nativeSplit;
 
-            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
-        }
-        else
-            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+            if (split instanceof HadoopFileBlock) {
+                HadoopFileBlock block = (HadoopFileBlock)split;
 
-        assert nativeSplit != null;
+                nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+            }
+            else
+                nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
 
-        Reporter reporter = new HadoopV1Reporter(taskCtx);
+            assert nativeSplit != null;
 
-        HadoopV1OutputCollector collector = null;
+            Reporter reporter = new HadoopV1Reporter(taskCtx);
 
-        try {
-            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                fileName(), ctx.attemptId());
+            HadoopV1OutputCollector collector = null;
 
-            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+            try {
+                collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+                    fileName(), taskCtx0.attemptId());
 
-            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+                RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
 
-            Object key = reader.createKey();
-            Object val = reader.createValue();
+                Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
 
-            assert mapper != null;
+                Object key = reader.createKey();
+                Object val = reader.createValue();
+
+                assert mapper != null;
 
-            try {
                 try {
-                    while (reader.next(key, val)) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Map task cancelled.");
+                    try {
+                        while (reader.next(key, val)) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Map task cancelled.");
+
+                            mapper.map(key, val, collector, reporter);
+                        }
 
-                        mapper.map(key, val, collector, reporter);
+                        taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        mapper.close();
                     }
                 }
                 finally {
-                    mapper.close();
+                    collector.closeWriter();
                }
+
+                collector.commit();
            }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            HadoopMapperUtils.clearMapperIndex();
        }
    }
 }

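Note: the v1 tasks now bracket their whole body with HadoopMapperUtils.mapperIndex(...) / clearMapperIndex(). A plausible reading of that utility is a per-thread index holder; here is a minimal sketch under that assumption (class and field names are hypothetical, only the mapperIndex/clearMapperIndex method shapes come from the diff):

final class MapperIndexHolder {
    /** Per-thread slot for the index of the mapper currently running on this thread. */
    private static final ThreadLocal<Integer> IDX = new ThreadLocal<>();

    static void mapperIndex(Integer idx) { IDX.set(idx); }

    static Integer mapperIndex() { return IDX.get(); }

    static void clearMapperIndex() { IDX.remove(); }
}

class V1TaskSketch {
    // Usage pattern mirroring the diff: publish the index for the duration of
    // the task body and always clear it in finally, so pooled threads never
    // leak a stale index into the next task they pick up.
    void runTask(int mapperIdx) {
        MapperIndexHolder.mapperIndex(mapperIdx);

        try {
            // ... iterate the input split and invoke the user mapper ...
        }
        finally {
            MapperIndexHolder.clearMapperIndex();
        }
    }
}

The set/try/clear bracket is what lets a striped output later ask "which mapper am I serving?" without threading an extra argument through the Hadoop APIs.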
modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java

Lines changed: 42 additions & 27 deletions

@@ -23,6 +23,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -53,49 +54,63 @@ public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
-
-        HadoopTaskInput input = taskCtx.input();
-
-        HadoopV1OutputCollector collector = null;
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
         try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+            JobConf jobConf = taskCtx0.jobConf();
 
-            Reducer reducer;
-            if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                jobConf);
-            else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                jobConf);
+            HadoopTaskInput input = taskCtx.input();
 
-            assert reducer != null;
+            HadoopV1OutputCollector collector = null;
 
             try {
+                collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+                Reducer reducer;
+                if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+                    jobConf);
+                else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+                    jobConf);
+
+                assert reducer != null;
+
                 try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
+                    try {
+                        while (input.next()) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+                            reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        }
 
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        if (!reduce)
+                            taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        reducer.close();
                    }
                }
                finally {
-                    reducer.close();
+                    collector.closeWriter();
                }
+
+                collector.commit();
            }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
        }
    }
 }

modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java

Lines changed: 0 additions & 10 deletions

@@ -154,16 +154,6 @@ public HadoopV2Context(HadoopV2TaskContext ctx) {
         }
     }
 
-    /**
-     * Callback invoked from mapper thread when map is finished.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onMapperFinished() throws IgniteCheckedException {
-        if (output instanceof HadoopMapperAwareTaskOutput)
-            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
-    }
-
     /** {@inheritDoc} */
     @Override public OutputCommitter getOutputCommitter() {
         throw new UnsupportedOperationException();

modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java

Lines changed: 10 additions & 8 deletions

@@ -56,30 +56,32 @@ public HadoopV2MapTask(HadoopTaskInfo taskInfo) {
         HadoopMapperUtils.clearMapperIndex();
 
         try {
-            InputSplit nativeSplit = hadoopContext().getInputSplit();
+            HadoopV2Context hadoopCtx = hadoopContext();
+
+            InputSplit nativeSplit = hadoopCtx.getInputSplit();
 
             if (nativeSplit == null)
                 throw new IgniteCheckedException("Input split cannot be null.");
 
             InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
-                hadoopContext().getConfiguration());
+                hadoopCtx.getConfiguration());
 
-            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx);
 
-            reader.initialize(nativeSplit, hadoopContext());
+            reader.initialize(nativeSplit, hadoopCtx);
 
-            hadoopContext().reader(reader);
+            hadoopCtx.reader(reader);
 
             HadoopJobInfo jobInfo = taskCtx.job().info();
 
             outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
 
-            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration());
 
             try {
-                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+                mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
 
-                hadoopContext().onMapperFinished();
+                taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();

modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java

Lines changed: 14 additions & 0 deletions

@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 
 /**
@@ -53,17 +54,27 @@ public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
 
         JobContextImpl jobCtx = taskCtx.jobContext();
 
+        // Set mapper index for combiner tasks
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
+
         try {
             outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
 
             Reducer reducer;
+
             if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(),
                 jobCtx.getConfiguration());
             else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
                 jobCtx.getConfiguration());
 
             try {
                 reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+
+                if (!reduce)
+                    taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();
@@ -84,6 +95,9 @@ public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
             throw new IgniteCheckedException(e);
         }
         finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
+
             if (err != null)
                 abort(outputFormat);
         }

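Note: taken together, the v1/v2 changes let a combiner participate in per-mapper (direct) output. A combiner executes as a reduce-style task with reduce == false, but it is mapper-side work: it inherits the index of the mapper it follows and fires the same per-mapper flush when it completes. A condensed sketch of that lifecycle, reusing the hypothetical MapperIndexHolder and TaskContextSketch types from the sketches above:

class CombinerTaskSketch {
    void runCombine(TaskContextSketch taskCtx, boolean reduce, int mapperIdx) throws Exception {
        if (!reduce)
            MapperIndexHolder.mapperIndex(mapperIdx); // combiner: adopt the mapper's slot
        else
            MapperIndexHolder.clearMapperIndex();     // real reducer: no mapper scope

        try {
            // ... run the user reducer/combiner over its input ...

            if (!reduce)
                taskCtx.onMapperFinished();           // flush this mapper's direct output
        }
        finally {
            if (!reduce)
                MapperIndexHolder.clearMapperIndex();
        }
    }
}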
modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java

Lines changed: 1 addition & 0 deletions

@@ -48,6 +48,7 @@
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;

modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java

Lines changed: 0 additions & 7 deletions

@@ -182,13 +182,6 @@ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUn
         boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
 
         if (stripeMappers0) {
-            if (job.info().hasCombiner()) {
-                log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
-                    job.id() + ']');
-
-                stripeMappers0 = false;
-            }
-
             if (!embedded) {
                 log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
                     job.id() + ']');

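Note: with per-mapper flushing in place, the combiner restriction on striped mapper output is lifted, and the only gate left in this code path is external mode. A sketch of the resulting decision (the method shape is illustrative, not the actual HadoopShuffleJob API):

final class StripedOutputGate {
    /** After this commit: a combiner no longer disables striping. */
    static boolean stripeMappers(boolean requested, boolean embedded) {
        // Before this commit a hasCombiner() check here also forced striping
        // off; per-mapper flushing via onMapperFinished() made it unnecessary.
        return requested && embedded; // external (non-embedded) mode is still unsupported
    }
}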
modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java

Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ public HadoopDirectDataInput(byte[] buf) {
 
     /** {@inheritDoc} */
     @Override public int read() throws IOException {
-        return readByte();
+        return (int)readByte() & 0xFF;
    }
 
     /** {@inheritDoc} */

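Note: this one-liner fixes a classic sign-extension bug. Assuming read() here follows the java.io.InputStream contract, it must return an unsigned byte in [0, 255], or -1 only at end of stream. Returning readByte() directly widens the signed byte to int with sign extension, so the byte 0xFF comes back as -1 and looks like EOF to callers; masking with & 0xFF yields 255. A standalone demonstration:

public class SignExtensionDemo {
    public static void main(String[] args) {
        byte b = (byte)0xFF;       // bit pattern 0xFF, numeric value -1

        int wrong = b;             // implicit widening sign-extends: -1, i.e. a false "EOF"
        int right = b & 0xFF;      // masking keeps the low 8 bits: 255, a valid unsigned byte

        System.out.println(wrong); // prints -1
        System.out.println(right); // prints 255
    }
}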