Skip to content

Commit 07ac721

Browse files
author
dapeng
committed
Merge branch 'feat_1.8_impalaPartition' into 1.8_test_3.10.x
2 parents 86b6255 + c931b0b commit 07ac721

File tree

5 files changed

+11
-7
lines changed

5 files changed

+11
-7
lines changed

mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/table/MongoSideParser.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5959
mongoSideTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
6060
mongoSideTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
6161
mongoSideTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
62-
62+
mongoSideTableInfo.check();
6363
return mongoSideTableInfo;
6464
}
6565
}

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/table/MongoSinkParser.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6060
mongoTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
6161
mongoTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
6262
mongoTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
63-
63+
mongoTableInfo.check();
6464
return mongoTableInfo;
6565
}
6666
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void open(Configuration parameters) throws Exception {
5656
.put("user", rdbSideTableInfo.getUserName())
5757
.put("password", rdbSideTableInfo.getPassword())
5858
.put("provider_class", DT_PROVIDER_CLASS)
59-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
59+
.put("preferred_test_query", "select 1 from dual")
6060
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
6161
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
6262

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
7878

7979
public final static String DT_PROVIDER_CLASS = "com.dtstack.flink.sql.side.rdb.provider.DTC3P0DataSourceProvider";
8080

81-
public final static String PREFERRED_TEST_QUERY_SQL = "select 1 from dual";
81+
public final static String PREFERRED_TEST_QUERY_SQL = "select 1";
8282

8383
private transient SQLClient rdbSqlClient;
8484

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.io.IOException;
36+
import java.sql.ResultSet;
3637
import java.sql.SQLException;
38+
import java.sql.Statement;
3739
import java.util.List;
3840
import java.util.concurrent.ScheduledExecutorService;
3941
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +46,7 @@
4446

4547
/**
4648
* An upsert OutputFormat for JDBC.
49+
*
4750
* @author maqi
4851
*/
4952
public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
@@ -103,7 +106,7 @@ public JDBCUpsertOutputFormat(
103106
*
104107
* @param taskNumber The number of the parallel instance.
105108
* @throws IOException Thrown, if the output could not be opened due to an
106-
* I/O problem.
109+
* I/O problem.
107110
*/
108111
@Override
109112
public void open(int taskNumber, int numTasks) throws IOException {
@@ -167,7 +170,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOExcep
167170

168171
private void checkConnectionOpen() {
169172
try {
170-
if (connection.isClosed()) {
173+
if (!connection.isValid(10)) {
171174
LOG.info("db connection reconnect..");
172175
establishConnection();
173176
jdbcWriter.prepareStatement(connection);
@@ -270,7 +273,8 @@ public Builder setFieldTypes(int[] fieldTypes) {
270273
}
271274

272275
/**
273-
* optional, partition Fields
276+
* optional, partition Fields
277+
*
274278
* @param partitionFields
275279
* @return
276280
*/

0 commit comments

Comments
 (0)