@@ -59,22 +59,17 @@ public final String getInsert(final String tableName, final Collection<String> k
59
59
public abstract String getUpsertQuery (final String table , final Collection <String > keyColumns , final Collection <String > columns );
60
60
61
61
public String getCreateQuery (String tableName , Collection <SinkRecordField > fields ) {
62
- final List <String > pks = new ArrayList <>();
63
- for (SinkRecordField f : fields ) {
64
- if (f .isPrimaryKey ) {
65
- pks .add (f .name );
66
- }
67
- }
62
+ final List <String > pkFieldNames = extractPrimaryKeyFieldNames (fields );
68
63
final StringBuilder builder = new StringBuilder ();
69
64
builder .append ("CREATE TABLE " );
70
65
builder .append (escapeTableName (tableName ));
71
66
builder .append (" (" );
72
67
writeColumnsSpec (builder , fields );
73
- if (!pks .isEmpty ()) {
68
+ if (!pkFieldNames .isEmpty ()) {
74
69
builder .append ("," );
75
70
builder .append (lineSeparator );
76
71
builder .append ("PRIMARY KEY(" );
77
- joinToBuilder (builder , "," , pks , stringSurroundTransform (escapeColumnNamesStart , escapeColumnNamesEnd ));
72
+ joinToBuilder (builder , "," , pkFieldNames , stringSurroundTransform (escapeColumnNamesStart , escapeColumnNamesEnd ));
78
73
builder .append (")" );
79
74
}
80
75
builder .append (")" );
@@ -133,6 +128,16 @@ protected String escapeTableName(String tableName) {
133
128
return escapeColumnNamesStart + tableName + escapeColumnNamesEnd ;
134
129
}
135
130
131
+ static List <String > extractPrimaryKeyFieldNames (Collection <SinkRecordField > fields ) {
132
+ final List <String > pks = new ArrayList <>();
133
+ for (SinkRecordField f : fields ) {
134
+ if (f .isPrimaryKey ) {
135
+ pks .add (f .name );
136
+ }
137
+ }
138
+ return pks ;
139
+ }
140
+
136
141
public static DbDialect fromConnectionString (final String url ) {
137
142
if (!url .startsWith ("jdbc:" )) {
138
143
throw new ConnectException (String .format ("Not a valid JDBC URL: %s" , url ));
0 commit comments