Skip to content

Commit 94fb9f4

Browse files
author
Ji ZHANG
committed
output
1 parent 8bcfc61 commit 94fb9f4

File tree

2 files changed

+40
-5
lines changed

2 files changed

+40
-5
lines changed

src/main/scala/recommendation/MainClass.scala

Lines changed: 28 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package recommendation
22

33
import scala.util.Random
44
import org.slf4j.LoggerFactory
5+
import org.apache.hadoop.fs.{FileSystem, Path}
56
import org.apache.spark._
67
import org.apache.spark.storage.StorageLevel
78
import org.apache.spark.rdd.RDD
@@ -15,7 +16,8 @@ case class Config(
1516
algorithm: String = "",
1617
similarityMethod: String = "",
1718
recommendMethod: String = "",
18-
numRecommendations: Int = 30)
19+
numRecommendations: Int = 30,
20+
outputPath: String = "")
1921

2022
object MainClass {
2123

@@ -34,8 +36,10 @@ object MainClass {
3436
opt[String]('r', "recommend-method") action { (x, c) =>
3537
c.copy(recommendMethod = x) } text("recommend method")
3638
opt[Int]('n', "num-recommendations") action { (x, c) =>
37-
c.copy(numRecommendations = x) } text ("number of recommendations")
38-
help("help") text ("prints this usage text")
39+
c.copy(numRecommendations = x) } text("number of recommendations")
40+
opt[String]('o', "output-path") action { (x, c) =>
41+
c.copy(outputPath = x) } text("output path")
42+
help("help") text("prints this usage text")
3943
}
4044

4145
val config = parser.parse(args, Config()) match {
@@ -46,6 +50,15 @@ object MainClass {
4650
val conf = new SparkConf().setAppName("Recommendation").setIfMissing("spark.master", "local[4]")
4751
val sc = new SparkContext(conf)
4852

53+
if (config.outputPath.nonEmpty) {
54+
logger.info("Delete output path " + config.outputPath)
55+
try {
56+
FileSystem.get(sc.hadoopConfiguration).delete(new Path(config.outputPath), true)
57+
} catch {
58+
case _: Exception =>
59+
}
60+
}
61+
4962
// read logs - logs are pre-grouped by user-item
5063
val logs = sc.textFile(config.inputPath).flatMap { line =>
5164
try {
@@ -79,7 +92,8 @@ object MainClass {
7992

8093
val commonParams = Map(
8194
"numNeighbours" -> 50,
82-
"numRecommendations" -> config.numRecommendations
95+
"numRecommendations" -> config.numRecommendations,
96+
"outputPath" -> config.outputPath
8397
)
8498

8599
// model
@@ -102,6 +116,16 @@ object MainClass {
102116

103117
userRecomm.cache()
104118

119+
if (config.outputPath.nonEmpty) {
120+
val userRecommPath = new Path(config.outputPath, "user_recommendations")
121+
logger.info("Writing user recommendations into " + userRecommPath.toString)
122+
userRecomm.flatMap { case (user, products) =>
123+
products.map { case Rating(user, product, rating) =>
124+
Seq[Any](user, product, rating).mkString("\t")
125+
}
126+
}.saveAsTextFile(userRecommPath.toString)
127+
}
128+
105129
val (precision, recall, f1) = evaluatePrecision(testingSet, userRecomm)
106130
val coverage = evaluateCoverage(trainingSet, userRecomm)
107131
val popularity = evaluatePopularity(trainingSet, userRecomm)

src/main/scala/recommendation/SimilarityRecommender.scala

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package recommendation
22

33
import scala.collection.mutable
44
import org.slf4j.LoggerFactory
5+
import org.apache.hadoop.fs.Path
56
import org.apache.spark.rdd.RDD
67
import org.apache.spark.mllib.linalg.Vectors
78
import org.apache.spark.mllib.linalg.distributed._
@@ -18,6 +19,7 @@ object SimilarityRecommender extends Recommender {
1819
val numRecommendations = params.getInt("numRecommendations")
1920
val similarityMethod = params.getString("similarityMethod")
2021
val recommendMethod = params.getString("recommendMethod")
22+
val outputPath = params.getString("outputPath")
2123

2224
// train
2325
val entries = trainingSet.map { case Rating(user, product, rating) =>
@@ -44,14 +46,23 @@ object SimilarityRecommender extends Recommender {
4446
}.groupByKey.mapValues { products =>
4547
val productsTop = products.toSeq.sortWith(_._2 > _._2).take(numNeighbours)
4648
normalizeRange(productsTop)
49+
}.cache()
50+
51+
if (outputPath.nonEmpty) {
52+
val simTopPath = new Path(outputPath, "item_similarities")
53+
logger.info("Writing item similarities into " + simTopPath.toString)
54+
simTop.flatMap { case (i, products) =>
55+
products.map { case (j, u) =>
56+
Seq[Any](i, j, u).mkString("\t")
57+
}
58+
}.saveAsTextFile(simTopPath.toString)
4759
}
4860

4961
recommendMethod match {
5062
case "score" => recommendByScore(simTop, trainingSet, numRecommendations)
5163
case "weighted-sum" => recommendByWeightedSum(simTop, trainingSet, numRecommendations)
5264
case _ => throw new IllegalArgumentException("unknown recommend method")
5365
}
54-
5566
}
5667

5768
def recommendByWeightedSum(simTop: RDD[(Int, Iterable[(Int, Double)])], trainingSet: RDD[Rating],

0 commit comments

Comments
 (0)