Skip to content

Commit c516ef2

Browse files
authored
Filter invalid Singer Messages in the worker (#236)
1 parent 49eede4 commit c516ef2

File tree

9 files changed

+296
-58
lines changed

9 files changed

+296
-58
lines changed

dataline-singer/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ dependencies {
77
}
88

99
jsonSchema2Pojo {
10+
// todo (cgardens) - this path cannot be the same as the one in dataline-configs:model,
11+
// otherwise they clobber each other, when we are accessing them as java resources.
12+
// we need to find a more graceful way of handling this issue, but for now this keeps
13+
// us moving forward.
14+
source = files("${sourceSets.main.output.resourcesDir}/singer_json")
1015
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
1116

1217
targetPackage = 'io.dataline.singer'
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Dataline
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.dataline.singer;
26+
27+
import io.dataline.commons.io.IOs;
28+
import io.dataline.commons.resources.MoreResources;
29+
import java.io.File;
30+
import java.io.IOException;
31+
import java.nio.file.Files;
32+
import java.nio.file.Path;
33+
import java.util.List;
34+
import java.util.stream.Collectors;
35+
36+
// todo (cgardens) - dedupe this with ConfigSchema.
37+
public enum SingerConfigSchema {
38+
39+
SINGER_MESSAGE("SingerMessage.json");
40+
41+
static final Path KNOWN_SCHEMAS_ROOT = prepareSchemas();
42+
private static final String RESOURCE_DIR = "singer_json";
43+
44+
/*
45+
* JsonReferenceProcessor relies on all of the json in consumes being in a file system (not in a
46+
* jar). This method copies all of the json configs out of the jar into a temporary directory so
47+
* that JsonReferenceProcessor can find them.
48+
*/
49+
@SuppressWarnings("UnstableApiUsage")
50+
private static Path prepareSchemas() {
51+
try {
52+
final List<String> filenames = MoreResources.listResources(SingerConfigSchema.class, RESOURCE_DIR)
53+
.map(p -> p.getFileName().toString())
54+
.filter(p -> p.endsWith(".json"))
55+
.collect(Collectors.toList());
56+
57+
final Path configRoot = Files.createTempDirectory("schemas");
58+
for (String filename : filenames) {
59+
IOs.writeFile(
60+
configRoot,
61+
filename,
62+
MoreResources.readResource(String.format("%s/%s", RESOURCE_DIR, filename)));
63+
}
64+
65+
return configRoot;
66+
} catch (IOException e) {
67+
throw new RuntimeException(e);
68+
}
69+
}
70+
71+
private final String schemaFilename;
72+
73+
SingerConfigSchema(final String schemaFilename) {
74+
this.schemaFilename = schemaFilename;
75+
}
76+
77+
public File getFile() {
78+
return KNOWN_SCHEMAS_ROOT.resolve(schemaFilename).toFile();
79+
}
80+
81+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Dataline
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.dataline.singer;
26+
27+
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
29+
import java.io.IOException;
30+
import java.nio.charset.StandardCharsets;
31+
import java.nio.file.Files;
32+
import org.junit.jupiter.api.Test;
33+
34+
public class SingerConfigSchemaTest {
35+
36+
@Test
37+
void testFile() throws IOException {
38+
final String schema = Files.readString(SingerConfigSchema.SINGER_MESSAGE.getFile().toPath(), StandardCharsets.UTF_8);
39+
assertTrue(schema.contains("title"));
40+
}
41+
42+
@Test
43+
void testPrepareKnownSchemas() {
44+
for (SingerConfigSchema value : SingerConfigSchema.values()) {
45+
assertTrue(Files.exists(value.getFile().toPath()));
46+
}
47+
}
48+
49+
}

dataline-workers/src/main/java/io/dataline/workers/protocol/singer/SingerJsonStreamFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,18 @@ public class SingerJsonStreamFactory implements StreamFactory {
4848

4949
private static final Logger LOGGER = LoggerFactory.getLogger(SingerJsonStreamFactory.class);
5050

51+
private final SingerProtocolPredicate singerProtocolValidator;
52+
53+
/** Creates a factory backed by a newly constructed {@link SingerProtocolPredicate}. */
public SingerJsonStreamFactory() {
54+
this(new SingerProtocolPredicate());
55+
}
56+
57+
/**
 * Creates a factory that uses the supplied predicate to decide which input lines are kept.
 * Package-private so tests can inject a stubbed predicate.
 *
 * @param singerProtocolPredicate predicate applied to every raw line; must not be null
 * @throws NullPointerException if {@code singerProtocolPredicate} is null
 */
SingerJsonStreamFactory(SingerProtocolPredicate singerProtocolPredicate) {
  // Fail at construction time rather than later, when create() hands the predicate to filter().
  singerProtocolValidator = Objects.requireNonNull(singerProtocolPredicate, "singerProtocolPredicate");
}
60+
5161
public Stream<SingerMessage> create(BufferedReader bufferedReader) {
52-
return bufferedReader.lines().map(this::parseJsonOrNull).filter(Objects::nonNull);
62+
return bufferedReader.lines().filter(singerProtocolValidator).map(this::parseJsonOrNull).filter(Objects::nonNull);
5363
}
5464

5565
private SingerMessage parseJsonOrNull(String record) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Dataline
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.dataline.workers.protocol.singer;
26+
27+
import com.fasterxml.jackson.databind.JsonNode;
28+
import io.dataline.commons.json.JsonSchemaValidator;
29+
import io.dataline.commons.json.Jsons;
30+
import io.dataline.singer.SingerConfigSchema;
31+
import java.util.function.Predicate;
32+
33+
public class SingerProtocolPredicate implements Predicate<String> {
34+
35+
private final JsonSchemaValidator jsonSchemaValidator;
36+
private final JsonNode schema;
37+
38+
public SingerProtocolPredicate() {
39+
jsonSchemaValidator = new JsonSchemaValidator();
40+
schema = JsonSchemaValidator.getSchema(SingerConfigSchema.SINGER_MESSAGE.getFile());
41+
}
42+
43+
@Override
44+
public boolean test(String s) {
45+
return jsonSchemaValidator.test(schema, Jsons.deserialize(s));
46+
}
47+
48+
}

dataline-workers/src/test/java/io/dataline/workers/protocol/singer/SingerJsonStreamFactoryTest.java

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
package io.dataline.workers.protocol.singer;
2626

2727
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.mockito.ArgumentMatchers.any;
29+
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.when;
2831

2932
import io.dataline.commons.json.Jsons;
3033
import io.dataline.singer.SingerMessage;
@@ -34,6 +37,7 @@
3437
import java.io.InputStreamReader;
3538
import java.util.stream.Collectors;
3639
import java.util.stream.Stream;
40+
import org.junit.jupiter.api.BeforeEach;
3741
import org.junit.jupiter.api.Test;
3842

3943
@SuppressWarnings("StringBufferReplaceableByString")
@@ -42,6 +46,14 @@ class SingerJsonStreamFactoryTest {
4246
private static final String TABLE_NAME = "user_preferences";
4347
private static final String COLUMN_NAME = "favorite_color";
4448

49+
private SingerProtocolPredicate singerProtocolPredicate;
50+
51+
@BeforeEach
52+
public void setup() {
53+
singerProtocolPredicate = mock(SingerProtocolPredicate.class);
54+
when(singerProtocolPredicate.test(any())).thenReturn(true);
55+
}
56+
4557
@Test
4658
public void testValid() {
4759
final SingerMessage record1 =
@@ -63,55 +75,37 @@ public void testValid() {
6375
expectedStream.collect(Collectors.toList()), messageStream.collect(Collectors.toList()));
6476
}
6577

66-
// TODO these test cases are commented out because jsonschema2pojo does not generate annotations
67-
// that allow us to validate records correctly.
68-
// Until we add validation, deserializing invalid records (e.g: ones missing required fields) will
69-
// succeed, so we temporarily comment these cases out.
70-
// @Test
71-
// public void testInvalid() {
72-
// final SingerMessage record1 =
73-
// MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "green");
74-
// final SingerMessage record2 =
75-
// MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "yellow");
76-
//
77-
// final String inputString =
78-
// new StringBuilder()
79-
// .append(Jsons.serialize(record1))
80-
// .append('\n')
81-
// .append("{ \"fish\": \"tuna\"}")
82-
// .append('\n')
83-
// .append(Jsons.serialize(record2))
84-
// .toString();
85-
//
86-
// final Stream<SingerMessage> messageStream = stringToSingerMessageStream(inputString);
87-
// final Stream<SingerMessage> expectedStream = Stream.of(record1, record2);
88-
//
89-
// assertEquals(
90-
// expectedStream.collect(Collectors.toList()), messageStream.collect(Collectors.toList()));
91-
// }
92-
9378
@Test
94-
public void testMissingNewLineBetweenValidRecords() {
79+
public void testInvalid() {
80+
9581
final SingerMessage record1 =
9682
MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "green");
9783
final SingerMessage record2 =
9884
MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "yellow");
85+
final String invalidRecord = "{ \"fish\": \"tuna\"}";
86+
87+
when(singerProtocolPredicate.test(Jsons.serialize(record1))).thenReturn(true);
88+
when(singerProtocolPredicate.test(Jsons.serialize(record2))).thenReturn(true);
89+
when(singerProtocolPredicate.test(invalidRecord)).thenReturn(false);
9990

10091
final String inputString =
10192
new StringBuilder()
10293
.append(Jsons.serialize(record1))
94+
.append('\n')
95+
.append("{ \"fish\": \"tuna\"}")
96+
.append('\n')
10397
.append(Jsons.serialize(record2))
10498
.toString();
10599

106100
final Stream<SingerMessage> messageStream = stringToSingerMessageStream(inputString);
107-
final Stream<SingerMessage> expectedStream = Stream.of(record1);
101+
final Stream<SingerMessage> expectedStream = Stream.of(record1, record2);
108102

109103
assertEquals(
110104
expectedStream.collect(Collectors.toList()), messageStream.collect(Collectors.toList()));
111105
}
112106

113107
@Test
114-
public void testMissingNewLineAndLineStartsWithValidRecord() {
108+
public void testMissingNewLineBetweenValidRecords() {
115109
final SingerMessage record1 =
116110
MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "green");
117111
final SingerMessage record2 =
@@ -120,44 +114,20 @@ public void testMissingNewLineAndLineStartsWithValidRecord() {
120114
final String inputString =
121115
new StringBuilder()
122116
.append(Jsons.serialize(record1))
123-
.append("{ \"fish\": \"tuna\"}")
124-
.append('\n')
125117
.append(Jsons.serialize(record2))
126118
.toString();
127119

128120
final Stream<SingerMessage> messageStream = stringToSingerMessageStream(inputString);
129-
final Stream<SingerMessage> expectedStream = Stream.of(record1, record2);
121+
final Stream<SingerMessage> expectedStream = Stream.of(record1);
130122

131123
assertEquals(
132124
expectedStream.collect(Collectors.toList()), messageStream.collect(Collectors.toList()));
133125
}
134126

135-
// @Test
136-
// public void testMissingNewLineAndLineStartsWithInvalidRecord() {
137-
// final SingerMessage record1 =
138-
// MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "green");
139-
// final SingerMessage record2 =
140-
// MessageUtils.createRecordMessage(TABLE_NAME, COLUMN_NAME, "yellow");
141-
//
142-
// final String inputString =
143-
// new StringBuilder()
144-
// .append(Jsons.serialize(record1))
145-
// .append('\n')
146-
// .append("{ \"fish\": \"tuna\"}")
147-
// .append(Jsons.serialize(record2))
148-
// .toString();
149-
//
150-
// final Stream<SingerMessage> messageStream = stringToSingerMessageStream(inputString);
151-
// final Stream<SingerMessage> expectedStream = Stream.of(record1);
152-
//
153-
// assertEquals(
154-
// expectedStream.collect(Collectors.toList()), messageStream.collect(Collectors.toList()));
155-
// }
156-
157-
private static Stream<SingerMessage> stringToSingerMessageStream(String inputString) {
127+
private Stream<SingerMessage> stringToSingerMessageStream(String inputString) {
158128
InputStream inputStream = new ByteArrayInputStream(inputString.getBytes());
159129
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
160-
return new SingerJsonStreamFactory().create(bufferedReader);
130+
return new SingerJsonStreamFactory(singerProtocolPredicate).create(bufferedReader);
161131
}
162132

163133
}

0 commit comments

Comments
 (0)