Skip to content

Commit a0cfbd2

Browse files
committed
Source - add default values and Fix logic for prefix/whitelist/blacklist
Fixes shikhar#4
1 parent df08a9b commit a0cfbd2

File tree

3 files changed

+20
-11
lines changed

3 files changed

+20
-11
lines changed

README.rst

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -186,20 +186,24 @@ Configuration options
186186
* Default: "${table}"
187187
* Importance: high
188188

189+
``tables.prefix``
190+
Prefix for DynamoDB tables to source from.
191+
192+
* Type: string
193+
* Default: ""
194+
* Importance: medium
195+
189196
``tables.whitelist``
190197
Whitelist for DynamoDB tables to source from.
191198

192199
* Type: list
200+
* Default: ""
193201
* Importance: medium
194202

195203
``tables.blacklist``
196204
Blacklist for DynamoDB tables to source from.
197205

198206
* Type: list
207+
* Default: ""
199208
* Importance: medium
200209

201-
``tables.prefix``
202-
Prefix for DynamoDB tables to source from.
203-
204-
* Type: string
205-
* Importance: medium

src/main/java/dynamok/source/ConnectorConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.common.config.ConfigException;
2323

2424
import java.util.Arrays;
25+
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.Map;
2728

@@ -42,11 +43,11 @@ private enum Keys {
4243
throw new ConfigException("Invalid AWS region: " + regionName);
4344
}
4445
}, ConfigDef.Importance.HIGH, "AWS region for the source DynamoDB.")
45-
.define(Keys.TABLES_PREFIX, ConfigDef.Type.STRING, null,
46+
.define(Keys.TABLES_PREFIX, ConfigDef.Type.STRING, "",
4647
ConfigDef.Importance.MEDIUM, "Prefix for DynamoDB tables to source from.")
47-
.define(Keys.TABLES_WHITELIST, ConfigDef.Type.LIST, null,
48+
.define(Keys.TABLES_WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(),
4849
ConfigDef.Importance.MEDIUM, "Whitelist for DynamoDB tables to source from.")
49-
.define(Keys.TABLES_BLACKLIST, ConfigDef.Type.LIST, null,
50+
.define(Keys.TABLES_BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(),
5051
ConfigDef.Importance.MEDIUM, "Blacklist for DynamoDB tables to source from.")
5152
.define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${table}",
5253
ConfigDef.Importance.HIGH, "Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.");

src/main/java/dynamok/source/DynamoDbSourceConnector.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ public void start(Map<String, String> props) {
8888
final ListTablesResult listResult = client.listTables(lastEvaluatedTableName);
8989

9090
for (String tableName : listResult.getTableNames()) {
91-
if ((config.tablesPrefix == null || tableName.startsWith(config.tablesPrefix)) &&
92-
(config.tablesBlacklist == null || !config.tablesBlacklist.contains(tableName)) &&
93-
(config.tablesWhitelist == null || config.tablesWhitelist.contains(tableName))) {
91+
if (!acceptTable(tableName)) {
9492
ignoredTables.add(tableName);
9593
continue;
9694
}
@@ -126,6 +124,12 @@ public void start(Map<String, String> props) {
126124
streamsClient.shutdown();
127125
}
128126

127+
private boolean acceptTable(String tableName) {
128+
return tableName.startsWith(config.tablesPrefix)
129+
&& (config.tablesWhitelist.isEmpty() || config.tablesWhitelist.contains(tableName))
130+
&& !config.tablesBlacklist.contains(tableName);
131+
}
132+
129133
@Override
130134
public void stop() {
131135
}

0 commit comments

Comments
 (0)