[SPARK-23975][ML]Allow Clustering to take Arrays of Double as input features #21081

Closed
lu-wang-dl wants to merge 9 commits into master from the SPARK-23975 branch

Conversation

lu-wang-dl (Contributor)

What changes were proposed in this pull request?

  • Multiple possible input types are accepted in validateAndTransformSchema() and computeCost() when checking the column type

  • Add an if statement in transform() to support array types as featuresCol

  • Add a case statement in fit() when selecting columns from the dataset

These changes are applied to KMeans first, then to other clustering methods.

How was this patch tested?

A unit test is added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@jkbradley (Member) left a comment:

Thanks for the PR! I made a first review pass.


assert(kmeans.getK === 2)
assert(kmeans.getFeaturesCol === featuresColName)
assert(kmeans.getPredictionCol === "prediction")

Member:

No need to check this or the other Params which are not relevant to this test

val arrayUDF = udf { (features: Vector) =>
features.toArray
}
val newdataset = dataset.withColumn(featuresColName, arrayUDF(col("features")) )

Member:

nit: You could drop the original column as well just to make extra sure that it's not being accidentally used.
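
A minimal sketch of that suggestion against the test setup quoted above (assuming the original vector column is literally named "features", as in the snippet):

    val newdataset = dataset
      .withColumn(featuresColName, arrayUDF(col("features")))
      .drop("features")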

assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN)
val model = kmeans.setMaxIter(1).fit(newdataset)

MLTestingUtils.checkCopyAndUids(kmeans, model)

Member:

You don't need this test here

Member:

ditto for hasSummary and copying

val predictUDF = udf((vector: Vector) => predict(vector))
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
// val predictUDF = udf((vector: Vector) => predict(vector))
if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {

Member:

tip: This can be more succinct if written as:

val predictUDF = if (dataset.schema(...).dataType.equals(...)) { A } else { B }
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))  // so this line is only written once
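
For illustration, a filled-in version of that shape could look like the sketch below; it assumes this sits inside KMeansModel.transform where predict, $(featuresCol), and $(predictionCol) are in scope, and it handles only Double arrays (FloatType comes up further down):

    // choose the UDF once based on the input column type
    val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
      udf((vector: Vector) => predict(vector))
    } else {
      udf((vector: Seq[Double]) => predict(Vectors.dense(vector.toArray)))
    }
    // the withColumn call is then written only once
    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))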

@@ -312,6 +329,8 @@ class KMeans @Since("1.5.0") (
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
case Row(point: Seq[_]) =>
OldVectors.fromML(Vectors.dense(point.asInstanceOf[Seq[Double]].toArray))

Member:

I'm not sure this will work with arrays of FloatType. Make sure to test it

dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
} else {
val predictUDF = udf((vector: Seq[_]) =>
predict(Vectors.dense(vector.asInstanceOf[Seq[Double]].toArray)))

Member:

This may not work with arrays of FloatType.
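
One possible way to cover that, sketched here as an assumption rather than the PR's final code, is a separate UDF that widens the floats before building the vector:

    // widen each Float element to Double before constructing the dense vector
    val predictUDF = udf((vector: Seq[Float]) =>
      predict(Vectors.dense(vector.map(_.toDouble).toArray)))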

@SparkQA

SparkQA commented Apr 16, 2018

Test build #89410 has finished for PR 21081 at commit badb0cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 16, 2018

Test build #89417 has finished for PR 21081 at commit 6d222a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley (Member) left a comment:

Thanks for the updates!

// val predictUDF = udf((vector: Vector) => predict(vector))
val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
udf((vector: Vector) => predict(vector))
}

Member:

Scala style: } else {

udf((vector: Vector) => predict(vector))
}
else {
udf((vector: Seq[_]) => {

Member:

scala style: remove unnecessary { at end of line (IntelliJ should warn you about this)

@@ -90,7 +90,12 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
* @return output schema
*/
protected def validateAndTransformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
val typeCandidates = List( new VectorUDT,
new ArrayType(DoubleType, true),

Member:

Thinking about this, let's actually disallow nullable columns. KMeans won't handle nulls properly.

Member:

Also, IntelliJ may warn you about passing boolean arguments as named arguments; that'd be nice to fix here.
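
Taken together with the previous comment, the type-candidate list might become something like this sketch (containsNull = false rejects arrays that may contain null elements, written as a named argument):

    val typeCandidates = List(
      new VectorUDT,
      ArrayType(DoubleType, containsNull = false),
      ArrayType(FloatType, containsNull = false))
    SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)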

}
else {
udf((vector: Seq[_]) => {
val featureArray = Array.fill[Double](vector.size)(0.0)

Member:

You shouldn't have to do the conversion in this convoluted (and less efficient) way. I'd recommend doing a match-case statement on dataset.schema; I think that will be the most succinct. Then you can handle Vector, Seq of Float, and Seq of Double separately, without conversions to strings.

Same for the similar cases below.

Member:

Here's what I meant:

    val predictUDF = featuresDataType match {
      case _: VectorUDT =>
        udf((vector: Vector) => predict(vector))
      case fdt: ArrayType => fdt.elementType match {
        case _: FloatType =>
          ???
        case _: DoubleType =>
          ???
      }
    }
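
Filled in along the lines of the code that later appears in this PR (see the featureToVector snippet further below), the match might look roughly like this; the FloatType branch, which widens each element to Double, is a sketch, and featuresDataType is assumed to be the dataType of the features column as in the diff below:

    val predictUDF = featuresDataType match {
      case _: VectorUDT =>
        udf((vector: Vector) => predict(vector))
      case fdt: ArrayType => fdt.elementType match {
        case _: FloatType =>
          udf((vector: Seq[Float]) => predict(Vectors.dense(vector.map(_.toDouble).toArray)))
        case _: DoubleType =>
          udf((vector: Seq[Double]) => predict(Vectors.dense(vector.toArray)))
      }
    }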

val predictUDF = udf((vector: Vector) => predict(vector))
// val predictUDF = udf((vector: Vector) => predict(vector))
val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
udf((vector: Vector) => predict(vector))

Member:

Side note: I realized that "predict" will cause the whole model to be serialized and sent to workers. But that's actually OK since we do need to send most of the model data to make predictions and since there's not a clean way to just send the model weights. So I think my previous comment about copying "numClasses" to a local variable was not necessary. Don't bother reverting the change though.

@WeichenXu123 (Contributor):

@jkbradley Will this be applied to other algos besides clustering algos? And how do we support sparse float features?

@jkbradley (Member):

I hope we can apply it to other algs too. @ludatabricks is doing some refactoring which should make that easier, but we're not going for a completely general approach right away.

I don't think we need to worry about sparse FloatType features; users have no way to pass those in.

@WeichenXu123 (Contributor):

So why not design a generic vector class, and then implement Vector[Double] and Vector[Float] via generic specialization? Then it could support everything, whether sparse or dense.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89585 has finished for PR 21081 at commit 009b918.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley (Member):

@WeichenXu123 A generic vector class would be interesting, but that would be a big project, way out of scope of this PR. You could bring it up if that person on the dev list sends a SPIP about linear algebra.

@jkbradley (Member) left a comment:

A few more comments. (Also, remember to clean up the commented-out code.)

@@ -120,11 +123,32 @@ class KMeansModel private[ml] (
@Since("2.0.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {

Member:

Make this private. In general, we try to keep APIs as private as possible since that allows us more flexibility to make changes in the future.

Member:

Also, add a Scala docstring saying what this does.

def featureToVector(dataset: Dataset[_], col: Column): Column = {
val featuresDataType = dataset.schema(getFeaturesCol).dataType
val transferUDF = featuresDataType match {
case _: VectorUDT => udf((vector: Vector) => vector)

Member:

Just return col(getFeaturesCol) since that will be more efficient. (Calling a UDF requires data serialization overhead.)
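
Combining the three comments here (make it private, document it, and skip the UDF when the column is already a Vector), the helper might end up looking roughly like the following sketch; getFeaturesCol is assumed to be in scope from KMeansParams:

    /**
     * Converts the input features column to a Vector column.
     * Supports VectorUDT plus arrays of Float or Double; when the column is
     * already a Vector it is returned as-is, avoiding UDF serialization overhead.
     */
    private def featureToVector(dataset: Dataset[_], col: Column): Column = {
      dataset.schema(getFeaturesCol).dataType match {
        case _: VectorUDT =>
          col  // already a Vector column, no conversion needed
        case fdt: ArrayType =>
          val transferUDF = fdt.elementType match {
            case _: FloatType =>
              udf((features: Seq[Float]) => Vectors.dense(features.map(_.toDouble).toArray))
            case _: DoubleType =>
              udf((features: Seq[Double]) => Vectors.dense(features.toArray))
          }
          transferUDF(col)
      }
    }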

@@ -305,15 +344,45 @@ class KMeans @Since("1.5.0") (
@Since("1.5.0")
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {

Member:

Is this a copy of the same method? It should be shared, either in KMeansParams or in a static (object) method.

@@ -144,8 +168,23 @@ class KMeansModel private[ml] (
// TODO: Replace the temp fix when we have proper evaluators defined for clustering.
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
val typeCandidates = List( new VectorUDT,

Member:

You can reuse validateAndTransformSchema here.
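
A minimal sketch of that reuse inside computeCost (the follow-up commit noted below adds a validateSchema helper for the same purpose):

    // instead of re-listing the type candidates in computeCost, reuse the shared check
    // (the returned schema is ignored; only the column-type validation matters here)
    validateAndTransformSchema(dataset.schema)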

add validateSchema and use it in computeCost

addressed the comments from @jkbradley
@SparkQA

SparkQA commented Apr 20, 2018

Test build #89660 has finished for PR 21081 at commit cd988c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2018

Test build #89738 has finished for PR 21081 at commit fee36ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2018

Test build #89743 has finished for PR 21081 at commit 3e012fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley (Member) left a comment:

Thanks for the updates! After these small fixes, this should be ready, and then we can continue with other algorithms.

new ArrayType(DoubleType, false),
new ArrayType(FloatType, false))
SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
}
/**

Member:

scala style: always put newline between methods

Member:

Ping: There needs to be a newline between the "}" of the previous method and the "/**" Scaladoc of the next method. Please start checking for this.

* @param colName column name for features
* @return Vector feature column
*/
@Since("2.4.0")

Member:

Don't add Since annotations to private APIs. They can get Since annotations when they are made public.

private[spark] object DatasetUtils {

/**
* preprocessing the input feature column to Vector

Member:

This is a bit unclear. How about: "Cast a column in a Dataset to a Vector type."
Also, this isn't specific to features, so please clarify that below.
Finally, the key thing to document is the list of supported input types, so I'd add that.

* preprocessing the input feature column to Vector
* @param dataset DataFrame with columns for features
* @param colName column name for features
* @return Vector feature column

Member:

Add a note that this returned Column does not have Metadata
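
Putting those documentation comments together, the Scaladoc for the DatasetUtils method might read roughly like this (illustrative wording, not the PR's final text):

    /**
     * Cast a column in a Dataset to a Vector type.
     *
     * Supported input column types: VectorUDT, ArrayType(DoubleType), ArrayType(FloatType).
     *
     * @param dataset input Dataset
     * @param colName name of the column to cast
     * @return a Vector column; note that the returned Column carries no Metadata
     */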

assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))

val kmeansD = new KMeans().setK(k).setFeaturesCol(featuresColNameD).setSeed(1)

Member:

Also do: setMaxIter(1) to make this a little faster.
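
That is, something along these lines (a sketch of the suggested change to the line quoted above):

    val kmeansD = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)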

assert(predictDifference.count() == 0)

assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) )

Member:

nit: remove unnecessary newline

@SparkQA

SparkQA commented Apr 24, 2018

Test build #89736 has finished for PR 21081 at commit 3ffb322.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 24, 2018

Test build #89749 has finished for PR 21081 at commit c4e1a51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 24, 2018

Test build #4157 has finished for PR 21081 at commit c4e1a51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley (Member) left a comment:

Just the one style comment. Can you please fix it in the follow-up PR? I'll go ahead and merge this with master.
LGTM
Thanks @ludatabricks!

new ArrayType(DoubleType, false),
new ArrayType(FloatType, false))
SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
}
/**

Member:

Ping: There needs to be a newline between the "}" of the previous method and the "/**" Scaladoc of the next method. Please start checking for this.

})
case _: DoubleType => udf((vector: Seq[Double]) => {
Vectors.dense(vector.toArray)
})
case other =>

Member:

Thanks! I forgot about this since this was generalized.

@asfgit closed this in 2a24c48 on Apr 24, 2018
@lu-wang-dl deleted the SPARK-23975 branch on April 25, 2018 at 18:51