
Commit 2896661

Author: Pablo Queixalos (committed)
Message: Added schemaPattern connector task configuration, used when fetching table metadata
Parent: 298b3ee

11 files changed, +102 −94 lines

src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java

Lines changed: 8 additions & 8 deletions
@@ -16,6 +16,12 @@
 
 package io.confluent.connect.jdbc;
 
+import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
+import io.confluent.connect.jdbc.source.JdbcSourceTask;
+import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig;
+import io.confluent.connect.jdbc.source.TableMonitorThread;
+import io.confluent.connect.jdbc.util.StringUtils;
+import io.confluent.connect.jdbc.util.Version;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.Task;
@@ -36,13 +42,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
-import io.confluent.connect.jdbc.source.JdbcSourceTask;
-import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig;
-import io.confluent.connect.jdbc.source.TableMonitorThread;
-import io.confluent.connect.jdbc.util.StringUtils;
-import io.confluent.connect.jdbc.util.Version;
-
 /**
  * JdbcConnector is a Kafka Connect Connector implementation that watches a JDBC database and
  * generates tasks to ingest database contents.
@@ -96,6 +95,7 @@ public void start(Map<String, String> properties) throws ConnectException {
           + JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG + " are "
           + "exclusive.");
     String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
+    String schemaPattern = config.getString(JdbcSourceConnectorConfig.SCHEMA_PATTERN_CONFIG);
     if (!query.isEmpty()) {
       if (whitelistSet != null || blacklistSet != null)
         throw new ConnectException(JdbcSourceConnectorConfig.QUERY_CONFIG + " may not be combined"
@@ -104,7 +104,7 @@ public void start(Map<String, String> properties) throws ConnectException {
       // query.
       whitelistSet = Collections.emptySet();
     }
-    tableMonitorThread = new TableMonitorThread(db, context, tablePollMs, whitelistSet,
+    tableMonitorThread = new TableMonitorThread(db, schemaPattern, context, tablePollMs, whitelistSet,
                                                 blacklistSet, tableTypesSet);
     tableMonitorThread.start();
   }

src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java

Lines changed: 3 additions & 4 deletions
@@ -16,6 +16,7 @@
 
 package io.confluent.connect.jdbc.source;
 
+import io.confluent.connect.jdbc.util.JdbcUtils;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -28,16 +29,14 @@
 import java.util.Collections;
 import java.util.Map;
 
-import io.confluent.connect.jdbc.util.JdbcUtils;
-
 /**
  * BulkTableQuerier always returns the entire table.
  */
 public class BulkTableQuerier extends TableQuerier {
   private static final Logger log = LoggerFactory.getLogger(BulkTableQuerier.class);
 
-  public BulkTableQuerier(QueryMode mode, String name, String topicPrefix) {
-    super(mode, name, topicPrefix);
+  public BulkTableQuerier(QueryMode mode, String name, String schemaPattern, String topicPrefix) {
+    super(mode, name, topicPrefix, schemaPattern);
   }
 
   @Override

src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java

Lines changed: 7 additions & 3 deletions
@@ -16,6 +16,7 @@
 
 package io.confluent.connect.jdbc.source;
 
+import io.confluent.connect.jdbc.util.JdbcUtils;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -32,8 +33,6 @@
 import java.util.List;
 import java.util.Map;
 
-import io.confluent.connect.jdbc.util.JdbcUtils;
-
 public class JdbcSourceConnectorConfig extends AbstractConfig {
 
   public static final String CONNECTION_URL_CONFIG = "connection.url";
@@ -144,6 +143,9 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
   public static final long TIMESTAMP_DELAY_INTERVAL_MS_DEFAULT = 0;
   private static final String TIMESTAMP_DELAY_INTERVAL_MS_DISPLAY = "Delay Interval (ms)";
 
+  public static final String SCHEMA_PATTERN_CONFIG = "schemaPattern";
+  private static final String SCHEMA_PATTERN_DOC = "Schema pattern to fetch tables metadata";
+
   public static final String DATABASE_GROUP = "Database";
   public static final String MODE_GROUP = "Mode";
   public static final String CONNECTOR_GROUP = "Connector";
@@ -190,6 +192,7 @@ public static ConfigDef baseConfigDef() {
         .define(BATCH_MAX_ROWS_CONFIG, Type.INT, BATCH_MAX_ROWS_DEFAULT, Importance.LOW, BATCH_MAX_ROWS_DOC, CONNECTOR_GROUP, 2, Width.SHORT, BATCH_MAX_ROWS_DISPLAY)
         .define(TABLE_POLL_INTERVAL_MS_CONFIG, Type.LONG, TABLE_POLL_INTERVAL_MS_DEFAULT, Importance.LOW, TABLE_POLL_INTERVAL_MS_DOC, CONNECTOR_GROUP, 3, Width.SHORT, TABLE_POLL_INTERVAL_MS_DISPLAY)
         .define(TOPIC_PREFIX_CONFIG, Type.STRING, Importance.HIGH, TOPIC_PREFIX_DOC, CONNECTOR_GROUP, 4, Width.MEDIUM, TOPIC_PREFIX_DISPLAY)
+        .define(SCHEMA_PATTERN_CONFIG, Type.STRING, null, Importance.MEDIUM, SCHEMA_PATTERN_DOC, DATABASE_GROUP, 5, Width.SHORT, QUERY_DISPLAY)
         .define(TIMESTAMP_DELAY_INTERVAL_MS_CONFIG, Type.LONG, TIMESTAMP_DELAY_INTERVAL_MS_DEFAULT, Importance.HIGH, TIMESTAMP_DELAY_INTERVAL_MS_DOC, CONNECTOR_GROUP, 5, Width.MEDIUM, TIMESTAMP_DELAY_INTERVAL_MS_DISPLAY);
   }
@@ -207,13 +210,14 @@ private static class TableRecommender implements Recommender {
     @Override
     public List<Object> validValues(String name, Map<String, Object> config) {
       String dbUrl = (String) config.get(CONNECTION_URL_CONFIG);
+      String schemaPattern = (String) config.get(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
       if (dbUrl == null) {
         throw new ConfigException(CONNECTION_URL_CONFIG + " cannot be null.");
       }
       Connection db;
       try {
         db = DriverManager.getConnection(dbUrl);
-        return new LinkedList<Object>(JdbcUtils.getTables(db));
+        return new LinkedList<Object>(JdbcUtils.getTables(db, schemaPattern));
       } catch (SQLException e) {
         throw new ConfigException("Couldn't open connection to " + dbUrl, e);
       }
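
For reference, the new option is wired in like any other connector property. The sketch below is a hypothetical standalone worker configuration, not part of this commit; the connection URL, connector name, and topic prefix are invented for illustration, while the schemaPattern key and its null default are the ones defined above.

name=jdbc-source-example
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
mode=bulk
topic.prefix=jdbc-
# New in this commit: restrict table metadata lookups to schemas matching
# this pattern; leaving it unset (null default) keeps the old behavior.
schemaPattern=public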

src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java

Lines changed: 12 additions & 9 deletions
@@ -16,6 +16,8 @@
 
 package io.confluent.connect.jdbc.source;
 
+import io.confluent.connect.jdbc.util.JdbcUtils;
+import io.confluent.connect.jdbc.util.Version;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -35,9 +37,6 @@
 import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import io.confluent.connect.jdbc.util.JdbcUtils;
-import io.confluent.connect.jdbc.util.Version;
-
 /**
  * JdbcSourceTask is a Kafka Connect SourceTask implementation that reads from JDBC databases and
  * generates Kafka Connect records.
@@ -49,6 +48,7 @@ public class JdbcSourceTask extends SourceTask {
   private Time time;
   private JdbcSourceTaskConfig config;
   private Connection db;
+  private String schemaPattern;
   private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
   private AtomicBoolean stop;
 
@@ -73,6 +73,7 @@ public void start(Map<String, String> properties) {
       throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
     }
 
+    this.schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
    List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
     String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
     if ((tables.isEmpty() && query.isEmpty()) || (!tables.isEmpty() && !query.isEmpty())) {
@@ -117,6 +118,8 @@ public void start(Map<String, String> properties) {
       throw new ConnectException(e);
     }
 
+    String schemaPattern
+        = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
     String incrementingColumn
         = config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG);
     String timestampColumn
@@ -148,16 +151,16 @@ public void start(Map<String, String> properties) {
     String topicPrefix = config.getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG);
 
     if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
-      tableQueue.add(new BulkTableQuerier(queryMode, tableOrQuery, topicPrefix));
+      tableQueue.add(new BulkTableQuerier(queryMode, tableOrQuery, schemaPattern, topicPrefix));
     } else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) {
       tableQueue.add(new TimestampIncrementingTableQuerier(
-          queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, offset, timestampDelayInterval));
+          queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, offset, timestampDelayInterval, schemaPattern));
     } else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
       tableQueue.add(new TimestampIncrementingTableQuerier(
-          queryMode, tableOrQuery, topicPrefix, timestampColumn, null, offset, timestampDelayInterval));
+          queryMode, tableOrQuery, topicPrefix, timestampColumn, null, offset, timestampDelayInterval, schemaPattern));
     } else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
       tableQueue.add(new TimestampIncrementingTableQuerier(
-          queryMode, tableOrQuery, topicPrefix, timestampColumn, incrementingColumn, offset, timestampDelayInterval));
+          queryMode, tableOrQuery, topicPrefix, timestampColumn, incrementingColumn, offset, timestampDelayInterval, schemaPattern));
     }
   }
@@ -255,14 +258,14 @@ private void validateNonNullable(String incrementalMode, String table, String in
     // without a query or parsing the query since we don't have a table name.
     if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) ||
          incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) &&
-        JdbcUtils.isColumnNullable(db, table, incrementingColumn)) {
+        JdbcUtils.isColumnNullable(db, schemaPattern, table, incrementingColumn)) {
       throw new ConnectException("Cannot make incremental queries using incrementing column " +
                                  incrementingColumn + " on " + table + " because this column is "
                                  + "nullable.");
     }
     if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) ||
          incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) &&
-        JdbcUtils.isColumnNullable(db, table, timestampColumn)) {
+        JdbcUtils.isColumnNullable(db, schemaPattern, table, timestampColumn)) {
       throw new ConnectException("Cannot make incremental queries using timestamp column " +
                                  timestampColumn + " on " + table + " because this column is "
                                  + "nullable.");

src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java

Lines changed: 5 additions & 4 deletions
@@ -16,6 +16,7 @@
 
 package io.confluent.connect.jdbc.source;
 
+import io.confluent.connect.jdbc.util.JdbcUtils;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
@@ -30,8 +31,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import io.confluent.connect.jdbc.util.JdbcUtils;
-
 /**
  * Thread that monitors the database for changes to the set of tables in the database that this
  * connector should load data from.
@@ -40,6 +39,7 @@ public class TableMonitorThread extends Thread {
   private static final Logger log = LoggerFactory.getLogger(TableMonitorThread.class);
 
   private final Connection db;
+  private final String schemaPattern;
   private final ConnectorContext context;
   private final CountDownLatch shutdownLatch;
   private final long pollMs;
@@ -48,9 +48,10 @@ public class TableMonitorThread extends Thread {
   private List<String> tables;
   private Set<String> tableTypes;
 
-  public TableMonitorThread(Connection db, ConnectorContext context, long pollMs,
+  public TableMonitorThread(Connection db, String schemaPattern, ConnectorContext context, long pollMs,
                             Set<String> whitelist, Set<String> blacklist, Set<String> tableTypes) {
     this.db = db;
+    this.schemaPattern = schemaPattern;
     this.context = context;
     this.shutdownLatch = new CountDownLatch(1);
     this.pollMs = pollMs;
@@ -108,7 +109,7 @@ private boolean updateTables() {
     synchronized (db) {
       final List<String> tables;
       try {
-        tables = JdbcUtils.getTables(db, tableTypes);
+        tables = JdbcUtils.getTables(db, schemaPattern, tableTypes);
         log.debug("Got the following tables: " + Arrays.toString(tables.toArray()));
       } catch (SQLException e) {
        log.error("Error while trying to get updated table list, ignoring and waiting for next "
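
updateTables() now passes the schema pattern into JdbcUtils.getTables. Again, JdbcUtils.java is changed by this commit but not shown here, so this is only a plausible sketch of the new overload, assuming it maps directly onto DatabaseMetaData.getTables, whose second argument is a schema pattern and where null matches all schemas:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical reconstruction; the real JdbcUtils change is not shown above.
public static List<String> getTables(Connection conn, String schemaPattern,
                                     Set<String> tableTypes) throws SQLException {
  String[] types = tableTypes.toArray(new String[tableTypes.size()]);
  // getTables(catalog, schemaPattern, tableNamePattern, types)
  ResultSet rs = conn.getMetaData().getTables(null, schemaPattern, "%", types);
  List<String> tables = new ArrayList<String>();
  try {
    while (rs.next()) {
      tables.add(rs.getString("TABLE_NAME"));
    }
  } finally {
    rs.close();
  }
  return tables;
}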

src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java

Lines changed: 3 additions & 1 deletion
@@ -36,6 +36,7 @@ public enum QueryMode {
   }
 
   protected final QueryMode mode;
+  protected String schemaPattern;
   protected final String name;
   protected final String query;
   protected final String topicPrefix;
@@ -44,8 +45,9 @@ public enum QueryMode {
   protected ResultSet resultSet;
   protected Schema schema;
 
-  public TableQuerier(QueryMode mode, String nameOrQuery, String topicPrefix) {
+  public TableQuerier(QueryMode mode, String nameOrQuery, String topicPrefix, String schemaPattern) {
     this.mode = mode;
+    this.schemaPattern = schemaPattern;
     this.name = mode.equals(QueryMode.TABLE) ? nameOrQuery : null;
     this.query = mode.equals(QueryMode.QUERY) ? nameOrQuery : null;
     this.topicPrefix = topicPrefix;

src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java

Lines changed: 20 additions & 22 deletions
@@ -1,12 +1,12 @@
 /**
  * Copyright 2015 Confluent Inc.
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,6 +16,7 @@
 
 package io.confluent.connect.jdbc.source;
 
+import io.confluent.connect.jdbc.util.JdbcUtils;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,8 +33,6 @@
 import java.util.Map;
 import java.util.TimeZone;
 
-import io.confluent.connect.jdbc.util.JdbcUtils;
-
 /**
  * <p>
  * TimestampIncrementingTableQuerier performs incremental loading of data using two mechanisms: a
@@ -63,8 +62,8 @@ public class TimestampIncrementingTableQuerier extends TableQuerier {
 
   public TimestampIncrementingTableQuerier(QueryMode mode, String name, String topicPrefix,
                                            String timestampColumn, String incrementingColumn,
-                                           Map<String, Object> offsetMap, Long timestampDelay) {
-    super(mode, name, topicPrefix);
+                                           Map<String, Object> offsetMap, Long timestampDelay, String schemaPattern) {
+    super(mode, name, topicPrefix, schemaPattern);
     this.timestampColumn = timestampColumn;
     this.incrementingColumn = incrementingColumn;
     this.timestampDelay = timestampDelay;
@@ -75,7 +74,7 @@ public TimestampIncrementingTableQuerier(QueryMode mode, String name, String top
   protected void createPreparedStatement(Connection db) throws SQLException {
     // Default when unspecified uses an autoincrementing column
     if (incrementingColumn != null && incrementingColumn.isEmpty()) {
-      incrementingColumn = JdbcUtils.getAutoincrementColumn(db, name);
+      incrementingColumn = JdbcUtils.getAutoincrementColumn(db, schemaPattern, name);
     }
 
     String quoteString = JdbcUtils.getIdentifierQuoteString(db);
@@ -145,7 +144,6 @@ protected void createPreparedStatement(Connection db) throws SQLException {
   }
 
 
-
   @Override
   protected ResultSet executeQuery() throws SQLException {
     if (incrementingColumn != null && timestampColumn != null) {
@@ -157,9 +155,9 @@ protected ResultSet executeQuery() throws SQLException {
       stmt.setLong(3, incOffset);
       stmt.setTimestamp(4, tsOffset, UTC_CALENDAR);
       log.debug("Executing prepared statement with start time value = {} end time = {} and incrementing value = {}",
-                JdbcUtils.formatUTC(tsOffset),
-                JdbcUtils.formatUTC(endTime),
-                incOffset);
+          JdbcUtils.formatUTC(tsOffset),
+          JdbcUtils.formatUTC(endTime),
+          incOffset);
     } else if (incrementingColumn != null) {
       Long incOffset = offset.getIncrementingOffset();
       stmt.setLong(1, incOffset);
@@ -170,8 +168,8 @@ protected ResultSet executeQuery() throws SQLException {
       stmt.setTimestamp(1, tsOffset, UTC_CALENDAR);
       stmt.setTimestamp(2, endTime, UTC_CALENDAR);
       log.debug("Executing prepared statement with timestamp value = {} end time = {}",
-                JdbcUtils.formatUTC(tsOffset),
-                JdbcUtils.formatUTC(endTime));
+          JdbcUtils.formatUTC(tsOffset),
+          JdbcUtils.formatUTC(endTime));
     }
     return stmt.executeQuery();
   }
@@ -191,7 +189,7 @@ public SourceRecord extractRecord() throws SQLException {
         break;
       default:
         throw new ConnectException("Invalid type for incrementing column: "
-                                   + schema.field(incrementingColumn).schema().type());
+            + schema.field(incrementingColumn).schema().type());
     }
 
     // If we are only using an incrementing column, then this must be incrementing. If we are also
@@ -216,7 +214,7 @@ public SourceRecord extractRecord() throws SQLException {
         break;
      case QUERY:
         partition = Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY,
-                                             JdbcSourceConnectorConstants.QUERY_NAME_VALUE);
+            JdbcSourceConnectorConstants.QUERY_NAME_VALUE);
         topic = topicPrefix;
         break;
       default:
@@ -228,11 +226,11 @@ public SourceRecord extractRecord() throws SQLException {
   @Override
   public String toString() {
     return "TimestampIncrementingTableQuerier{" +
-           "name='" + name + '\'' +
-           ", query='" + query + '\'' +
-           ", topicPrefix='" + topicPrefix + '\'' +
-           ", timestampColumn='" + timestampColumn + '\'' +
-           ", incrementingColumn='" + incrementingColumn + '\'' +
-           '}';
+        "name='" + name + '\'' +
+        ", query='" + query + '\'' +
+        ", topicPrefix='" + topicPrefix + '\'' +
+        ", timestampColumn='" + timestampColumn + '\'' +
+        ", incrementingColumn='" + incrementingColumn + '\'' +
+        '}';
   }
 }
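
Finally, createPreparedStatement above resolves a default incrementing column through JdbcUtils.getAutoincrementColumn, which now also takes the schema pattern. As with the other JdbcUtils calls, the changed file is not shown on this page; the sketch below is only a plausible reconstruction, assuming the lookup scans column metadata for the JDBC 4.0 IS_AUTOINCREMENT flag within the matching schema:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical reconstruction; the real JdbcUtils change is not shown above.
public static String getAutoincrementColumn(Connection conn, String schemaPattern,
                                            String table) throws SQLException {
  ResultSet rs = conn.getMetaData().getColumns(null, schemaPattern, table, "%");
  try {
    while (rs.next()) {
      // IS_AUTOINCREMENT is "YES", "NO", or "" when the driver cannot tell
      if ("yes".equalsIgnoreCase(rs.getString("IS_AUTOINCREMENT"))) {
        return rs.getString("COLUMN_NAME");
      }
    }
  } finally {
    rs.close();
  }
  return null;
}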
