Skip to content

Commit e463694

Browse files
committed
用isValid代替isClosed
1 parent 64afbc6 commit e463694

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.io.IOException;
36+
import java.sql.ResultSet;
3637
import java.sql.SQLException;
38+
import java.sql.Statement;
3739
import java.util.List;
3840
import java.util.concurrent.ScheduledExecutorService;
3941
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +46,7 @@
4446

4547
/**
4648
* An upsert OutputFormat for JDBC.
49+
*
4750
* @author maqi
4851
*/
4952
public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
@@ -103,7 +106,7 @@ public JDBCUpsertOutputFormat(
103106
*
104107
* @param taskNumber The number of the parallel instance.
105108
* @throws IOException Thrown, if the output could not be opened due to an
106-
* I/O problem.
109+
* I/O problem.
107110
*/
108111
@Override
109112
public void open(int taskNumber, int numTasks) throws IOException {
@@ -167,7 +170,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOExcep
167170

168171
private void checkConnectionOpen() {
169172
try {
170-
if (connection.isClosed()) {
173+
if (!connection.isValid(10)) {
171174
LOG.info("db connection reconnect..");
172175
establishConnection();
173176
jdbcWriter.prepareStatement(connection);
@@ -270,7 +273,8 @@ public Builder setFieldTypes(int[] fieldTypes) {
270273
}
271274

272275
/**
273-
* optional, partition Fields
276+
* optional, partition Fields
277+
*
274278
* @param partitionFields
275279
* @return
276280
*/

0 commit comments

Comments
 (0)