Skip to content

Commit 70a276a

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents 5be0294 + cee72fc commit 70a276a

File tree

13 files changed

+148
-119
lines changed

13 files changed

+148
-119
lines changed

src/main/java/org/apache/sysds/common/Types.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,10 @@ public enum ReOrgOp {
751751
DIAG, //DIAG_V2M and DIAG_M2V could not be distinguished if sizes unknown
752752
RESHAPE, REV, ROLL, SORT, TRANS;
753753

754+
public boolean preservesValues() {
755+
return this != DIAG && this != SORT;
756+
}
757+
754758
@Override
755759
public String toString() {
756760
switch(this) {

src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -980,23 +980,21 @@ private static Hop simplifyBushyBinaryOperation( Hop parent, Hop hi, int pos )
980980

981981
private static Hop simplifyUnaryAggReorgOperation( Hop parent, Hop hi, int pos )
982982
{
983-
if( hi instanceof AggUnaryOp && ((AggUnaryOp)hi).getDirection()==Direction.RowCol //full uagg
984-
&& hi.getInput().get(0) instanceof ReorgOp ) //reorg operation
983+
if( hi instanceof AggUnaryOp && ((AggUnaryOp)hi).getDirection()==Direction.RowCol
984+
&& ((AggUnaryOp)hi).getOp() != AggOp.TRACE //full uagg
985+
&& hi.getInput().get(0) instanceof ReorgOp ) //reorg operation
985986
{
986987
ReorgOp rop = (ReorgOp)hi.getInput().get(0);
987-
if( (rop.getOp()==ReOrgOp.TRANS || rop.getOp()==ReOrgOp.RESHAPE
988-
|| rop.getOp() == ReOrgOp.REV ) //valid reorg
989-
&& rop.getParent().size()==1 ) //uagg only reorg consumer
988+
if( rop.getOp().preservesValues() //valid reorg
989+
&& rop.getParent().size()==1 ) //uagg only reorg consumer
990990
{
991991
Hop input = rop.getInput().get(0);
992992
HopRewriteUtils.removeAllChildReferences(hi);
993993
HopRewriteUtils.removeAllChildReferences(rop);
994994
HopRewriteUtils.addChildReference(hi, input);
995-
996995
LOG.debug("Applied simplifyUnaryAggReorgOperation");
997996
}
998997
}
999-
1000998
return hi;
1001999
}
10021000

src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,8 @@ public FederatedRequest[] broadcastSliced(CacheableData<?> data, boolean transpo
147147
return broadcastSliced(data, null, transposed);
148148
}
149149

150-
public FederatedRequest[] broadcastSliced(MatrixLineagePair moLin,
151-
boolean transposed) {
152-
return broadcastSliced(moLin.getMO(), moLin.getLI(),
153-
transposed);
150+
public FederatedRequest[] broadcastSliced(MatrixLineagePair moLin, boolean transposed) {
151+
return broadcastSliced(moLin.getMO(), moLin.getLI(), transposed);
154152
}
155153

156154
/**

src/main/java/org/apache/sysds/runtime/instructions/fed/CovarianceFEDInstruction.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -114,27 +114,20 @@ private void processAlignedFedCov(ExecutionContext ec, MatrixObject mo1, MatrixO
114114
new CPOperand[]{input1, input2, input3}, new long[]{mo1.getFedMapping().getID(),
115115
mo2.getFedMapping().getID(), moLin3.getFedMapping().getID()});
116116
}
117-
117+
118118
FederatedRequest fr2 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
119119
FederatedRequest fr3 = mo1.getFedMapping().cleanup(getTID(), fr1.getID());
120-
Future<FederatedResponse>[] covTmp = mo1.getFedMapping().execute(getTID(), true, fr1, fr2, fr3);
121-
122-
//means
123-
Future<FederatedResponse>[] meanTmp1 = processMean(mo1, moLin3, 0);
124-
Future<FederatedResponse>[] meanTmp2 = processMean(mo2, moLin3, 1);
125-
126-
Double[] cov = getResponses(covTmp);
127-
Double[] mean1 = getResponses(meanTmp1);
128-
Double[] mean2 = getResponses(meanTmp2);
120+
Double[] cov = getResponses(mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3));
121+
Double[] mean1 = getResponses(processMean(mo1, moLin3, 0));
122+
Double[] mean2 = getResponses(processMean(mo2, moLin3, 1));
129123

130124
if (moLin3 == null) {
131125
double result = aggCov(cov, mean1, mean2, mo1.getFedMapping().getFederatedRanges());
132126
ec.setVariable(output.getName(), new DoubleObject(result));
133127
}
134128
else {
135-
Future<FederatedResponse>[] weightsSumTmp = getWeightsSum(moLin3, moLin3.getFedMapping().getID(), instString, moLin3.getFedMapping());
136-
Double[] weights = getResponses(weightsSumTmp);
137-
129+
Double[] weights = getResponses(
130+
getWeightsSum(moLin3, moLin3.getFedMapping().getID(), instString, moLin3.getFedMapping()));
138131
double result = aggWeightedCov(cov, mean1, mean2, weights);
139132
ec.setVariable(output.getName(), new DoubleObject(result));
140133
}
@@ -154,21 +147,13 @@ private void processFedCovWeights(ExecutionContext ec, MatrixObject mo1, MatrixO
154147
new CPOperand[]{input1, input2, input3},
155148
new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID(), fr1[0].getID()}
156149
);
150+
//sequential execution of cov and means for robustness
157151
FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
158152
FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
159-
Future<FederatedResponse>[] covTmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
160-
161-
//means
162-
Future<FederatedResponse>[] meanTmp1 = processMean(mo1, 0, fr1[0].getID());
163-
Future<FederatedResponse>[] meanTmp2 = processMean(mo2, 1, fr1[0].getID());
164-
165-
Double[] cov = getResponses(covTmp);
166-
Double[] mean1 = getResponses(meanTmp1);
167-
Double[] mean2 = getResponses(meanTmp2);
168-
169-
Future<FederatedResponse>[] weightsSumTmp = getWeightsSum(moLin3, fr1[0].getID(), instString, mo1.getFedMapping());
170-
Double[] weights = getResponses(weightsSumTmp);
171-
153+
Double[] cov = getResponses(mo1.getFedMapping().execute(getTID(), true, fr1, fr2, fr3, fr4));
154+
Double[] mean1 = getResponses(processMean(mo1, 0, fr1[0].getID()));
155+
Double[] mean2 = getResponses(processMean(mo2, 1, fr1[0].getID()));
156+
Double[] weights = getResponses(getWeightsSum(moLin3, fr1[0].getID(), instString, mo1.getFedMapping()));
172157
double result = aggWeightedCov(cov, mean1, mean2, weights);
173158
ec.setVariable(output.getName(), new DoubleObject(result));
174159
}
@@ -243,7 +228,7 @@ private static Double[] getResponses(Future<FederatedResponse>[] ffr) {
243228
fr[i] = ((ScalarObject) ffr[i].get().getData()[0]).getDoubleValue();
244229
}
245230
catch(Exception e) {
246-
throw new DMLRuntimeException("CovarianceFEDInstruction: incorrect means or cov.");
231+
throw new DMLRuntimeException("CovarianceFEDInstruction: incorrect means or cov.", e);
247232
}
248233
});
249234

@@ -302,7 +287,7 @@ private static double aggWeightedCov(Double[] covValues, Double[] mean1, Double[
302287
}
303288

304289
private Future<FederatedResponse>[] processMean(MatrixObject mo1, MatrixLineagePair moLin3, int var){
305-
String[] parts = instString.split("°");
290+
String[] parts = instString.split(Lop.OPERAND_DELIMITOR);
306291
Future<FederatedResponse>[] meanTmp = null;
307292
if (moLin3 == null) {
308293
String meanInstr = instString.replace(getOpcode(), getOpcode().replace("cov", "uamean"));

src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import org.apache.sysds.conf.ConfigurationManager;
3333
import org.apache.sysds.runtime.DMLRuntimeException;
3434
import org.apache.sysds.runtime.data.DenseBlock;
35+
import org.apache.sysds.runtime.data.SparseBlock;
3536
import org.apache.sysds.runtime.io.hdf5.H5;
3637
import org.apache.sysds.runtime.io.hdf5.H5Constants;
3738
import org.apache.sysds.runtime.io.hdf5.H5ContiguousDataset;
3839
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
3940
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
41+
import org.apache.sysds.runtime.util.UtilFunctions;
4042

4143
public class ReaderHDF5 extends MatrixReader {
4244
protected final FileFormatPropertiesHDF5 _props;
@@ -51,7 +53,7 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
5153
//allocate output matrix block
5254
MatrixBlock ret = null;
5355
if(rlen >= 0 && clen >= 0) //otherwise allocated on read
54-
ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, false);
56+
ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, true);
5557

5658
//prepare file access
5759
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -92,7 +94,8 @@ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long cle
9294

9395
private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
9496
FileSystem fs, MatrixBlock dest, long rlen, long clen, int blen, String datasetName)
95-
throws IOException, DMLRuntimeException {
97+
throws IOException, DMLRuntimeException
98+
{
9699
//prepare file paths in alphanumeric order
97100
ArrayList<Path> files = new ArrayList<>();
98101
if(fs.getFileStatus(path).isDirectory()) {
@@ -105,7 +108,7 @@ private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
105108

106109
//determine matrix size via additional pass if required
107110
if(dest == null) {
108-
dest = computeHDF5Size(files, fs, datasetName);
111+
dest = computeHDF5Size(files, fs, datasetName, rlen*clen);
109112
clen = dest.getNumColumns();
110113
rlen = dest.getNumRows();
111114
}
@@ -124,7 +127,8 @@ private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
124127
}
125128

126129
public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetName, MatrixBlock dest,
127-
int row, long rlen, long clen, int blen) {
130+
int rl, long ru, long clen, int blen)
131+
{
128132
bis.mark(0);
129133
long lnnz = 0;
130134
H5RootObject rootObject = H5.H5Fopen(bis);
@@ -133,28 +137,44 @@ public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetNam
133137
int[] dims = rootObject.getDimensions();
134138
int ncol = dims[1];
135139

136-
DenseBlock denseBlock = dest.getDenseBlock();
137-
double[] data = new double[ncol];
138-
for(int i = row; i < rlen; i++) {
139-
H5.H5Dread(contiguousDataset, i, data);
140-
for(int j = 0; j < ncol; j++) {
141-
if(data[j] != 0) {
142-
denseBlock.set(i, j, data[j]);
143-
lnnz++;
140+
try {
141+
double[] row = new double[ncol];
142+
if( dest.isInSparseFormat() ) {
143+
SparseBlock sb = dest.getSparseBlock();
144+
for(int i = rl; i < ru; i++) {
145+
H5.H5Dread(contiguousDataset, i, row);
146+
int lnnzi = UtilFunctions.computeNnz(row, 0, (int)clen);
147+
sb.allocate(i, lnnzi); //avoid row reallocations
148+
for(int j = 0; j < ncol; j++)
149+
sb.append(i, j, row[j]); //prunes zeros
150+
lnnz += lnnzi;
151+
}
152+
}
153+
else {
154+
DenseBlock denseBlock = dest.getDenseBlock();
155+
for(int i = rl; i < ru; i++) {
156+
H5.H5Dread(contiguousDataset, i, row);
157+
for(int j = 0; j < ncol; j++) {
158+
if(row[j] != 0) {
159+
denseBlock.set(i, j, row[j]);
160+
lnnz++;
161+
}
162+
}
144163
}
145164
}
146-
row++;
147165
}
148-
IOUtilFunctions.closeSilently(bis);
166+
finally {
167+
IOUtilFunctions.closeSilently(bis);
168+
}
149169
return lnnz;
150170
}
151171

152-
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
153-
throws IOException, DMLRuntimeException {
172+
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName, long estnnz)
173+
throws IOException, DMLRuntimeException
174+
{
154175
int nrow = 0;
155176
int ncol = 0;
156177
for(int fileNo = 0; fileNo < files.size(); fileNo++) {
157-
158178
BufferedInputStream bis = new BufferedInputStream(fs.open(files.get(fileNo)));
159179
H5RootObject rootObject = H5.H5Fopen(bis);
160180
H5.H5Dopen(rootObject, datasetName);
@@ -166,6 +186,6 @@ public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, Strin
166186
IOUtilFunctions.closeSilently(bis);
167187
}
168188
// allocate target matrix block based on given size;
169-
return createOutputMatrixBlock(nrow, ncol, nrow, (long) nrow * ncol, true, false);
189+
return createOutputMatrixBlock(nrow, ncol, nrow, estnnz, true, true);
170190
}
171191
}

src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.sysds.runtime.io.hdf5.H5Constants;
3939
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
4040
import org.apache.sysds.runtime.util.CommonThreadPool;
41+
import org.apache.sysds.runtime.util.HDFSTool;
4142

4243
public class ReaderHDF5Parallel extends ReaderHDF5 {
4344

@@ -46,7 +47,7 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
4647

4748
public ReaderHDF5Parallel(FileFormatPropertiesHDF5 props) {
4849
super(props);
49-
_numThreads = OptimizerUtils.getParallelTextReadParallelism();
50+
_numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
5051
}
5152

5253
@Override
@@ -69,26 +70,31 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
6970
// allocate output matrix block
7071
ArrayList<Path> files = new ArrayList<>();
7172
files.add(path);
72-
MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName());
73-
73+
MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName(), estnnz);
74+
int numParts = Math.min(files.size(), _numThreads);
75+
7476
//create and execute tasks
7577
ExecutorService pool = CommonThreadPool.get(_numThreads);
7678
try {
7779
int bufferSize = (src.getNumColumns() * src.getNumRows()) * 8 + H5Constants.STATIC_HEADER_SIZE;
7880
ArrayList<ReadHDF5Task> tasks = new ArrayList<>();
7981
rlen = src.getNumRows();
80-
int blklen = (int) Math.ceil((double) rlen / _numThreads);
82+
int blklen = (int) Math.ceil((double) rlen / numParts);
8183
for(int i = 0; i < _numThreads & i * blklen < rlen; i++) {
8284
int rl = i * blklen;
8385
int ru = (int) Math.min((i + 1) * blklen, rlen);
84-
BufferedInputStream bis = new BufferedInputStream(fs.open(path), bufferSize);
86+
Path newPath = HDFSTool.isDirectory(fs, path) ?
87+
new Path(path, IOUtilFunctions.getPartFileName(i)) : path;
88+
BufferedInputStream bis = new BufferedInputStream(fs.open(newPath), bufferSize);
8589

8690
//BufferedInputStream bis, String datasetName, MatrixBlock src, MutableInt rl, int ru
87-
tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru));
91+
tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru, clen, blklen));
8892
}
8993

90-
for(Future<Object> task : pool.invokeAll(tasks))
91-
task.get();
94+
long nnz = 0;
95+
for(Future<Long> task : pool.invokeAll(tasks))
96+
nnz += task.get();
97+
src.setNonZeros(nnz);
9298

9399
return src;
94100
}
@@ -102,31 +108,36 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
102108

103109
@Override
104110
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
105-
throws IOException, DMLRuntimeException {
106-
111+
throws IOException, DMLRuntimeException
112+
{
107113
return new ReaderHDF5(_props).readMatrixFromInputStream(is, rlen, clen, blen, estnnz);
108114
}
109115

110-
private static class ReadHDF5Task implements Callable<Object> {
116+
private static class ReadHDF5Task implements Callable<Long> {
111117

112118
private final BufferedInputStream _bis;
113119
private final String _datasetName;
114120
private final MatrixBlock _src;
115121
private final int _rl;
116122
private final int _ru;
123+
private final long _clen;
124+
private final int _blen;
117125

118-
public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src, int rl, int ru) {
126+
public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src,
127+
int rl, int ru, long clen, int blen)
128+
{
119129
_bis = bis;
120130
_datasetName = datasetName;
121131
_src = src;
122132
_rl = rl;
123133
_ru = ru;
134+
_clen = clen;
135+
_blen = blen;
124136
}
125137

126138
@Override
127-
public Object call() throws IOException {
128-
readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, 0, 0);
129-
return null;
139+
public Long call() throws IOException {
140+
return readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, _clen, _blen);
130141
}
131142
}
132143
}

0 commit comments

Comments (0)