Skip to content

Commit a0194df

Browse files
committed
Baked the clustered distribution example, cleaned up all the examples, and added logging to them
1 parent 58ba12e commit a0194df

17 files changed

+221
-91
lines changed

src/main/java/datasources/FlexibleRowDataSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public DataSourceReader createReader(DataSourceOptions options) {
4646
* resulting Dataset will have only a single partition -- that's why this DataSource
4747
* only provides sequential reads.
4848
*/
49-
class Reader implements DataSourceReader {
49+
static class Reader implements DataSourceReader {
50+
51+
static Logger log = Logger.getLogger(Reader.class.getName());
5052

5153
public Reader(String host, int port, String table) {
5254
_host = host;
@@ -77,6 +79,7 @@ public StructType readSchema() {
7779

7880
@Override
7981
public List<DataReaderFactory<Row>> createDataReaderFactories() {
82+
log.info("creating a single factory");
8083
return java.util.Collections.singletonList(
8184
new SimpleDataReaderFactory(_host, _port, _table, readSchema()));
8285
}
@@ -127,6 +130,8 @@ public void close() throws IOException {
127130
*/
128131
static class SimpleDataReaderFactory implements DataReaderFactory<Row> {
129132

133+
static Logger log = Logger.getLogger(SimpleDataReaderFactory.class.getName());
134+
130135
public SimpleDataReaderFactory(String host, int port,
131136
String table, StructType schema) {
132137
_host = host;

src/main/java/datasources/ParallelRowDataSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public DataSourceReader createReader(DataSourceOptions options) {
4848
* and how it obtains the reader factories to be used by the executors to create readers.
4949
* Notice that one factory is created for each partition.
5050
*/
51-
class Reader implements DataSourceReader {
51+
static class Reader implements DataSourceReader {
52+
53+
static Logger log = Logger.getLogger(Reader.class.getName());
5254

5355
public Reader(String host, int port, String table, int partitions) {
5456
_host = host;
@@ -100,6 +102,7 @@ public List<DataReaderFactory<Row>> createDataReaderFactories() {
100102
new SplitDataReaderFactory(_host, _port, _table, readSchema(), split);
101103
factories.add(factory);
102104
}
105+
log.info("created " + factories.size() + " factories");
103106
return factories;
104107
}
105108
}
@@ -149,6 +152,8 @@ public void close() throws IOException {
149152
*/
150153
static class SplitDataReaderFactory implements DataReaderFactory<Row> {
151154

155+
static Logger log = Logger.getLogger(SplitDataReaderFactory.class.getName());
156+
152157
public SplitDataReaderFactory(String host, int port,
153158
String table, StructType schema,
154159
Split split) {

src/main/java/datasources/PartitioningRowDataSource.java

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -53,54 +53,67 @@ public DataSourceReader createReader(DataSourceOptions options) {
5353
* and how it obtains the reader factories to be used by the executors to create readers.
5454
* Notice that one factory is created for each partition.
5555
*/
56-
class Reader implements DataSourceReader, SupportsReportPartitioning {
56+
static class Reader implements DataSourceReader, SupportsReportPartitioning {
57+
58+
static Logger log = Logger.getLogger(Reader.class.getName());
5759

5860
public Reader(String host, int port, String table, int partitions) {
5961
_host = host;
6062
_port = port;
6163
_table = table;
62-
_partitions = partitions;
64+
_requestedPartitions = partitions;
6365
}
6466

65-
private StructType _schema;
6667
private String _host;
6768
private int _port;
6869
private String _table;
69-
private int _partitions;
70+
private int _requestedPartitions;
7071

71-
@Override
72-
public StructType readSchema() {
73-
if (_schema == null) {
72+
//
73+
// dynamic properties inferred from database
74+
//
75+
76+
private boolean _initialized = false;
77+
private StructType _schema;
78+
private String _clusteredColumn;
79+
private List<Split> _splits;
80+
81+
82+
private void initialize() {
83+
if (!_initialized) {
84+
log.info("initializing");
7485
DBClientWrapper db = new DBClientWrapper(_host, _port);
7586
db.connect();
7687
try {
7788
_schema = db.getSchema(_table);
89+
_clusteredColumn = db.getClusteredIndexColumn(_table);
90+
if (_requestedPartitions == 0)
91+
_splits = db.getSplits(_table);
92+
else
93+
_splits = db.getSplits(_table, _requestedPartitions);
7894
} catch (UnknownTableException ute) {
7995
throw new RuntimeException(ute);
8096
} finally {
8197
db.disconnect();
8298
}
99+
_initialized = true;
100+
log.info("initialized");
83101
}
102+
}
103+
104+
@Override
105+
public StructType readSchema() {
106+
log.info("schema requested for table [" + _table + "]");
107+
initialize();
84108
return _schema;
85109
}
86110

87111
@Override
88112
public List<DataReaderFactory<Row>> createDataReaderFactories() {
89-
List<Split> splits = null;
90-
DBClientWrapper db = new DBClientWrapper(_host, _port);
91-
db.connect();
92-
try {
93-
if (_partitions == 0)
94-
splits = db.getSplits(_table);
95-
else
96-
splits = db.getSplits(_table, _partitions);
97-
} catch (UnknownTableException ute) {
98-
throw new RuntimeException(ute);
99-
} finally {
100-
db.disconnect();
101-
}
113+
log.info("reader factories requested for table [" + _table + "]");
114+
initialize();
102115
List<DataReaderFactory<Row>> factories = new ArrayList<>();
103-
for (Split split : splits) {
116+
for (Split split : _splits) {
104117
DataReaderFactory<Row> factory =
105118
new SplitDataReaderFactory(_host, _port, _table, readSchema(), split);
106119
factories.add(factory);
@@ -110,37 +123,24 @@ public List<DataReaderFactory<Row>> createDataReaderFactories() {
110123

111124
@Override
112125
public Partitioning outputPartitioning() {
113-
return new TrivialPartitioning();
114-
}
115-
}
116-
117-
static class TrivialPartitioning implements Partitioning {
118-
119-
static Logger log = Logger.getLogger(TrivialPartitioning.class.getName());
120-
121-
@Override
122-
public int numPartitions() {
123-
log.info("asked for numPartitions");
124-
return 8;
125-
}
126-
127-
@Override
128-
public boolean satisfy(Distribution distribution) {
129-
log.info("asked to satisfy");
130-
// can't satisfy any Distribution
131-
return false;
126+
log.info("output partitioning requested for table [" + _table + "]");
127+
return new SingleClusteredColumnPartitioning(
128+
_clusteredColumn, _splits.size());
132129
}
133130
}
134131

135132
static class SingleClusteredColumnPartitioning implements Partitioning {
136133

134+
static Logger log = Logger.getLogger(SingleClusteredColumnPartitioning.class.getName());
135+
137136
public SingleClusteredColumnPartitioning(String columnName, int partitions) {
138137
_columnName = columnName;
139138
_partitions = partitions;
140139
}
141140

142141
@Override
143142
public int numPartitions() {
143+
log.info("asked for numPartitions");
144144
return _partitions;
145145
}
146146

@@ -150,11 +150,31 @@ public boolean satisfy(Distribution distribution) {
150150
// Since Spark may add other Distribution policies in the future, we can't assume
151151
// it's always a ClusteredDistribution
152152
//
153+
153154
if (distribution instanceof ClusteredDistribution) {
155+
154156
String[] clusteredCols = ((ClusteredDistribution) distribution).clusteredColumns;
155-
return Arrays.asList(clusteredCols).contains(_columnName);
157+
StringBuilder logEntryBuilder = new StringBuilder();
158+
logEntryBuilder.append("asked to satisfy ClusteredDistribution on columns ");
159+
if (clusteredCols.length > 0) {
160+
for (String col : clusteredCols) {
161+
logEntryBuilder.append("[");
162+
logEntryBuilder.append(col);
163+
logEntryBuilder.append("] ");
164+
}
165+
}
166+
log.info(logEntryBuilder.toString());
167+
if (_columnName == null) {
168+
log.info("no cluster column so does not satisfy");
169+
return false;
170+
} else {
171+
boolean satisfies = Arrays.asList(clusteredCols).contains(_columnName);
172+
log.info("based on cluster column: " + satisfies);
173+
return satisfies;
174+
}
156175
}
157-
176+
log.info("asked to satisfy unknown distribution of type [" +
177+
distribution.getClass().getCanonicalName() + "]");
158178
return false;
159179
}
160180

@@ -207,6 +227,8 @@ public void close() throws IOException {
207227
*/
208228
static class SplitDataReaderFactory implements DataReaderFactory<Row> {
209229

230+
static Logger log = Logger.getLogger(SplitDataReaderFactory.class.getName());
231+
210232
public SplitDataReaderFactory(String host, int port,
211233
String table, StructType schema,
212234
Split split) {

src/main/java/datasources/SimpleRowDataSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ public DataSourceReader createReader(DataSourceOptions options) {
4747
* resulting Dataset will have only a single partition -- that's why this DataSource
4848
* only provides sequential reads.
4949
*/
50-
class Reader implements DataSourceReader {
50+
static class Reader implements DataSourceReader {
51+
52+
static Logger log = Logger.getLogger(Reader.class.getName());
5153

5254
public Reader(String host, int port) {
5355
_host = host;
@@ -65,6 +67,7 @@ public StructType readSchema() {
6567

6668
@Override
6769
public List<DataReaderFactory<Row>> createDataReaderFactories() {
70+
log.info("creating a single factory");
6871
return java.util.Arrays.asList(new SimpleDataReaderFactory(_host, _port));
6972
}
7073
}
@@ -115,6 +118,8 @@ public void close() throws IOException {
115118
*/
116119
static class SimpleDataReaderFactory implements DataReaderFactory<Row> {
117120

121+
static Logger log = Logger.getLogger(SimpleDataReaderFactory.class.getName());
122+
118123
public SimpleDataReaderFactory(String host, int port) {
119124
_host = host;
120125
_port = port;

src/main/java/datasources/utils/DBClientWrapper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,20 @@ public StructType getSchema(String table) throws UnknownTableException
6060
fields.add(DataTypes.createStructField(name,
6161
DataTypes.DoubleType, true));
6262
break;
63+
case STRING:
64+
fields.add(DataTypes.createStructField(name,
65+
DataTypes.StringType, true));
66+
break;
6367
default:
6468
}
6569
}
6670
return DataTypes.createStructType(fields);
6771
}
6872

73+
public String getClusteredIndexColumn(String table) throws UnknownTableException {
74+
return _client.getTableClusteredIndexColumn(table);
75+
}
76+
6977
public List<Split> getSplits(String table, int count) throws UnknownTableException {
7078
return _client.getSplits(table, count);
7179
}

src/main/java/edb/client/DBClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,30 @@ public Schema getTableSchema(String name) throws UnknownTableException {
126126
}
127127
}
128128

129+
public String getTableClusteredIndexColumn(String tableName) throws UnknownTableException {
130+
GetTableClusteredIndexColumnRequest.Builder builder = GetTableClusteredIndexColumnRequest.newBuilder();
131+
builder.setTableName(tableName);
132+
133+
GetTableClusteredIndexColumnRequest request = builder.build();
134+
GetTableClusteredIndexColumnResponse response;
135+
try {
136+
response = _blockingStub.getTableClusteredIndexColumn(request);
137+
} catch (StatusRuntimeException e) {
138+
e.printStackTrace();
139+
throw e;
140+
}
141+
142+
if (response.getResult()) {
143+
if (response.hasColumnName()) {
144+
return response.getColumnName();
145+
} else {
146+
return null;
147+
}
148+
} else {
149+
throw new UnknownTableException(tableName);
150+
}
151+
}
152+
129153
public void bulkInsert(String name, List<Row> rows) throws UnknownTableException
130154
{
131155
BulkInsertRequest.Builder builder = BulkInsertRequest.newBuilder();

src/main/java/edb/common/IExampleDB.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ void createTable(String name, Schema schema, String clusterColumn)
1212

1313
Schema getTableSchema(String name) throws UnknownTableException;
1414

15+
String getTableClusteredIndexColumn(String name) throws UnknownTableException;
16+
1517
void bulkInsert(String name, List<Row> rows) throws UnknownTableException;
1618

1719
List<Row> getAllRows(String name) throws UnknownTableException;

src/main/java/edb/server/ClusteredIndexTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public ClusteredIndexTable(String name, Schema schema, String indexColumn) {
2727

2828
public Schema getSchema() { return _schema; }
2929

30+
public String getIndexColumn() { return _indexColumn; }
31+
3032
public void addRows(List<Row> rows) {
3133
for (Row row : rows) {
3234
try {

src/main/java/edb/server/DBServer.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,31 @@ public void getTableSchema(GetTableSchemaRequest req,
125125
responseObserver.onCompleted();
126126
}
127127

128+
@Override
129+
public void getTableClusteredIndexColumn(
130+
GetTableClusteredIndexColumnRequest req,
131+
StreamObserver<GetTableClusteredIndexColumnResponse> responseObserver) {
132+
133+
String tableName = req.getTableName();
134+
135+
GetTableClusteredIndexColumnResponse.Builder builder =
136+
GetTableClusteredIndexColumnResponse.newBuilder();
137+
try {
138+
String columnName = _db.getTableClusteredIndexColumn(tableName);
139+
if (columnName != null) {
140+
builder.setColumnName(columnName);
141+
}
142+
builder.setResult(true);
143+
144+
} catch (UnknownTableException ete) {
145+
builder.setResult(false);
146+
}
147+
148+
GetTableClusteredIndexColumnResponse reply = builder.build();
149+
responseObserver.onNext(reply);
150+
responseObserver.onCompleted();
151+
}
152+
128153
@Override
129154
public void bulkInsert(BulkInsertRequest req,
130155
StreamObserver<BulkInsertResponse> responseObserver) {

src/main/java/edb/server/Database.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ public Schema getTableSchema(String name) throws UnknownTableException {
4747
}
4848
}
4949

50+
public String getTableClusteredIndexColumn(String name) throws UnknownTableException {
51+
boolean present = _tables.containsKey(name);
52+
if (present) {
53+
ITable entry = _tables.get(name);
54+
if (entry instanceof ClusteredIndexTable) {
55+
return ((ClusteredIndexTable) entry).getIndexColumn();
56+
} else {
57+
return null;
58+
}
59+
} else {
60+
throw new UnknownTableException(name);
61+
}
62+
}
63+
5064
public void bulkInsert(String name, List<Row> rows) throws UnknownTableException {
5165
boolean present = _tables.containsKey(name);
5266
if (present) {

0 commit comments

Comments (0)