
Commit 21cf96b

Implementation of Chapter 24 code in Java
I will keep adding code for the remaining chapters.
1 parent dcd0242 commit 21cf96b

1 file changed: +142 −0
@@ -0,0 +1,142 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.ml.feature.RFormulaModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Advanced_Analytics_and_Machine_Learning_Chapter_24_Advanced_Analytics_and_Machine_Learning {

    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("Chapter24AdvancedAnalyticsAndMachineLearning")
                .getOrCreate();

        // Creating a dense vector
        Vector denseVector = Vectors.dense(1.0, 2.0, 3.0);
        System.out.println("Dense Vector: " + denseVector.toString());

        // Creating a sparse vector (size, non-zero indices, non-zero values)
        int size = 3;
        int[] idx = new int[] { 1, 2 };
        double[] values = new double[] { 2.0, 3.0 };

        Vector sparseVector = Vectors.sparse(size, idx, values);
        System.out.println("Sparse Vector: " + sparseVector.toString());
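
        // A quick sanity check one might add here: expanding the sparse vector
        // shows it encodes the same layout as a dense [0.0, 2.0, 3.0].
        System.out.println("Sparse expanded: " + java.util.Arrays.toString(sparseVector.toArray()));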

        // Reading the simple-ml JSON data into a DataFrame (Dataset<Row>)
        Dataset<Row> df = spark.read().json("data/simple-ml");
        df.orderBy("value2").show();

        // Initializing RFormula: predict "lab" from all columns plus the
        // color:value1 and color:value2 interaction terms
        RFormula supervised = new RFormula().setFormula("lab ~ . + color:value1 + color:value2");

        // Applying RFormula: fit produces the transformer, transform adds the
        // assembled feature and label columns
        RFormulaModel fittedRF = supervised.fit(df);
        Dataset<Row> preparedDF = fittedRF.transform(df);
        preparedDF.show();
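
        // A useful peek one might add here: RFormula appends the assembled
        // "features" vector column and the indexed "label" column to the data.
        preparedDF.printSchema();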

        // Splitting preparedDF into a 70/30 train/test split
        Dataset<Row>[] trainTestDataArray = preparedDF.randomSplit(new double[] { 0.7, 0.3 });
        Dataset<Row> train = trainTestDataArray[0];
        Dataset<Row> test = trainTestDataArray[1];

        // Displaying sample data from both the train and test datasets
        train.show();
        test.show();

        // Applying logistic regression to the prepared training data
        LogisticRegression logisticRegression = new LogisticRegression().setLabelCol("label")
                .setFeaturesCol("features");

        // Printing the model's parameters and their documentation
        System.out.println(logisticRegression.explainParams());

        // Fitting the model to the train dataset
        LogisticRegressionModel fittedLR = logisticRegression.fit(train);
        fittedLR.transform(train).select("label", "prediction").show();
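
        // An optional peek at what was learned: the fitted model exposes its
        // coefficient vector and intercept directly.
        System.out.println("Coefficients: " + fittedLR.coefficients() + " Intercept: " + fittedLR.intercept());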

        // Splitting the original (untransformed) dataset into train and test sets
        Dataset<Row>[] trainTestSet = df.randomSplit(new double[] { 0.7, 0.3 });
        train = trainTestSet[0];
        test = trainTestSet[1];

        // Initializing a fresh RFormula; its formula will be set via the param grid
        RFormula rForm = new RFormula();

        // Reinitializing the logistic regression model since it was already trained on other data
        logisticRegression = new LogisticRegression().setLabelCol("label").setFeaturesCol("features");

        // Creating a pipeline for execution: feature preparation, then the classifier
        PipelineStage[] stages = new PipelineStage[2];
        stages[0] = rForm;
        stages[1] = logisticRegression;

        Pipeline pipeline = new Pipeline().setStages(stages);

        // Building the parameter grid: two candidate formulas, three elastic-net
        // mixing values, and two regularization strengths
        List<String> paramValues = new ArrayList<String>();
        paramValues.add("lab ~ . + color:value1");
        paramValues.add("lab ~ . + color:value1 + color:value2");

        ParamMap[] params = new ParamGridBuilder()
                .addGrid(rForm.formula(), scala.collection.JavaConverters
                        .asScalaIteratorConverter(paramValues.iterator()).asScala().toSeq())
                .addGrid(logisticRegression.elasticNetParam(), new double[] { 0.0, 0.5, 1.0 })
                .addGrid(logisticRegression.regParam(), new double[] { 0.1, 2.0 })
                .build();
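
        // A quick sanity check one might add here: the grid expands to
        // 2 formulas x 3 elasticNetParam values x 2 regParam values = 12 ParamMaps.
        System.out.println("Parameter combinations: " + params.length);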

        // Obtaining the evaluator: area under the ROC curve, computed from the
        // "prediction" column against the "label" column
        BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator()
                .setMetricName("areaUnderROC")
                .setRawPredictionCol("prediction")
                .setLabelCol("label");

        // Creating the train-validation split: 75% of the training data fits
        // each candidate model, the remaining 25% validates it
        TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
                .setTrainRatio(0.75)
                .setEstimatorParamMaps(params)
                .setEstimator(pipeline)
                .setEvaluator(evaluator);

        TrainValidationSplitModel tvsFitted = trainValidationSplit.fit(train);

        // Evaluating the selected model on the held-out test set
        System.out.println("Test areaUnderROC: " + evaluator.evaluate(tvsFitted.transform(test)));

        // Getting the best model: the second pipeline stage is the fitted
        // logistic regression, whose training summary holds the loss history
        PipelineModel trainedPipeline = (PipelineModel) tvsFitted.bestModel();
        LogisticRegressionModel trainedLR = (LogisticRegressionModel) trainedPipeline.stages()[1];
        LogisticRegressionTrainingSummary summaryLR = trainedLR.summary();

        // The objective history should decrease as training converges
        for (double objectiveHist : summaryLR.objectiveHistory()) {
            System.out.println(objectiveHist);
        }

        // Persisting the model to disk
        tvsFitted.write().overwrite().save("tmp/modelLocation");

        // Loading the persisted model from disk and testing it
        TrainValidationSplitModel model = TrainValidationSplitModel.load("tmp/modelLocation");
        Dataset<Row> tested = model.transform(test);

        tested.show();
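
        // A final check one might add here: the reloaded model should score the
        // test set just like the in-memory tvsFitted model did.
        System.out.println("Reloaded model areaUnderROC: " + evaluator.evaluate(tested));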

        spark.stop();
    }
}
