Skip to content

Commit 2619c55

Browse files
committed
Add logging around which DbDialect gets used
1 parent e7e5bf8 commit 2619c55

File tree

3 files changed

+27
-8
lines changed

3 files changed

+27
-8
lines changed

src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ public class JdbcDbWriter {
4141

4242
Connection connection;
4343

44-
JdbcDbWriter(final JdbcSinkConfig config) {
44+
JdbcDbWriter(final JdbcSinkConfig config, DbDialect dbDialect, DbStructure dbStructure) {
4545
this.config = config;
46-
dbDialect = DbDialect.fromConnectionString(config.connectionUrl);
47-
dbStructure = new DbStructure(dbDialect);
46+
this.dbDialect = dbDialect;
47+
this.dbStructure = dbStructure;
4848
}
4949

5050
void write(final Collection<SinkRecord> records) throws SQLException {

src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Collection;
3030
import java.util.Map;
3131

32+
import io.confluent.connect.jdbc.sink.dialect.DbDialect;
33+
3234
public class JdbcSinkTask extends SinkTask {
3335
private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class);
3436

@@ -40,10 +42,17 @@ public class JdbcSinkTask extends SinkTask {
4042
public void start(final Map<String, String> props) {
4143
log.info("Starting task");
4244
config = new JdbcSinkConfig(props);
43-
writer = new JdbcDbWriter(config);
45+
initWriter();
4446
remainingRetries = config.maxRetries;
4547
}
4648

49+
private void initWriter() {
50+
final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);
51+
final DbStructure dbStructure = new DbStructure(dbDialect);
52+
log.info("Initializing writer using SQL dialect: {}", dbDialect.getClass().getSimpleName());
53+
writer = new JdbcDbWriter(config, dbDialect, dbStructure);
54+
}
55+
4756
@Override
4857
public void put(Collection<SinkRecord> records) {
4958
if (records.isEmpty()) {
@@ -61,7 +70,7 @@ public void put(Collection<SinkRecord> records) {
6170
throw new ConnectException(sqle);
6271
} else {
6372
writer.closeQuietly();
64-
writer = new JdbcDbWriter(config);
73+
initWriter();
6574
remainingRetries--;
6675
context.timeout(config.retryBackoffMs);
6776
throw new RetriableException(sqle);

src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.Map;
3535
import java.util.concurrent.ThreadLocalRandom;
3636

37+
import io.confluent.connect.jdbc.sink.dialect.DbDialect;
38+
import io.confluent.connect.jdbc.sink.dialect.SqliteDialect;
3739
import io.confluent.connect.jdbc.sink.metadata.DbTable;
3840

3941
import static junit.framework.TestCase.assertTrue;
@@ -53,6 +55,13 @@ public void tearDown() throws IOException, SQLException {
5355
sqliteHelper.tearDown();
5456
}
5557

58+
private JdbcDbWriter newWriter(Map<String, String> props) {
59+
final JdbcSinkConfig config = new JdbcSinkConfig(props);
60+
final DbDialect dbDialect = new SqliteDialect();
61+
final DbStructure dbStructure = new DbStructure(dbDialect);
62+
return new JdbcDbWriter(config, dbDialect, dbStructure);
63+
}
64+
5665
@Test
5766
public void autoCreateWithAutoEvolve() throws SQLException {
5867
String topic = "books";
@@ -64,7 +73,7 @@ public void autoCreateWithAutoEvolve() throws SQLException {
6473
props.put("pk.mode", "record_key");
6574
props.put("pk.fields", "id"); // assigned name for the primitive key
6675

67-
JdbcDbWriter writer = new JdbcDbWriter(new JdbcSinkConfig(props));
76+
JdbcDbWriter writer = newWriter(props);
6877

6978
Schema keySchema = Schema.INT64_SCHEMA;
7079

@@ -151,7 +160,7 @@ private void writeSameRecordTwiceExpectingSingleUpdate(
151160
props.put("pk.fields", pkFields);
152161
props.put("insert.mode", insertMode.toString());
153162

154-
JdbcDbWriter writer = new JdbcDbWriter(new JdbcSinkConfig(props));
163+
JdbcDbWriter writer = newWriter(props);
155164

156165
Schema keySchema = SchemaBuilder.struct()
157166
.field("id", SchemaBuilder.INT64_SCHEMA);
@@ -231,7 +240,8 @@ public void sameRecordNTimes() throws SQLException {
231240
props.put("table.name.format", tableName);
232241
props.put("batch.size", String.valueOf(ThreadLocalRandom.current().nextInt(20, 100)));
233242

234-
JdbcDbWriter writer = new JdbcDbWriter(new JdbcSinkConfig(props));
243+
JdbcDbWriter writer = newWriter(props);
244+
235245
writer.write(Collections.nCopies(
236246
numRecords,
237247
new SinkRecord("topic", 0, null, null, schema, struct, 0)

0 commit comments

Comments
 (0)