Skip to content

Commit 4e41e8d

Browse files
amitsela authored and rhauch committed
KAFKA-6684: Support casting Connect values with bytes schema to string
Allow casting a LogicalType to a string by calling the serialized (Java) object's toString(). Added tests for `BigDecimal` and `Date`, both as a whole record and as fields. Author: Amit Sela <[email protected]> Reviewers: Randall Hauch <[email protected]>, Robert Yokota <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes apache#4820 from amitsela/cast-transform-bytes
1 parent a4de733 commit 4e41e8d

File tree

3 files changed

+96
-23
lines changed
  • connect
    • api/src/main/java/org/apache/kafka/connect/data
    • transforms/src
      • main/java/org/apache/kafka/connect/transforms
      • test/java/org/apache/kafka/connect/transforms

3 files changed

+96
-23
lines changed

connect/api/src/main/java/org/apache/kafka/connect/data/Values.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -707,7 +707,7 @@ protected static String escape(String value) {
707707
return value.replaceAll("\\\\", "\\\\\\\\").replaceAll("\"", "\\\\\"");
708708
}
709709

710-
protected static DateFormat dateFormatFor(java.util.Date value) {
710+
public static DateFormat dateFormatFor(java.util.Date value) {
711711
if (value.getTime() < MILLIS_PER_DAY) {
712712
return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN);
713713
}

connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.connect.data.Schema;
2929
import org.apache.kafka.connect.data.SchemaBuilder;
3030
import org.apache.kafka.connect.data.Struct;
31+
import org.apache.kafka.connect.data.Values;
3132
import org.apache.kafka.connect.errors.DataException;
3233
import org.apache.kafka.connect.transforms.util.SchemaUtil;
3334
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -81,9 +82,16 @@ public String toString() {
8182

8283
private static final String PURPOSE = "cast types";
8384

84-
private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of(
85+
private static final Set<Schema.Type> SUPPORTED_CAST_INPUT_TYPES = EnumSet.of(
8586
Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
86-
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, Schema.Type.STRING
87+
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
88+
Schema.Type.STRING, Schema.Type.BYTES
89+
);
90+
91+
private static final Set<Schema.Type> SUPPORTED_CAST_OUTPUT_TYPES = EnumSet.of(
92+
Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
93+
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
94+
Schema.Type.STRING
8795
);
8896

8997
// As a special case for casting the entire value (e.g. the incoming key is a int64 but you know it could be an
@@ -123,14 +131,14 @@ public void close() {
123131

124132
private R applySchemaless(R record) {
125133
if (wholeValueCastType != null) {
126-
return newRecord(record, null, castValueToType(operatingValue(record), wholeValueCastType));
134+
return newRecord(record, null, castValueToType(null, operatingValue(record), wholeValueCastType));
127135
}
128136

129137
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
130138
final HashMap<String, Object> updatedValue = new HashMap<>(value);
131139
for (Map.Entry<String, Schema.Type> fieldSpec : casts.entrySet()) {
132140
String field = fieldSpec.getKey();
133-
updatedValue.put(field, castValueToType(value.get(field), fieldSpec.getValue()));
141+
updatedValue.put(field, castValueToType(null, value.get(field), fieldSpec.getValue()));
134142
}
135143
return newRecord(record, null, updatedValue);
136144
}
@@ -141,7 +149,7 @@ private R applyWithSchema(R record) {
141149

142150
// Whole-record casting
143151
if (wholeValueCastType != null)
144-
return newRecord(record, updatedSchema, castValueToType(operatingValue(record), wholeValueCastType));
152+
return newRecord(record, updatedSchema, castValueToType(valueSchema, operatingValue(record), wholeValueCastType));
145153

146154
// Casting within a struct
147155
final Struct value = requireStruct(operatingValue(record), PURPOSE);
@@ -150,7 +158,7 @@ private R applyWithSchema(R record) {
150158
for (Field field : value.schema().fields()) {
151159
final Object origFieldValue = value.get(field);
152160
final Schema.Type targetType = casts.get(field.name());
153-
final Object newFieldValue = targetType != null ? castValueToType(origFieldValue, targetType) : origFieldValue;
161+
final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
154162
log.trace("Cast field '{}' from '{}' to '{}'", field.name(), origFieldValue, newFieldValue);
155163
updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
156164
}
@@ -172,8 +180,10 @@ private Schema getOrBuildSchema(Schema valueSchema) {
172180
SchemaBuilder fieldBuilder = convertFieldType(casts.get(field.name()));
173181
if (field.schema().isOptional())
174182
fieldBuilder.optional();
175-
if (field.schema().defaultValue() != null)
176-
fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type()));
183+
if (field.schema().defaultValue() != null) {
184+
Schema fieldSchema = field.schema();
185+
fieldBuilder.defaultValue(castValueToType(fieldSchema, fieldSchema.defaultValue(), fieldBuilder.type()));
186+
}
177187
builder.field(field.name(), fieldBuilder.build());
178188
} else {
179189
builder.field(field.name(), field.schema());
@@ -185,7 +195,7 @@ private Schema getOrBuildSchema(Schema valueSchema) {
185195
if (valueSchema.isOptional())
186196
builder.optional();
187197
if (valueSchema.defaultValue() != null)
188-
builder.defaultValue(castValueToType(valueSchema.defaultValue(), builder.type()));
198+
builder.defaultValue(castValueToType(valueSchema, valueSchema.defaultValue(), builder.type()));
189199

190200
updatedSchema = builder.build();
191201
schemaUpdateCache.put(valueSchema, updatedSchema);
@@ -216,11 +226,12 @@ private SchemaBuilder convertFieldType(Schema.Type type) {
216226

217227
}
218228

219-
private static Object castValueToType(Object value, Schema.Type targetType) {
229+
private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
220230
try {
221231
if (value == null) return null;
222232

223-
Schema.Type inferredType = ConnectSchema.schemaType(value.getClass());
233+
Schema.Type inferredType = schema == null ? ConnectSchema.schemaType(value.getClass()) :
234+
schema.type();
224235
if (inferredType == null) {
225236
throw new DataException("Cast transformation was passed a value of type " + value.getClass()
226237
+ " which is not supported by Connect's data API");
@@ -331,7 +342,12 @@ else if (value instanceof String)
331342
}
332343

333344
private static String castToString(Object value) {
334-
return value.toString();
345+
if (value instanceof java.util.Date) {
346+
java.util.Date dateValue = (java.util.Date) value;
347+
return Values.dateFormatFor(dateValue).format(dateValue);
348+
} else {
349+
return value.toString();
350+
}
335351
}
336352

337353
protected abstract Schema operatingSchema(R record);
@@ -374,15 +390,19 @@ private enum FieldType {
374390
}
375391

376392
private static Schema.Type validCastType(Schema.Type type, FieldType fieldType) {
377-
if (!SUPPORTED_CAST_TYPES.contains(type)) {
378-
String message = "Cast transformation does not support casting to/from " + type
379-
+ "; supported types are " + SUPPORTED_CAST_TYPES;
380-
switch (fieldType) {
381-
case INPUT:
382-
throw new DataException(message);
383-
case OUTPUT:
384-
throw new ConfigException(message);
385-
}
393+
switch (fieldType) {
394+
case INPUT:
395+
if (!SUPPORTED_CAST_INPUT_TYPES.contains(type)) {
396+
throw new DataException("Cast transformation does not support casting from " +
397+
type + "; supported types are " + SUPPORTED_CAST_INPUT_TYPES);
398+
}
399+
break;
400+
case OUTPUT:
401+
if (!SUPPORTED_CAST_OUTPUT_TYPES.contains(type)) {
402+
throw new ConfigException("Cast transformation does not support casting to " +
403+
type + "; supported types are " + SUPPORTED_CAST_OUTPUT_TYPES);
404+
}
405+
break;
386406
}
387407
return type;
388408
}

connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818
package org.apache.kafka.connect.transforms;
1919

2020
import org.apache.kafka.common.config.ConfigException;
21+
import org.apache.kafka.connect.data.Decimal;
2122
import org.apache.kafka.connect.data.Schema;
2223
import org.apache.kafka.connect.data.SchemaBuilder;
2324
import org.apache.kafka.connect.data.Struct;
2425
import org.apache.kafka.connect.data.Timestamp;
26+
import org.apache.kafka.connect.data.Values;
2527
import org.apache.kafka.connect.errors.DataException;
2628
import org.apache.kafka.connect.source.SourceRecord;
2729
import org.junit.After;
2830
import org.junit.Test;
2931

32+
import java.math.BigDecimal;
3033
import java.util.Collections;
3134
import java.util.Date;
3235
import java.util.HashMap;
@@ -39,6 +42,7 @@
3942
public class CastTest {
4043
private final Cast<SourceRecord> xformKey = new Cast.Key<>();
4144
private final Cast<SourceRecord> xformValue = new Cast.Value<>();
45+
private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
4246

4347
@After
4448
public void teardown() {
@@ -61,6 +65,11 @@ public void testConfigInvalidTargetType() {
6165
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
6266
}
6367

68+
@Test(expected = ConfigException.class)
69+
public void testUnsupportedTargetType() {
70+
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes"));
71+
}
72+
6473
@Test(expected = ConfigException.class)
6574
public void testConfigInvalidMap() {
6675
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
@@ -171,6 +180,28 @@ public void castWholeRecordValueWithSchemaString() {
171180
assertEquals("42", transformed.value());
172181
}
173182

183+
@Test
184+
public void castWholeBigDecimalRecordValueWithSchemaString() {
185+
BigDecimal bigDecimal = new BigDecimal(42);
186+
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
187+
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
188+
Decimal.schema(bigDecimal.scale()), bigDecimal));
189+
190+
assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
191+
assertEquals("42", transformed.value());
192+
}
193+
194+
@Test
195+
public void castWholeDateRecordValueWithSchemaString() {
196+
Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a timestamp formatting.
197+
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
198+
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
199+
Timestamp.SCHEMA, timestamp));
200+
201+
assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
202+
assertEquals(Values.dateFormatFor(timestamp).format(timestamp), transformed.value());
203+
}
204+
174205
@Test
175206
public void castWholeRecordDefaultValue() {
176207
// Validate default value in schema is correctly converted
@@ -292,7 +323,8 @@ public void castWholeRecordValueSchemalessUnsupportedType() {
292323

293324
@Test
294325
public void castFieldsWithSchema() {
295-
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
326+
Date day = new Date(MILLIS_PER_DAY);
327+
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32"));
296328

297329
// Include an optional fields and fields with defaults to validate their values are passed through properly
298330
SchemaBuilder builder = SchemaBuilder.struct();
@@ -305,6 +337,8 @@ public void castFieldsWithSchema() {
305337
builder.field("float64", SchemaBuilder.float64().defaultValue(-1.125).build());
306338
builder.field("boolean", Schema.BOOLEAN_SCHEMA);
307339
builder.field("string", Schema.STRING_SCHEMA);
340+
builder.field("bigdecimal", Decimal.schema(new BigDecimal(42).scale()));
341+
builder.field("date", Timestamp.SCHEMA);
308342
builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
309343
builder.field("timestamp", Timestamp.SCHEMA);
310344
Schema supportedTypesSchema = builder.build();
@@ -317,6 +351,8 @@ public void castFieldsWithSchema() {
317351
recordValue.put("float32", 32.f);
318352
recordValue.put("float64", -64.);
319353
recordValue.put("boolean", true);
354+
recordValue.put("bigdecimal", new BigDecimal(42));
355+
recordValue.put("date", day);
320356
recordValue.put("string", "42");
321357
recordValue.put("timestamp", new Date(0));
322358
// optional field intentionally omitted
@@ -335,8 +371,25 @@ public void castFieldsWithSchema() {
335371
assertEquals(true, ((Struct) transformed.value()).schema().field("float64").schema().defaultValue());
336372
assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean"));
337373
assertEquals(42, ((Struct) transformed.value()).get("string"));
374+
assertEquals("42", ((Struct) transformed.value()).get("bigdecimal"));
375+
assertEquals(Values.dateFormatFor(day).format(day), ((Struct) transformed.value()).get("date"));
338376
assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp"));
339377
assertNull(((Struct) transformed.value()).get("optional"));
378+
379+
Schema transformedSchema = ((Struct) transformed.value()).schema();
380+
assertEquals(Schema.INT16_SCHEMA.type(), transformedSchema.field("int8").schema().type());
381+
assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("int16").schema().type());
382+
assertEquals(Schema.INT64_SCHEMA.type(), transformedSchema.field("int32").schema().type());
383+
assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("int64").schema().type());
384+
assertEquals(Schema.FLOAT64_SCHEMA.type(), transformedSchema.field("float32").schema().type());
385+
assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("float64").schema().type());
386+
assertEquals(Schema.INT8_SCHEMA.type(), transformedSchema.field("boolean").schema().type());
387+
assertEquals(Schema.INT32_SCHEMA.type(), transformedSchema.field("string").schema().type());
388+
assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("bigdecimal").schema().type());
389+
assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("date").schema().type());
390+
assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("optional").schema().type());
391+
// The following fields are not changed
392+
assertEquals(Timestamp.SCHEMA.type(), transformedSchema.field("timestamp").schema().type());
340393
}
341394

342395
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)