Skip to content

Commit 52f8d76

Browse files
WIP - add support Number without scale
1 parent 82b6282 commit 52f8d76

File tree

4 files changed

+135
-3
lines changed

4 files changed

+135
-3
lines changed

pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,20 @@
2727

2828
<artifactId>kafka-connect-cdc-xstream</artifactId>
2929

30+
<version>0.0.2-SNAPSHOT</version>
3031
<packaging>jar</packaging>
3132

3233
<dependencies>
3334
<dependency>
3435
<groupId>com.github.jcustenborder.kafka.connect</groupId>
3536
<artifactId>kafka-connect-cdc</artifactId>
36-
<version>${project.version}</version>
37+
<version>${parent.version}</version>
3738
</dependency>
3839

3940
<dependency>
4041
<groupId>com.github.jcustenborder.kafka.connect</groupId>
4142
<artifactId>kafka-connect-cdc-test</artifactId>
42-
<version>${project.version}</version>
43+
<version>${parent.version}</version>
4344
<scope>test</scope>
4445
</dependency>
4546
<dependency>

src/main/java/com/github/jcustenborder/kafka/connect/cdc/xstream/Oracle12cTableMetadataProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ Schema generateSchema(ResultSet resultSet, final String columnName) throws SQLEx
238238
SchemaBuilder builder = null;
239239

240240
String dataType = resultSet.getString(2);
241+
String scaleString = resultSet.getString(3);
241242
int scale = resultSet.getInt(3);
242243
boolean nullable = "Y".equalsIgnoreCase(resultSet.getString(4));
243244
String comments = resultSet.getString(5);
@@ -246,7 +247,7 @@ Schema generateSchema(ResultSet resultSet, final String columnName) throws SQLEx
246247
Schema.Type type = TYPE_LOOKUP.get(dataType);
247248
builder = SchemaBuilder.type(type);
248249
} else if ("NUMBER".equals(dataType)) {
249-
builder = Decimal.builder(scale);
250+
builder = scaleString != null ? Decimal.builder(scale) : SchemaBuilder.float64();
250251
} else if (matches(TIMESTAMP_PATTERN, dataType)) {
251252
builder = Timestamp.builder();
252253
} else if (matches(TIMESTAMP_WITH_LOCAL_TIMEZONE, dataType)) {

src/main/java/com/github/jcustenborder/kafka/connect/cdc/xstream/OracleChange.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.slf4j.LoggerFactory;
3838

3939
import javax.sql.PooledConnection;
40+
import java.math.BigDecimal;
4041
import java.sql.SQLException;
4142
import java.util.ArrayList;
4243
import java.util.Calendar;
@@ -276,6 +277,11 @@ public OracleChange build(RowLCR row) throws StreamsException, SQLException {
276277
throw new DataException(message, ex);
277278
}
278279

280+
//workaround for Number without scale
281+
if (Schema.Type.FLOAT64.equals(schema.type()) && value instanceof BigDecimal) {
282+
value = ((BigDecimal) value).doubleValue();
283+
}
284+
279285
ColumnValue outputColumnValue = new OracleColumnValue(
280286
columnValue.getColumnName(),
281287
schema,

src/test/java/com/github/jcustenborder/kafka/connect/cdc/xstream/GemdTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,48 @@
11
package com.github.jcustenborder.kafka.connect.cdc.xstream;
22

33
import com.github.jcustenborder.kafka.connect.cdc.Change;
4+
import com.google.common.base.Strings;
45
import com.google.common.collect.ImmutableMap;
6+
import org.apache.kafka.connect.data.Decimal;
57
import org.apache.kafka.connect.data.Schema;
68
import org.apache.kafka.connect.data.SchemaBuilder;
9+
import org.apache.kafka.connect.data.Timestamp;
10+
import org.apache.kafka.connect.errors.DataException;
11+
import org.apache.kafka.connect.storage.OffsetStorageReader;
712
import org.junit.Test;
13+
import org.junit.Assert;
14+
15+
import java.math.BigDecimal;
16+
import java.sql.*;
17+
import java.util.Collection;
18+
import java.util.HashMap;
19+
import java.util.LinkedHashMap;
20+
import java.util.Map;
21+
import java.util.regex.Pattern;
22+
23+
import static com.github.jcustenborder.kafka.connect.cdc.xstream.Oracle12cTableMetadataProvider.matches;
824

925
public class GemdTest {
26+
static final Map<String, Schema.Type> TYPE_LOOKUP;
27+
final static Pattern TIMESTAMP_PATTERN = Pattern.compile("^TIMESTAMP\\(\\d\\)$");
28+
final static Pattern TIMESTAMP_WITH_LOCAL_TIMEZONE = Pattern.compile("^TIMESTAMP\\(\\d\\) WITH LOCAL TIME ZONE$");
29+
final static Pattern TIMESTAMP_WITH_TIMEZONE = Pattern.compile("^TIMESTAMP\\(\\d\\) WITH TIME ZONE$");
30+
31+
static {
32+
Map<String, Schema.Type> map = new HashMap<>();
33+
map.put("BINARY_DOUBLE", Schema.Type.FLOAT64);
34+
map.put("BINARY_FLOAT", Schema.Type.FLOAT32);
35+
map.put("BLOB", Schema.Type.BYTES);
36+
map.put("CHAR", Schema.Type.STRING);
37+
map.put("NCHAR", Schema.Type.STRING);
38+
map.put("CLOB", Schema.Type.STRING);
39+
map.put("NCLOB", Schema.Type.STRING);
40+
map.put("NVARCHAR2", Schema.Type.STRING);
41+
map.put("VARCHAR2", Schema.Type.STRING);
42+
map.put("NVARCHAR", Schema.Type.STRING);
43+
map.put("VARCHAR", Schema.Type.STRING);
44+
TYPE_LOOKUP = ImmutableMap.copyOf(map);
45+
}
1046
@Test
1147
public void printQuery() {
1248
System.out.println(Oracle12cTableMetadataProvider.PRIMARY_KEY_SQL);
@@ -25,4 +61,92 @@ public void testBuildSchema() {
2561
.build();
2662
System.out.println(schema.parameters().isEmpty());
2763
}
64+
65+
@Test
66+
public void testBigDecimal() {
67+
String number = "290.88252314814815";
68+
String seq = "5125828185";
69+
Object bd = BigDecimal.valueOf(Double.valueOf(number));
70+
Object bd1 = BigDecimal.valueOf(Double.valueOf(seq));
71+
Object converted = Float.valueOf(((BigDecimal) bd).floatValue());
72+
Object double1 = Double.valueOf(((BigDecimal) bd).doubleValue());
73+
Object double2 = Double.valueOf(((BigDecimal) bd1).doubleValue());
74+
Assert.assertTrue(converted instanceof Float);
75+
}
76+
77+
@Test
78+
public void testFHOPEPSMetadata() throws ClassNotFoundException, SQLException {
79+
Connection conn = null;
80+
Statement stmt = null;
81+
Class.forName("oracle.jdbc.driver.OracleDriver");
82+
conn = DriverManager
83+
.getConnection("jdbc:oracle:thin:@//fc8racps1n4:1521/f8modsp1.gfoundries.com", "xstrmadmin", "xtra");
84+
85+
try (PreparedStatement columnStatement = conn.prepareStatement(Oracle12cTableMetadataProvider.COLUMN_SQL)) {
86+
columnStatement.setString(1, "MDS_ADMIN");
87+
columnStatement.setString(2, "T_HISTORY_VIEW_WAFER");
88+
89+
Map<String, Schema> columnSchemas = new LinkedHashMap<>();
90+
91+
try (ResultSet resultSet = columnStatement.executeQuery()) {
92+
while (resultSet.next()) {
93+
String columnName = resultSet.getString(1);
94+
95+
try {
96+
Schema columnSchema = generateSchema(resultSet, columnName);
97+
columnSchemas.put(columnName, columnSchema);
98+
} catch (Exception ex) {
99+
throw new DataException("Exception thrown while ", ex);
100+
}
101+
}
102+
}
103+
104+
columnSchemas.forEach((k,v) -> System.out.println(String.format("Key: %s; Value: %s", k, v)) );
105+
}
106+
107+
108+
}
109+
110+
Schema generateSchema(ResultSet resultSet, final String columnName) throws SQLException {
111+
SchemaBuilder builder = null;
112+
113+
String dataType = resultSet.getString(2);
114+
String scaleString = resultSet.getString(3);
115+
int scale = resultSet.getInt(3);
116+
boolean nullable = "Y".equalsIgnoreCase(resultSet.getString(4));
117+
String comments = resultSet.getString(5);
118+
119+
if (TYPE_LOOKUP.containsKey(dataType)) {
120+
Schema.Type type = TYPE_LOOKUP.get(dataType);
121+
builder = SchemaBuilder.type(type);
122+
} else if ("NUMBER".equals(dataType)) {
123+
builder = scaleString != null ? Decimal.builder(scale) : SchemaBuilder.float64();
124+
} else if (matches(TIMESTAMP_PATTERN, dataType)) {
125+
builder = org.apache.kafka.connect.data.Timestamp.builder();
126+
} else if (matches(TIMESTAMP_WITH_LOCAL_TIMEZONE, dataType)) {
127+
builder = org.apache.kafka.connect.data.Timestamp.builder();
128+
} else if (matches(TIMESTAMP_WITH_TIMEZONE, dataType)) {
129+
builder = org.apache.kafka.connect.data.Timestamp.builder();
130+
} else if ("DATE".equals(dataType)) {
131+
builder = Timestamp.builder();
132+
} else {
133+
String message = String.format("Could not determine schema type for column %s. dataType = %s", columnName, dataType);
134+
throw new DataException(message);
135+
}
136+
137+
138+
if (nullable) {
139+
builder.optional();
140+
}
141+
142+
if (!Strings.isNullOrEmpty(comments)) {
143+
builder.doc(comments);
144+
}
145+
146+
builder.parameters(
147+
ImmutableMap.of(Change.ColumnValue.COLUMN_NAME, columnName)
148+
);
149+
150+
return builder.build();
151+
}
28152
}

0 commit comments

Comments
 (0)