Skip to content

Commit 6cbe9c7

Browse files
committed
Maven build, SF changes
1 parent 8a3b4d9 commit 6cbe9c7

27 files changed

+590
-798
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
activeProfiles=
1+
activeProfiles=-Dhadoop20
22
eclipse.preferences.version=1
33
resolveWorkspaceProjects=true
44
version=1

pom.xml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
<mysql.connector.version>5.1.13</mysql.connector.version>
3535
<jackson.version>1.4.0</jackson.version>
3636
<commons-net.version>1.4.1</commons-net.version>
37-
<salesforce.version>26.0.0</salesforce.version>
38-
<salesforce.partner.version>26.0.0</salesforce.partner.version>
37+
<salesforce.version>27.0.0</salesforce.version>
38+
<salesforce.partner.version>27.0.0</salesforce.partner.version>
3939
<junit.version>4.5</junit.version>
4040
<compileSource>1.6</compileSource>
4141
<maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
@@ -59,6 +59,12 @@
5959
<artifactId>hadoop-core</artifactId>
6060
<version>${hadoop.version}</version>
6161
</dependency>
62+
<dependency>
63+
<groupId>org.apache.hadoop</groupId>
64+
<artifactId>hadoop-test</artifactId>
65+
<version>${hadoop.version}</version>
66+
<scope>test</scope>
67+
</dependency>
6268
</dependencies>
6369
</profile>
6470
<profile>

scripts/hiho

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ EXPORTTO=$2
4141
CLASS='co.nubetech.hiho.job.ExportToOracleDb'
4242
fi
4343
if [ "$EXPORTTO" = "saleforce" ] ; then
44-
CLASS='co.nubetech.hiho.job.sf.ExportSalesForceJob'
44+
CLASS='co.nubetech.hiho.job.sf.ExportSalesforceJob'
4545
fi
4646
fi
4747

src/main/java/co/nubetech/hiho/common/sf/BatchRequest.java

Lines changed: 0 additions & 104 deletions
This file was deleted.
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package co.nubetech.hiho.common.sf;
2+
3+
import java.io.IOException;
4+
import java.util.HashMap;
5+
import java.util.HashSet;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Set;
9+
10+
import org.apache.hadoop.fs.FSDataInputStream;
11+
12+
import com.sforce.async.AsyncApiException;
13+
import com.sforce.async.BatchInfo;
14+
import com.sforce.async.BatchStateEnum;
15+
import com.sforce.async.BulkConnection;
16+
import com.sforce.async.CSVReader;
17+
import com.sforce.async.ContentType;
18+
import com.sforce.async.JobInfo;
19+
import com.sforce.async.JobStateEnum;
20+
import com.sforce.async.OperationEnum;
21+
import com.sforce.soap.partner.PartnerConnection;
22+
import com.sforce.ws.ConnectionException;
23+
import com.sforce.ws.ConnectorConfig;
24+
25+
public class SFHandler {
26+
27+
/**
28+
* Create the BulkConnection used to call Bulk API operations.
29+
*/
30+
public BulkConnection getBulkConnection(String userName,
31+
String password) throws ConnectionException, AsyncApiException {
32+
ConnectorConfig partnerConfig = new ConnectorConfig();
33+
partnerConfig.setUsername(userName);
34+
partnerConfig.setPassword(password);
35+
partnerConfig
36+
.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/27.0");
37+
// Creating the connection automatically handles login and stores
38+
// the session in partnerConfig
39+
new PartnerConnection(partnerConfig);
40+
// When PartnerConnection is instantiated, a login is implicitly
41+
// executed and, if successful,
42+
// a valid session is stored in the ConnectorConfig instance.
43+
// Use this key to initialize a BulkConnection:
44+
ConnectorConfig config = new ConnectorConfig();
45+
config.setSessionId(partnerConfig.getSessionId());
46+
// The endpoint for the Bulk API service is the same as for the normal
47+
// SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
48+
String soapEndpoint = partnerConfig.getServiceEndpoint();
49+
String apiVersion = "27.0";
50+
String restEndpoint = soapEndpoint.substring(0,
51+
soapEndpoint.indexOf("Soap/"))
52+
+ "async/" + apiVersion;
53+
config.setRestEndpoint(restEndpoint);
54+
// This should only be false when doing debugging.
55+
config.setCompression(true);
56+
// Set this to true to see HTTP requests and responses on stdout
57+
config.setTraceMessage(false);
58+
BulkConnection connection = new BulkConnection(config);
59+
return connection;
60+
}
61+
62+
/**
63+
* Create a new job using the Bulk API.
64+
*
65+
* @param sobjectType
66+
*
67+
* The object type being loaded, such as "Account"
68+
* @param connection
69+
*
70+
* BulkConnection used to create the new job.
71+
* @return The JobInfo for the new job.
72+
* @throws AsyncApiException
73+
*/
74+
public JobInfo createJob(String sobjectType,
75+
BulkConnection connection) throws AsyncApiException {
76+
JobInfo job = new JobInfo();
77+
job.setObject(sobjectType);
78+
job.setOperation(OperationEnum.insert);
79+
job.setContentType(ContentType.CSV);
80+
job = connection.createJob(job);
81+
System.out.println(job);
82+
return job;
83+
}
84+
85+
public void closeJob(BulkConnection connection, String jobId)
86+
throws AsyncApiException {
87+
JobInfo job = new JobInfo();
88+
job.setId(jobId);
89+
job.setState(JobStateEnum.Closed);
90+
connection.updateJob(job);
91+
}
92+
93+
/**
94+
* Wait for a job to complete by polling the Bulk API.
95+
*
96+
* @param connection
97+
*
98+
* BulkConnection used to check results.
99+
* @param job
100+
*
101+
* The job awaiting completion.
102+
* @param batchInfoList
103+
*
104+
* List of batches for this job.
105+
* @throws AsyncApiException
106+
*/
107+
public void awaitCompletion(BulkConnection connection, JobInfo job,
108+
List<BatchInfo> batchInfoList) throws AsyncApiException {
109+
long sleepTime = 0L;
110+
Set<String> incomplete = new HashSet<String>();
111+
for (BatchInfo bi : batchInfoList) {
112+
incomplete.add(bi.getId());
113+
}
114+
while (!incomplete.isEmpty()) {
115+
try {
116+
Thread.sleep(sleepTime);
117+
} catch (InterruptedException e) {
118+
}
119+
System.out.println("Awaiting results..." + incomplete.size());
120+
sleepTime = 10000L;
121+
BatchInfo[] statusList = connection.getBatchInfoList(job.getId())
122+
.getBatchInfo();
123+
for (BatchInfo b : statusList) {
124+
if (b.getState() == BatchStateEnum.Completed
125+
|| b.getState() == BatchStateEnum.Failed) {
126+
if (incomplete.remove(b.getId())) {
127+
System.out.println("BATCH STATUS:\n" + b);
128+
}
129+
}
130+
}
131+
}
132+
}
133+
134+
/**
135+
* Gets the results of the operation and checks for errors.
136+
*/
137+
public void checkResults(BulkConnection connection, JobInfo job,
138+
List<BatchInfo> batchInfoList) throws AsyncApiException,
139+
IOException {
140+
// batchInfoList was populated when batches were created and submitted
141+
for (BatchInfo b : batchInfoList) {
142+
CSVReader rdr = new CSVReader(connection.getBatchResultStream(
143+
job.getId(), b.getId()));
144+
List<String> resultHeader = rdr.nextRecord();
145+
int resultCols = resultHeader.size();
146+
List<String> row;
147+
while ((row = rdr.nextRecord()) != null) {
148+
Map<String, String> resultInfo = new HashMap<String, String>();
149+
for (int i = 0; i < resultCols; i++) {
150+
resultInfo.put(resultHeader.get(i), row.get(i));
151+
}
152+
boolean success = Boolean.valueOf(resultInfo.get("Success"));
153+
boolean created = Boolean.valueOf(resultInfo.get("Created"));
154+
String id = resultInfo.get("Id");
155+
String error = resultInfo.get("Error");
156+
if (success && created) {
157+
System.out.println("Created row with id " + id);
158+
} else if (!success) {
159+
System.out.println("Failed with error: " + error);
160+
}
161+
}
162+
}
163+
}
164+
165+
166+
/**
167+
* Create a batch by uploading the contents of the file. This closes the
168+
* output stream.
169+
*
170+
* @param tmpOut
171+
*
172+
* The output stream used to write the CSV data for a single
173+
* batch.
174+
* @param tmpFile
175+
*
176+
* The file associated with the above stream.
177+
* @param batchInfos
178+
*
179+
* The batch info for the newly created batch is added to this
180+
* list.
181+
* @param connection
182+
*
183+
* The BulkConnection used to create the new batch.
184+
* @param jobInfo
185+
*
186+
* The JobInfo associated with the new batch.
187+
*/
188+
public BatchInfo createBatch(FSDataInputStream tmpInputStream,
189+
BulkConnection connection,
190+
JobInfo jobInfo) throws IOException, AsyncApiException {
191+
try {
192+
BatchInfo batchInfo = connection.createBatchFromStream(jobInfo,
193+
tmpInputStream);
194+
System.out.println(batchInfo);
195+
return batchInfo;
196+
} finally {
197+
tmpInputStream.close();
198+
}
199+
}
200+
201+
}

0 commit comments

Comments
 (0)