Skip to content

Commit 9b4826e

Browse files
committed
HIVE-12065 : FS stats collection may generate incorrect stats for multi-insert query (Ashutosh Chauhan via Pengcheng Xiong)
Signed-off-by: Ashutosh Chauhan <[email protected]>
1 parent b97fdc0 commit 9b4826e

File tree

54 files changed

+486
-303
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+486
-303
lines changed

itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
package org.apache.hadoop.hive.ql.stats;
2020

21-
import org.apache.hadoop.conf.Configuration;
2221
import org.apache.hadoop.hive.conf.HiveConf;
23-
import org.apache.hadoop.hive.ql.exec.Task;
2422

2523
/**
2624
* An test implementation for StatsAggregator.
@@ -34,26 +32,30 @@ public class DummyStatsAggregator implements StatsAggregator {
3432

3533
// This is a test. The parameter hive.test.dummystats.aggregator's value
3634
// denotes the method which needs to throw an error.
37-
public boolean connect(Configuration hconf, Task sourceTask) {
38-
errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR);
35+
@Override
36+
public boolean connect(StatsCollectionContext scc) {
37+
errorMethod = HiveConf.getVar(scc.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR);
3938
if (errorMethod.equalsIgnoreCase("connect")) {
4039
return false;
4140
}
4241

4342
return true;
4443
}
4544

45+
@Override
4646
public String aggregateStats(String keyPrefix, String statType) {
4747
return null;
4848
}
4949

50-
public boolean closeConnection() {
50+
@Override
51+
public boolean closeConnection(StatsCollectionContext scc) {
5152
if (errorMethod.equalsIgnoreCase("closeConnection")) {
5253
return false;
5354
}
5455
return true;
5556
}
5657

58+
@Override
5759
public boolean cleanUp(String keyPrefix) {
5860
if (errorMethod.equalsIgnoreCase("cleanUp")) {
5961
return false;

itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222

23-
import org.apache.hadoop.conf.Configuration;
2423
import org.apache.hadoop.hive.conf.HiveConf;
2524

2625
/**
@@ -36,32 +35,36 @@ public class DummyStatsPublisher implements StatsPublisher {
3635

3736
// This is a test. The parameter hive.test.dummystats.publisher's value
3837
// denotes the method which needs to throw an error.
39-
public boolean init(Configuration hconf) {
40-
errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
38+
@Override
39+
public boolean init(StatsCollectionContext context) {
40+
errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
4141
if (errorMethod.equalsIgnoreCase("init")) {
4242
return false;
4343
}
4444

4545
return true;
4646
}
4747

48-
public boolean connect(Configuration hconf) {
49-
errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
48+
@Override
49+
public boolean connect(StatsCollectionContext context) {
50+
errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
5051
if (errorMethod.equalsIgnoreCase("connect")) {
5152
return false;
5253
}
5354

5455
return true;
5556
}
5657

58+
@Override
5759
public boolean publishStat(String fileID, Map<String, String> stats) {
5860
if (errorMethod.equalsIgnoreCase("publishStat")) {
5961
return false;
6062
}
6163
return true;
6264
}
6365

64-
public boolean closeConnection() {
66+
@Override
67+
public boolean closeConnection(StatsCollectionContext context) {
6568
if (errorMethod.equalsIgnoreCase("closeConnection")) {
6669
return false;
6770
}

itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.hadoop.hive.ql.stats;
2020

21-
import org.apache.hadoop.conf.Configuration;
22-
import org.apache.hadoop.hive.ql.exec.Task;
2321
import org.apache.hadoop.hive.ql.session.SessionState;
2422

2523
/**
@@ -30,10 +28,12 @@
3028

3129
public class KeyVerifyingStatsAggregator implements StatsAggregator {
3230

33-
public boolean connect(Configuration hconf, Task sourceTask) {
31+
@Override
32+
public boolean connect(StatsCollectionContext scc) {
3433
return true;
3534
}
3635

36+
@Override
3737
public String aggregateStats(String keyPrefix, String statType) {
3838
SessionState ss = SessionState.get();
3939
// Have to use the length instead of the actual prefix because the prefix is location dependent
@@ -43,10 +43,12 @@ public String aggregateStats(String keyPrefix, String statType) {
4343
return null;
4444
}
4545

46-
public boolean closeConnection() {
46+
@Override
47+
public boolean closeConnection(StatsCollectionContext scc) {
4748
return true;
4849
}
4950

51+
@Override
5052
public boolean cleanUp(String keyPrefix) {
5153
return true;
5254
}

ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@
6161
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
6262
import org.apache.hadoop.hive.ql.plan.PlanUtils;
6363
import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
64-
import org.apache.hadoop.hive.ql.plan.TableDesc;
6564
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
6665
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
66+
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
6767
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
6868
import org.apache.hadoop.hive.serde2.SerDeException;
6969
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -1137,7 +1137,9 @@ private void publishStats() throws HiveException {
11371137
return;
11381138
}
11391139

1140-
if (!statsPublisher.connect(hconf)) {
1140+
StatsCollectionContext sContext = new StatsCollectionContext(hconf);
1141+
sContext.setStatsTmpDir(conf.getStatsTmpDir());
1142+
if (!statsPublisher.connect(sContext)) {
11411143
// just return, stats gathering should not block the main query
11421144
LOG.error("StatsPublishing error: cannot connect to database");
11431145
if (isStatsReliable) {
@@ -1204,7 +1206,7 @@ private void publishStats() throws HiveException {
12041206
}
12051207
}
12061208
}
1207-
if (!statsPublisher.closeConnection()) {
1209+
if (!statsPublisher.closeConnection(sContext)) {
12081210
// The original exception is lost.
12091211
// Not changing the interface to maintain backward compatibility
12101212
if (isStatsReliable) {

ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.hadoop.hive.ql.plan.api.StageType;
4848
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
4949
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
50+
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
5051
import org.apache.hadoop.hive.ql.stats.StatsFactory;
5152
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
5253
import org.apache.hadoop.util.StringUtils;
@@ -134,13 +135,14 @@ private int aggregateStats() {
134135

135136
StatsAggregator statsAggregator = null;
136137
int ret = 0;
137-
138+
StatsCollectionContext scc = null;
138139
try {
139140
// Stats setup:
140141
Warehouse wh = new Warehouse(conf);
141142
if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
142143
try {
143-
statsAggregator = createStatsAggregator(conf);
144+
scc = getContext();
145+
statsAggregator = createStatsAggregator(scc);
144146
} catch (HiveException e) {
145147
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
146148
throw e;
@@ -241,7 +243,7 @@ private int aggregateStats() {
241243
}
242244
} finally {
243245
if (statsAggregator != null) {
244-
statsAggregator.closeConnection();
246+
statsAggregator.closeConnection(scc);
245247
}
246248
}
247249
// The return value of 0 indicates success,
@@ -268,7 +270,7 @@ private String getAggregationPrefix(boolean counter, Table table, Partition part
268270
return prefix.toString();
269271
}
270272

271-
private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveException {
273+
private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException {
272274
String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
273275
StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
274276
if (factory == null) {
@@ -277,21 +279,30 @@ private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveExceptio
277279
// initialize stats publishing table for noscan which has only stats task
278280
// the rest of MR task following stats task initializes it in ExecDriver.java
279281
StatsPublisher statsPublisher = factory.getStatsPublisher();
280-
if (!statsPublisher.init(conf)) { // creating stats table if not exists
282+
if (!statsPublisher.init(scc)) { // creating stats table if not exists
281283
throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
282284
}
283-
Task sourceTask = getWork().getSourceTask();
284-
if (sourceTask == null) {
285-
throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
286-
}
285+
287286
// manufacture a StatsAggregator
288287
StatsAggregator statsAggregator = factory.getStatsAggregator();
289-
if (!statsAggregator.connect(conf, sourceTask)) {
288+
if (!statsAggregator.connect(scc)) {
290289
throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
291290
}
292291
return statsAggregator;
293292
}
294293

294+
private StatsCollectionContext getContext() throws HiveException {
295+
296+
StatsCollectionContext scc = new StatsCollectionContext(conf);
297+
Task sourceTask = getWork().getSourceTask();
298+
if (sourceTask == null) {
299+
throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
300+
}
301+
scc.setTask(sourceTask);
302+
scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
303+
return scc;
304+
}
305+
295306
private boolean existStats(Map<String, String> parameters) {
296307
return parameters.containsKey(StatsSetupConst.ROW_COUNT)
297308
|| parameters.containsKey(StatsSetupConst.NUM_FILES)

ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
4040
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
4141
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
42+
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
4243
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
4344
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
4445
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -282,7 +283,9 @@ private void publishStats() throws HiveException {
282283

283284
// Initializing a stats publisher
284285
StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
285-
if (!statsPublisher.connect(jc)) {
286+
StatsCollectionContext sc = new StatsCollectionContext(jc);
287+
sc.setStatsTmpDir(conf.getTmpStatsDir());
288+
if (!statsPublisher.connect(sc)) {
286289
// just return, stats gathering should not block the main query.
287290
if (isLogInfoEnabled) {
288291
LOG.info("StatsPublishing error: cannot connect to database.");
@@ -318,7 +321,7 @@ private void publishStats() throws HiveException {
318321
LOG.info("publishing : " + key + " : " + statsToPublish.toString());
319322
}
320323
}
321-
if (!statsPublisher.closeConnection()) {
324+
if (!statsPublisher.closeConnection(sc)) {
322325
if (isStatsReliable) {
323326
throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
324327
}

ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.hadoop.hive.ql.exec;
2020

21-
import static com.google.common.base.Preconditions.checkNotNull;
22-
2321
import java.beans.DefaultPersistenceDelegate;
2422
import java.beans.Encoder;
2523
import java.beans.ExceptionListener;
@@ -102,6 +100,7 @@
102100
import org.apache.hadoop.hive.common.HiveInterruptUtils;
103101
import org.apache.hadoop.hive.common.HiveStatsUtils;
104102
import org.apache.hadoop.hive.common.JavaUtils;
103+
import org.apache.hadoop.hive.common.StatsSetupConst;
105104
import org.apache.hadoop.hive.conf.HiveConf;
106105
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
107106
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -160,6 +159,7 @@
160159
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
161160
import org.apache.hadoop.hive.ql.plan.SparkWork;
162161
import org.apache.hadoop.hive.ql.plan.TableDesc;
162+
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
163163
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
164164
import org.apache.hadoop.hive.ql.plan.api.Graph;
165165
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -3933,4 +3933,31 @@ public static int getDPColOffset(FileSinkDesc conf) {
39333933
}
39343934

39353935
}
3936+
public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) {
3937+
3938+
List<String> statsTmpDirs = new ArrayList<>();
3939+
if (!StatsSetupConst.StatDB.fs.name().equalsIgnoreCase(HiveConf.getVar(conf, ConfVars.HIVESTATSDBCLASS))) {
3940+
// no-op for non-fs stats collection
3941+
return statsTmpDirs;
3942+
}
3943+
// if its auto-stats gather for inserts or CTAS, stats dir will be in FileSink
3944+
Set<Operator<? extends OperatorDesc>> ops = work.getAllLeafOperators();
3945+
if (work instanceof MapWork) {
3946+
// if its an anlayze statement, stats dir will be in TableScan
3947+
ops.addAll(work.getAllRootOperators());
3948+
}
3949+
for (Operator<? extends OperatorDesc> op : ops) {
3950+
OperatorDesc desc = op.getConf();
3951+
String statsTmpDir = null;
3952+
if (desc instanceof FileSinkDesc) {
3953+
statsTmpDir = ((FileSinkDesc)desc).getStatsTmpDir();
3954+
} else if (desc instanceof TableScanDesc) {
3955+
statsTmpDir = ((TableScanDesc) desc).getTmpStatsDir();
3956+
}
3957+
if (statsTmpDir != null && !statsTmpDir.isEmpty()) {
3958+
statsTmpDirs.add(statsTmpDir);
3959+
}
3960+
}
3961+
return statsTmpDirs;
3962+
}
39363963
}

ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Properties;
32+
import java.util.Set;
3233

3334
import org.apache.commons.lang.StringUtils;
3435
import org.apache.commons.logging.Log;
@@ -75,6 +76,7 @@
7576
import org.apache.hadoop.hive.ql.plan.api.StageType;
7677
import org.apache.hadoop.hive.ql.session.SessionState;
7778
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
79+
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
7880
import org.apache.hadoop.hive.ql.stats.StatsFactory;
7981
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
8082
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -243,7 +245,7 @@ public int execute(DriverContext driverContext) {
243245

244246
try {
245247
String partitioner = HiveConf.getVar(job, ConfVars.HIVEPARTITIONER);
246-
job.setPartitionerClass((Class<? extends Partitioner>) JavaUtils.loadClass(partitioner));
248+
job.setPartitionerClass(JavaUtils.loadClass(partitioner));
247249
} catch (ClassNotFoundException e) {
248250
throw new RuntimeException(e.getMessage(), e);
249251
}
@@ -289,7 +291,7 @@ public int execute(DriverContext driverContext) {
289291
LOG.info("Using " + inpFormat);
290292

291293
try {
292-
job.setInputFormat((Class<? extends InputFormat>) JavaUtils.loadClass(inpFormat));
294+
job.setInputFormat(JavaUtils.loadClass(inpFormat));
293295
} catch (ClassNotFoundException e) {
294296
throw new RuntimeException(e.getMessage(), e);
295297
}
@@ -408,7 +410,13 @@ public int execute(DriverContext driverContext) {
408410
StatsFactory factory = StatsFactory.newFactory(job);
409411
if (factory != null) {
410412
statsPublisher = factory.getStatsPublisher();
411-
if (!statsPublisher.init(job)) { // creating stats table if not exists
413+
List<String> statsTmpDir = Utilities.getStatsTmpDirs(mWork, job);
414+
if (rWork != null) {
415+
statsTmpDir.addAll(Utilities.getStatsTmpDirs(rWork, job));
416+
}
417+
StatsCollectionContext sc = new StatsCollectionContext(job);
418+
sc.setStatsTmpDirs(statsTmpDir);
419+
if (!statsPublisher.init(sc)) { // creating stats table if not exists
412420
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
413421
throw
414422
new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());

0 commit comments

Comments
 (0)