Skip to content

Commit d07133f

Browse files
committed
Export to any db changes
1 parent 9f75aa1 commit d07133f

File tree

10 files changed

+1052
-1
lines changed

10 files changed

+1052
-1
lines changed

README

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ For Delimited Text Files:
139139
${HIHO_HOME}/scripts/hiho merge -newPath testData/merge/inputNew/fileInNewPath.txt -oldPath testData/merge/inputOld/fileInOldPath.txt -mergeBy value -outputPath output -inputFormat co.nubetech.hiho.dedup.DelimitedTextInputFormat -inputKeyClassName org.apache.hadoop.io.Text -inputValueClassName org.apache.hadoop.io.Text
140140

141141

142+
8. Export to DB:
143+
bin/hadoop jar deploy/hiho-0.5.0.jar co.nubetech.hiho.job.ExportToDB -jdbcDriver <jdbcDriverName> -jdbcUrl <jdbcUrl> -jdbcUsername <jdbcUserName> -jdbcPassword <jdbcPassword> -delimiter <delimiter> -numberOfMappers <numberOfMappers> -tableName <tableName> -columnNames <columnNames> -inputPath <inputPath>
144+
or
145+
${HIHO_HOME}/scripts/hiho export db -jdbcDriver <jdbcDriverName> -jdbcUrl <jdbcUrl> -jdbcUsername <jdbcUserName> -jdbcPassword <jdbcPassword> -delimiter <delimiter> -numberOfMappers <numberOfMappers> -tableName <tableName> -columnNames <columnNames> -inputPath <inputPath>
146+
147+
142148
New Features in this release
143149
-incremental import and introduction of AppendFileInputFormat
144150
-Oracle export

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<property environment="env" />
2828
<property name="name" value="hiho" />
2929
<property name="Name" value="HIHO" />
30-
<property name="version" value="0.4.0" />
30+
<property name="version" value="0.5.0" />
3131

3232

3333
<property name="artifact.name" value="${name}-${version}" />

src/co/nubetech/hiho/common/HIHOConf.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,7 @@ public interface HIHOConf {
7474
// conf for mergeJob
7575
public static final String MERGE_OLD_PATH = "mapreduce.jdbc.hiho.merge.mergeOldPath";
7676
public static final String MERGE_NEW_PATH = "mapreduce.jdbc.hiho.merge.mergeNewPath";
77+
78+
public static final String COLUMN_INFO = "mapreduce.jdbc.hiho.db.columnInfo";
7779

7880
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/**
2+
* Copyright 2011 Nube Technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed
11+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12+
* CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*/
15+
package co.nubetech.hiho.job;
16+
17+
import org.apache.hadoop.conf.Configuration;
18+
import org.apache.hadoop.conf.Configured;
19+
import org.apache.hadoop.fs.Path;
20+
import org.apache.hadoop.mapreduce.Job;
21+
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
22+
import org.apache.hadoop.util.Tool;
23+
import org.apache.hadoop.util.ToolRunner;
24+
25+
import co.nubetech.apache.hadoop.DBConfiguration;
26+
import co.nubetech.apache.hadoop.MRJobConfig;
27+
import co.nubetech.hiho.common.HIHOConf;
28+
import co.nubetech.hiho.common.HIHOException;
29+
import co.nubetech.hiho.mapreduce.GenericDBLoadDataMapper;
30+
import co.nubetech.hiho.mapreduce.lib.db.GenericDBOutputFormat;
31+
32+
public class ExportToDB extends Configured implements Tool {
33+
34+
private String inputPath = null;
35+
private String tableName = null;
36+
private String columnNames = null;
37+
38+
public void populateConfiguration(String[] args, Configuration conf) {
39+
for (int i = 0; i < args.length - 1; i++) {
40+
if ("-jdbcDriver".equals(args[i])) {
41+
conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, args[++i]);
42+
} else if ("-jdbcUrl".equals(args[i])) {
43+
conf.set(DBConfiguration.URL_PROPERTY, args[++i]);
44+
} else if ("-jdbcUsername".equals(args[i])) {
45+
conf.set(DBConfiguration.USERNAME_PROPERTY, args[++i]);
46+
} else if ("-jdbcPassword".equals(args[i])) {
47+
conf.set(DBConfiguration.PASSWORD_PROPERTY, args[++i]);
48+
} else if ("-delimiter".equals(args[i])) {
49+
conf.set(HIHOConf.INPUT_OUTPUT_DELIMITER, args[++i]);
50+
} else if ("-numberOfMappers".equals(args[i])) {
51+
conf.set(HIHOConf.NUMBER_MAPPERS, args[++i]);
52+
} else if ("-tableName".equals(args[i])) {
53+
tableName = args[++i];
54+
} else if ("-columnNames".equals(args[i])) {
55+
columnNames = args[++i];
56+
} else if ("-inputPath".equals(args[i])) {
57+
inputPath = args[++i];
58+
}
59+
}
60+
}
61+
62+
public void checkMandatoryConfs(Configuration conf) throws HIHOException {
63+
if (conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY) == null) {
64+
throw new HIHOException(
65+
"JDBC driver configuration is not specified,please specify JDBC driver class.");
66+
}
67+
if (conf.get(DBConfiguration.URL_PROPERTY) == null) {
68+
throw new HIHOException(
69+
"JDBC url path configuration is empty,please specify JDBC url path.");
70+
}
71+
if (!conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY).contains("hsqldb")) {
72+
if (conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
73+
throw new HIHOException(
74+
"JDBC user name configuration is empty,please specify JDBC user name.");
75+
}
76+
if (conf.get(DBConfiguration.PASSWORD_PROPERTY) == null) {
77+
throw new HIHOException(
78+
"JDBC password configuration is empty,please specify JDBC password.");
79+
}
80+
}
81+
if (conf.get(HIHOConf.INPUT_OUTPUT_DELIMITER) == null) {
82+
throw new HIHOException(
83+
"The provided delimiter is empty, please specify delimiter.");
84+
}
85+
if (conf.get(HIHOConf.NUMBER_MAPPERS) == null) {
86+
throw new HIHOException(
87+
"The provided number of mappers is empty, please specify number of mappers.");
88+
}
89+
if (inputPath == null) {
90+
throw new HIHOException(
91+
"The provided input path is empty, please specify inputPath.");
92+
}
93+
if (tableName == null) {
94+
throw new HIHOException(
95+
"The provided table name is empty, please specify tableName.");
96+
}
97+
if (columnNames == null) {
98+
throw new HIHOException(
99+
"The provided column name is empty, please specify columnName.");
100+
}
101+
}
102+
103+
public int run(String[] args) throws Exception {
104+
Configuration conf = getConf();
105+
populateConfiguration(args, conf);
106+
try {
107+
checkMandatoryConfs(conf);
108+
} catch (HIHOException e1) {
109+
e1.printStackTrace();
110+
throw new Exception(e1);
111+
}
112+
Job job = new Job(conf);
113+
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS,
114+
conf.getInt(HIHOConf.NUMBER_MAPPERS, 1));
115+
job.setJobName("HihoDBExport");
116+
117+
job.setMapperClass(GenericDBLoadDataMapper.class);
118+
job.setJarByClass(ExportToDB.class);
119+
job.setNumReduceTasks(0);
120+
job.setInputFormatClass(TextInputFormat.class);
121+
TextInputFormat.addInputPath(job, new Path(inputPath));
122+
GenericDBOutputFormat.setOutput(job, tableName, columnNames);
123+
124+
int ret = 0;
125+
try {
126+
ret = job.waitForCompletion(true) ? 0 : 1;
127+
} catch (Exception e) {
128+
e.printStackTrace();
129+
}
130+
return ret;
131+
132+
}
133+
134+
public static void main(String[] args) throws Exception {
135+
int res = ToolRunner.run(new Configuration(), new ExportToDB(), args);
136+
System.exit(res);
137+
}
138+
139+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2011 Nube Technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed
11+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12+
* CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*/
15+
package co.nubetech.hiho.mapreduce;
16+
17+
import java.io.IOException;
18+
import java.sql.Types;
19+
import java.text.DateFormat;
20+
import java.text.ParseException;
21+
import java.text.SimpleDateFormat;
22+
import java.util.ArrayList;
23+
import java.util.Calendar;
24+
import java.util.GregorianCalendar;
25+
import java.util.Iterator;
26+
import java.util.StringTokenizer;
27+
28+
import org.apache.hadoop.io.NullWritable;
29+
import org.apache.hadoop.mapreduce.Mapper;
30+
import org.apache.log4j.Logger;
31+
import org.codehaus.jackson.map.ObjectMapper;
32+
import org.codehaus.jackson.type.TypeReference;
33+
34+
import co.nubetech.hiho.common.HIHOConf;
35+
import co.nubetech.hiho.mapreduce.lib.db.ColumnInfo;
36+
import co.nubetech.hiho.mapreduce.lib.db.GenericDBWritable;
37+
38+
public class GenericDBLoadDataMapper<K, V> extends
39+
Mapper<K, V, GenericDBWritable, NullWritable> {
40+
41+
final static Logger logger = Logger
42+
.getLogger(co.nubetech.hiho.mapreduce.GenericDBLoadDataMapper.class);
43+
44+
private ArrayList values;
45+
private ArrayList<ColumnInfo> tableInfo;
46+
private String delimiter;
47+
48+
public ArrayList<ColumnInfo> getTableInfo() {
49+
return tableInfo;
50+
}
51+
52+
public String getDelimiter() {
53+
return delimiter;
54+
}
55+
56+
public void setTableInfo(ArrayList<ColumnInfo> tableInfo) {
57+
this.tableInfo = tableInfo;
58+
}
59+
60+
public void setDelimiter(String delimiter) {
61+
this.delimiter = delimiter;
62+
}
63+
64+
protected void setup(Mapper.Context context) throws IOException,
65+
InterruptedException {
66+
delimiter = context.getConfiguration().get(
67+
HIHOConf.INPUT_OUTPUT_DELIMITER);
68+
logger.debug("delimiter is: " + delimiter);
69+
String columnInfoJsonString = context.getConfiguration().get(
70+
HIHOConf.COLUMN_INFO);
71+
logger.debug("columnInfoJsonString is: " + columnInfoJsonString);
72+
ObjectMapper mapper = new ObjectMapper();
73+
tableInfo = mapper.readValue(columnInfoJsonString,
74+
new TypeReference<ArrayList<ColumnInfo>>() {
75+
});
76+
}
77+
78+
public void map(K key, V val, Context context) throws IOException,
79+
InterruptedException {
80+
values = new ArrayList();
81+
82+
logger.debug("Key is: " + key);
83+
logger.debug("Value is: " + val);
84+
85+
StringTokenizer rowValue = new StringTokenizer(val.toString(), delimiter);
86+
if (rowValue.countTokens() == tableInfo.size()) {
87+
Iterator<ColumnInfo> iterator = tableInfo.iterator();
88+
while (iterator.hasNext()) {
89+
ColumnInfo columnInfo = iterator.next();
90+
String columnValue = rowValue.nextToken();
91+
if (columnValue == null || columnValue.trim().equals("")) {
92+
values.add(null);
93+
} else {
94+
logger.debug("Adding value : " + columnValue);
95+
int type = columnInfo.getType();
96+
if (type == Types.VARCHAR) {
97+
values.add(columnValue);
98+
} else if (type == Types.BIGINT) {
99+
values.add(Long.parseLong(columnValue));
100+
} else if (type == Types.INTEGER) {
101+
values.add(Integer.parseInt(columnValue));
102+
} else if (type == Types.DOUBLE) {
103+
values.add(Double.parseDouble(columnValue));
104+
} else if (type == Types.FLOAT) {
105+
values.add(Float.parseFloat(columnValue));
106+
} else if (type == Types.BOOLEAN) {
107+
values.add(Boolean.parseBoolean(columnValue));
108+
} else if (type == Types.DATE) {
109+
DateFormat df = new SimpleDateFormat();
110+
try {
111+
values.add(df.parse(columnValue));
112+
} catch (ParseException e) {
113+
e.printStackTrace();
114+
throw new IOException(e);
115+
}
116+
}
117+
}
118+
}
119+
} else {
120+
throw new IOException(
121+
"Number of columns specified in table is not equal to the columns contains in the file.");
122+
}
123+
GenericDBWritable gdw = new GenericDBWritable(tableInfo, values);
124+
context.write(gdw, null);
125+
126+
}
127+
128+
}

src/co/nubetech/hiho/mapreduce/lib/db/ColumnInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public ColumnInfo(int index, int type, String name) {
3131
this.type = type;
3232
this.name = name;
3333
}
34+
35+
public ColumnInfo(){
	// No-arg constructor: required so Jackson's ObjectMapper can
	// instantiate ColumnInfo when deserializing the column-metadata JSON
	// (see GenericDBLoadDataMapper.setup, which reads HIHOConf.COLUMN_INFO).
}
3438

3539
public int getIndex() {
3640
return index;

0 commit comments

Comments
 (0)