[SPARK-23975][ML]Allow Clustering to take Arrays of Double as input features #21081

Closed
wants to merge 9 commits
consolidating featuretovector
lu-wang-dl committed Apr 19, 2018
commit 009b918c8734b19f9f9b34a31c23d6ad582c7465
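
For context, a minimal sketch of the usage this PR enables (hypothetical data and column name; assumes an active SparkSession bound to `spark`):

```scala
import org.apache.spark.ml.clustering.KMeans
import spark.implicits._

// A features column typed array<double> rather than Vector. Before this change,
// callers had to convert such columns with VectorAssembler or a UDF first.
val df = Seq(
  Array(0.0, 0.1), Array(0.2, 0.0),
  Array(9.0, 8.9), Array(8.8, 9.1)
).toDF("features")

val model = new KMeans().setK(2).setSeed(1L).fit(df)
model.transform(df).show()  // appends an integer "prediction" column
```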
mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala (85 changes: 59 additions & 26 deletions)
@@ -30,7 +30,7 @@ import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType}
import org.apache.spark.storage.StorageLevel
@@ -91,9 +91,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol
   */
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    val typeCandidates = List(new VectorUDT,
      new ArrayType(DoubleType, true),
      new ArrayType(DoubleType, false),
      new ArrayType(FloatType, true),
      new ArrayType(FloatType, false))
    SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
    SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
@@ -125,25 +123,32 @@ class KMeansModel private[ml] (
@Since("2.0.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {
Member: Make this private. In general, we try to keep APIs as private as possible since that allows us more flexibility to make changes in the future.

Member: Also, add a Scala docstring saying what this does.

    val featuresDataType = dataset.schema(getFeaturesCol).dataType
    val transferUDF = featuresDataType match {
      case _: VectorUDT => udf((vector: Vector) => vector)
Member: Just return col(getFeaturesCol) since that will be more efficient. (Calling a UDF requires data serialization overhead.)
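
Folding the three review suggestions above into the helper might look like this (a sketch, not the committed code; the docstring wording is illustrative):

```scala
/**
 * Returns `col` as a Vector column: a VectorUDT column passes through
 * unchanged, while array<float> / array<double> columns are converted
 * element-wise into dense vectors.
 */
private[clustering] def featureToVector(dataset: Dataset[_], col: Column): Column = {
  dataset.schema(getFeaturesCol).dataType match {
    // Already a Vector column: no UDF, so no serialization round-trip.
    case _: VectorUDT => col
    case fdt: ArrayType => fdt.elementType match {
      case _: FloatType =>
        udf((a: Seq[Float]) => Vectors.dense(a.map(_.toDouble).toArray)).apply(col)
      case _: DoubleType =>
        udf((a: Seq[Double]) => Vectors.dense(a.toArray)).apply(col)
    }
  }
}
```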

      case fdt: ArrayType => fdt.elementType match {
        case _: FloatType => udf((vector: Seq[Float]) => {
          val featureArray = Array.fill[Double](vector.size)(0.0)
          vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble)
          Vectors.dense(featureArray)
        })
        case _: DoubleType => udf((vector: Seq[Double]) => {
          Vectors.dense(vector.toArray)
        })
      }
    }
    transferUDF(col)
  }

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
// val predictUDF = udf((vector: Vector) => predict(vector))
val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
udf((vector: Vector) => predict(vector))
}
else {
udf((vector: Seq[_]) => {
val featureArray = Array.fill[Double](vector.size)(0.0)
for (idx <- 0 until vector.size) {
featureArray(idx) = vector(idx).toString().toDouble
}
OldVectors.fromML(Vectors.dense(featureArray))
predict(Vectors.dense(featureArray))
})
}

dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
val predictUDF = udf((vector: Vector) => predict(vector))

dataset.withColumn($(predictionCol), predictUDF(featureToVector(dataset, col(getFeaturesCol))))
}

@Since("1.5.0")
@@ -164,20 +169,24 @@ class KMeansModel private[ml] (
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
val typeCandidates = List( new VectorUDT,
Member: You can reuse validateAndTransformSchema here.
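
Applied here, the suggestion would let computeCost drop its private copy of the type list (a sketch built from the diff below):

```scala
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
  // Delegate the features-column type check instead of repeating typeCandidates.
  validateAndTransformSchema(dataset.schema)
  val data: RDD[OldVector] = dataset
    .select(featureToVector(dataset, col(getFeaturesCol)))
    .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) }
  parentModel.computeCost(data)
}
```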

      new ArrayType(DoubleType, true),
      new ArrayType(DoubleType, false),
      new ArrayType(FloatType, true),
      new ArrayType(FloatType, false))
    SchemaUtils.checkColumnTypes(dataset.schema, $(featuresCol), typeCandidates)
    val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {

    /* val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
      case Row(point: Vector) => OldVectors.fromML(point)
      case Row(point: Seq[_]) =>
        val featureArray = Array.fill[Double](point.size)(0.0)
        for (idx <- 0 until point.size) {
          featureArray(idx) = point(idx).toString().toDouble
        for (idx <- point.indices) {
          featureArray(idx) = point(idx).toString.toDouble
        }
        OldVectors.fromML(Vectors.dense(featureArray))
    }
    */
    val data: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol)))
      .rdd.map {
        case Row(point: Vector) => OldVectors.fromML(point)
      }
    parentModel.computeCost(data)
  }

@@ -335,21 +344,45 @@ class KMeans @Since("1.5.0") (
@Since("1.5.0")
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {
Member: Is this a copy of the same method? It should be shared, either in KMeansParams or in a static (object) method.
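
One way to de-duplicate, per the comment above: host a single copy in KMeansParams, which both KMeans and KMeansModel already mix in (a sketch; the trait's other supertraits and members are elided):

```scala
private[clustering] trait KMeansParams extends Params with HasFeaturesCol {
  // ...existing params and validateAndTransformSchema...

  /** Shared Array-to-Vector conversion for both the estimator and the model. */
  protected def featureToVector(dataset: Dataset[_], col: Column): Column =
    dataset.schema(getFeaturesCol).dataType match {
      case _: VectorUDT => col
      case ArrayType(_: FloatType, _) =>
        udf((a: Seq[Float]) => Vectors.dense(a.map(_.toDouble).toArray)).apply(col)
      case ArrayType(_: DoubleType, _) =>
        udf((a: Seq[Double]) => Vectors.dense(a.toArray)).apply(col)
    }
}
```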

    val featuresDataType = dataset.schema(getFeaturesCol).dataType
    val transferUDF = featuresDataType match {
      case _: VectorUDT => udf((vector: Vector) => vector)
      case fdt: ArrayType => fdt.elementType match {
        case _: FloatType => udf((vector: Seq[Float]) => {
          val featureArray = Array.fill[Double](vector.size)(0.0)
          vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble)
          Vectors.dense(featureArray)
        })
        case _: DoubleType => udf((vector: Seq[Double]) => {
          Vectors.dense(vector.toArray)
        })
      }
    }
    transferUDF(col)
  }

@Since("2.0.0")
override def fit(dataset: Dataset[_]): KMeansModel = {
transformSchema(dataset.schema, logging = true)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val instances: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol)))
.rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
/*
val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
case Row(point: Seq[_]) =>
val featureArray = Array.fill[Double](point.size)(0.0)
for (idx <- 0 until point.size) {
featureArray(idx) = point(idx).toString().toDouble
for (idx <- point.indices) {
featureArray(idx) = point(idx).toString.toDouble
}
OldVectors.fromML(Vectors.dense(featureArray))
}

*/
if (handlePersistence) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}