Commit a345af5

Revert "refactored linker (#1159)"
This reverts commit f87897b.
1 parent f3d8abf commit a345af5

File tree

10 files changed, +56 -52 lines


common/core/src/main/java/zingg/common/core/match/output/LinkOutputBuilder.java

Lines changed: 24 additions & 1 deletion
@@ -9,6 +9,7 @@
 import zingg.common.client.FieldDefinition;
 import zingg.common.client.IArguments;
 import zingg.common.client.ZFrame;
+import zingg.common.client.ZinggClientException;
 import zingg.common.client.util.ColName;
 import zingg.common.client.util.DSUtil;
 
@@ -21,10 +22,12 @@ public LinkOutputBuilder(DSUtil<S, D, R, C> dsUtil, IArguments args) {
 	}
 
 	@Override
-	public ZFrame<D,R,C> getOutput(ZFrame<D, R, C> sampleOriginal, ZFrame<D, R, C> dupesActual) {
+	public ZFrame<D,R,C> getOutput(ZFrame<D, R, C> sampleOriginal, ZFrame<D, R, C> dupesActual)
+			throws ZinggClientException, Exception{
 		dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL));
 		dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN);
 		ZFrame<D,R,C>dupes2 = alignLinked(dupesActual, args);
+		dupes2 = postprocessLinked(dupes2, sampleOriginal);
 		LOG.debug("uncertain output schema is " + dupes2.showSchema());
 		return dupes2;
 
@@ -65,4 +68,24 @@ public ZFrame<D, R, C> alignLinked(ZFrame<D, R, C> dupesActual, IArguments args
 		return dupes1;
 	}
 
+	public ZFrame<D,R,C> getSelectedCols(ZFrame<D,R,C> actual){
+		List<C> cols = new ArrayList<C>();
+		cols.add(actual.col(ColName.CLUSTER_COLUMN));
+		cols.add(actual.col(ColName.ID_COL));
+		cols.add(actual.col(ColName.SCORE_COL));
+		cols.add(actual.col(ColName.SOURCE_COL));
+
+		ZFrame<D,R,C> zFieldsFromActual = actual.select(cols);
+		return zFieldsFromActual;
+
+	}
+
+	public ZFrame<D,R,C> postprocessLinked(ZFrame<D,R,C> actual, ZFrame<D,R,C> orig) {
+		ZFrame<D,R,C> zFieldsFromActual = getSelectedCols(actual);
+		ZFrame<D,R,C> joined = zFieldsFromActual.join(orig, ColName.ID_COL, ColName.SOURCE_COL)
+				.drop(zFieldsFromActual.col(ColName.SOURCE_COL))
+				.drop(ColName.ID_COL);
+
+		return joined;
+	}
 }
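
For readers following the revert, here is a minimal sketch of what the restored post-processing join does, written against plain Spark Datasets rather than Zingg's ZFrame abstraction. The join condition (the output's ID_COL matched to the original frame's SOURCE_COL) is an assumption read off the ZFrame.join(orig, ColName.ID_COL, ColName.SOURCE_COL) call above, not a documented contract of that API.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import zingg.common.client.util.ColName;

public class PostprocessLinkedSketch {

    // Keep only the match metadata (cluster, id, score, source) from the scored output.
    static Dataset<Row> selectedCols(Dataset<Row> actual) {
        return actual.select(
                actual.col(ColName.CLUSTER_COLUMN),
                actual.col(ColName.ID_COL),
                actual.col(ColName.SCORE_COL),
                actual.col(ColName.SOURCE_COL));
    }

    // Attach that metadata back onto the original records, then drop the helper columns.
    // Assumption: ZFrame.join(orig, ID_COL, SOURCE_COL) pairs the output's ID_COL with the
    // original frame's SOURCE_COL; adjust the condition if the real semantics differ.
    static Dataset<Row> postprocessLinked(Dataset<Row> actual, Dataset<Row> orig) {
        Dataset<Row> meta = selectedCols(actual);
        Column cond = meta.col(ColName.ID_COL).equalTo(orig.col(ColName.SOURCE_COL));
        return meta.join(orig, cond)
                .drop(meta.col(ColName.SOURCE_COL))
                .drop(ColName.ID_COL);
    }
}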

common/core/src/test/java/zingg/common/core/executor/TestExecutorsGeneric.java

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ public void init(S s) throws ZinggClientException, IOException {
 
 	public abstract List<ExecutorTester<S, D, R, C, T>> getExecutors() throws ZinggClientException, IOException;
 
+	//public abstract void tearDown();
 
 	@Test
 	public void testExecutors() throws ZinggClientException, IOException {

common/core/src/test/java/zingg/common/core/executor/validate/LinkerValidator.java

Lines changed: 20 additions & 26 deletions
@@ -3,7 +3,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.junit.jupiter.api.Assertions;
 import zingg.common.client.ZFrame;
 import zingg.common.client.ZinggClientException;
 import zingg.common.core.executor.Matcher;
@@ -23,31 +22,26 @@ public void validateResults() throws ZinggClientException {
 
 	@Override
 	protected void assessAccuracy() throws ZinggClientException {
-		ZFrame<D, R, C> linkOutput = getOutputData();
-
-		Assertions.assertEquals(11, linkOutput.count());
-		/*
-		candidate blake will be linked to one source record -> candidate + one source -> 2 records in cluster1
-		candidate thomas will be linked to one source record -> candidate + one source -> 2 records in cluster2
-		candidate jackson will be linked to two source records -> candidate + two source -> 3 records in cluster3
-		candidate gianni 1st will be linked to one source record -> candidate + one source -> 2 records in cluster4
-		candidate gianni 2nd will be linked to one source record -> candidate + one source -> 2 records in cluster5
-		candidate takeisha has no source record
-
-		total 2 + 2 + 3 + 2 + 2 = 11 records
-		*/
-		ZFrame<D, R, C> blakeCluster = linkOutput.filter(linkOutput.equalTo("fname", "blake"));
-		ZFrame<D, R, C> thomasCluster = linkOutput.filter(linkOutput.equalTo("fname", "thomas"));
-		ZFrame<D, R, C> jacksonCluster = linkOutput.filter(linkOutput.equalTo("fname", "jackson"));
-		ZFrame<D, R, C> gianniCluster = linkOutput.filter(linkOutput.equalTo("fname", "gianni"));
-		ZFrame<D, R, C> takeishaCluster = linkOutput.filter(linkOutput.equalTo("fname", "takeisha"));
-
-		Assertions.assertEquals(2, blakeCluster.count());
-		Assertions.assertEquals(2, thomasCluster.count());
-		Assertions.assertEquals(3, jacksonCluster.count());
-		Assertions.assertEquals(4, gianniCluster.count());
-		Assertions.assertEquals(0, takeishaCluster.count());
-
+		ZFrame<D, R, C> df1 = getOutputData().withColumn("z_zsource", "test1");
+		df1 = df1.select("fname", "id", getClusterColName());
+		df1 = df1.withColumn("dupeRecIdFuzzyMatch", df1.substr(df1.col("id"), 0, PREFIX_MATCH_LENGTH)).cache();
+
+		ZFrame<D, R, C> df2 = getOutputData().withColumn("z_zsource", "test2");
+		df2 = df2.select("fname", "id", getClusterColName());
+		df2 = df2.withColumn("dupeRecIdFuzzyMatch", df2.substr(df2.col("id"), 0, PREFIX_MATCH_LENGTH)).cache();
+
+		ZFrame<D, R, C> gold = joinAndFilter("dupeRecIdFuzzyMatch", df1, df2).cache();
+		ZFrame<D, R, C> result = joinAndFilter(getClusterColName(), df1, df2).cache();
+
+		testAccuracy(gold, result);
 	}
 
+	@Override
+	protected ZFrame<D, R, C> joinAndFilter(String colName, ZFrame<D, R, C> df, ZFrame<D, R, C> df1){
+		C col1 = df.col(colName);
+		C col2 = df1.col(colName);
+		ZFrame<D, R, C> joined = df.joinOnCol(df1, df.equalTo(col1, col2));
+		return joined;
+	}
+
 }
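
The restored test no longer asserts fixed cluster counts; it builds a "gold" pairing from the fuzzy id-prefix match and compares it against the pairing implied by the cluster column. The testAccuracy method itself is not part of this diff, so the sketch below only illustrates the standard precision/recall comparison one would expect such a check to perform; the column assumptions and threshold values are placeholders, not Zingg's actual bounds.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;

public class AccuracySketch {

    // gold:   pairs of records that should link (here, ids sharing a common prefix)
    // result: pairs of records the linker actually placed in the same cluster
    static void testAccuracy(Dataset<Row> gold, Dataset<Row> result) {
        long goldCount = gold.count();
        long resultCount = result.count();
        // pairs present in both sets; assumes both frames share the same schema and keys
        long hits = result.intersect(gold).count();

        double precision = resultCount == 0 ? 0.0 : (double) hits / resultCount;
        double recall = goldCount == 0 ? 0.0 : (double) hits / goldCount;

        // thresholds here are illustrative, not Zingg's actual test bounds
        Assertions.assertTrue(precision >= 0.9, "precision too low: " + precision);
        Assertions.assertTrue(recall >= 0.9, "recall too low: " + recall);
    }
}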

config/zingg.conf

Lines changed: 0 additions & 1 deletion
@@ -15,7 +15,6 @@ spark.default.parallelism=8
 spark.debug.maxToStringFields=200
 spark.driver.memory=8g
 spark.executor.memory=8g
-spark.sql.adaptive.enabled=false
 #spark.jars=/home/zingg/pathto.jar
 # Additional Jars could be passed to spark through below configuration. Jars list should be comma(,) separated.
 #spark.jars=
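
With the explicit spark.sql.adaptive.enabled=false line removed from zingg.conf, runs fall back to Spark's default for adaptive query execution. If a particular run still needs the old behaviour, the flag can be set when the session is built; a minimal sketch using the standard Spark API (not Zingg-specific wiring, and it assumes you control session creation yourself):

import org.apache.spark.sql.SparkSession;

public class AqeOffSketch {
    public static void main(String[] args) {
        // Hypothetical standalone session; Zingg normally builds its own session from zingg.conf.
        SparkSession spark = SparkSession.builder()
                .appName("zingg-run")
                .master("local[*]")
                .config("spark.sql.adaptive.enabled", "false") // same setting this revert removes from zingg.conf
                .getOrCreate();
        System.out.println(spark.conf().get("spark.sql.adaptive.enabled"));
        spark.stop();
    }
}
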
Lines changed: 4 additions & 4 deletions
@@ -1,9 +1,9 @@
 {
-    "date": "2025-06-04",
-    "time": "00:38:22",
+    "date": "2025-05-28",
+    "time": "00:38:05",
     "test": "febrl_120K",
     "results": {
-        "train": 1.39,
-        "match": 5.3
+        "train": 1.57,
+        "match": 5.86
     }
 }
Lines changed: 4 additions & 4 deletions
@@ -1,9 +1,9 @@
 {
-    "date": "2025-06-04",
-    "time": "03:30:21",
+    "date": "2025-05-28",
+    "time": "03:27:29",
     "test": "ncVoters_5M",
     "results": {
-        "train": 1.86,
-        "match": 56.42
+        "train": 2.07,
+        "match": 47.12
     }
 }

spark/core/src/main/java/zingg/spark/core/util/SparkGraphUtil.java

Lines changed: 0 additions & 1 deletion
@@ -37,7 +37,6 @@ public ZFrame<Dataset<Row>, Row, Column> buildGraph(ZFrame<Dataset<Row>, Row, Co
 		GraphFrame gf = new GraphFrame(v, e);
 		//gf = gf.dropIsolatedVertices();
 		//Dataset<Row> returnGraph = gf.connectedComponents().setAlgorithm("graphx").run().cache();
-
 		Dataset<Row> returnGraph = gf.connectedComponents().run().cache();
 		//reverse back o avoid graphframes id :-()
 		returnGraph = returnGraph.join(vertices, returnGraph.col("id").equalTo(vertices.col(ColName.ID_COL)));

spark/core/src/test/resources/zingg/spark/core/executor/linkerCandidate.csv

Lines changed: 0 additions & 6 deletions
This file was deleted.

spark/core/src/test/resources/zingg/spark/core/executor/linkerSource.csv

Lines changed: 0 additions & 6 deletions
This file was deleted.

spark/core/src/test/resources/zingg/spark/core/executor/single/configSparkLinkTest.json

Lines changed: 3 additions & 3 deletions
@@ -65,7 +65,7 @@
         "name":"output",
         "format":"csv",
         "props": {
-            "location": "/tmp/junit_integration_spark/single/zinggLinkOutput",
+            "location": "/tmp/junit_integration_spark/single/zinggOutput",
             "delimiter": ",",
             "header":true
         }
@@ -74,7 +74,7 @@
         "name":"test1",
         "format":"csv",
         "props": {
-            "location": "./zingg/spark/core/executor/linkerCandidate.csv",
+            "location": "./zingg/spark/core/executor/test1.csv",
             "delimiter": ",",
             "header":false
         },
@@ -84,7 +84,7 @@
         "name":"test2",
         "format":"csv",
         "props": {
-            "location": "./zingg/spark/core/executor/linkerSource.csv",
+            "location": "./zingg/spark/core/executor/test2.csv",
             "delimiter": ",",
             "header":false
         },
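
The output pipe now points at /tmp/junit_integration_spark/single/zinggOutput and the two input pipes at test1.csv and test2.csv. As a rough way to eyeball what the integration test wrote, here is a small, hypothetical reader that mirrors the location, delimiter, and header settings from the "output" pipe above; it is not part of the Zingg test suite.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class InspectLinkOutput {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("inspect-link-output")
                .master("local[*]")
                .getOrCreate();

        // Mirrors the "output" pipe props above: csv, comma delimiter, header=true.
        Dataset<Row> out = spark.read()
                .option("header", "true")
                .option("delimiter", ",")
                .csv("/tmp/junit_integration_spark/single/zinggOutput");

        out.show(20, false);
        spark.stop();
    }
}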
