[SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features #21081
Conversation
Thanks for the PR! I made a first review pass
assert(kmeans.getK === 2)
assert(kmeans.getFeaturesCol === featuresColName)
assert(kmeans.getPredictionCol === "prediction")
No need to check this or the other Params which are not relevant to this test
val arrayUDF = udf { (features: Vector) =>
  features.toArray
}
val newdataset = dataset.withColumn(featuresColName, arrayUDF(col("features")))
nit: You could drop the original column as well just to make extra sure that it's not being accidentally used.
assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN)
val model = kmeans.setMaxIter(1).fit(newdataset)

MLTestingUtils.checkCopyAndUids(kmeans, model)
You don't need this test here
ditto for hasSummary and copying
val predictUDF = udf((vector: Vector) => predict(vector))
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
// val predictUDF = udf((vector: Vector) => predict(vector))
if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
tip: This can be more succinct if written as:
val predictUDF = if (dataset.schema(...).dataType.equals(...)) { A } else { B }
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) // so this line is only written once
@@ -312,6 +329,8 @@ class KMeans @Since("1.5.0") (
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
  case Row(point: Vector) => OldVectors.fromML(point)
  case Row(point: Seq[_]) =>
    OldVectors.fromML(Vectors.dense(point.asInstanceOf[Seq[Double]].toArray))
I'm not sure this will work with arrays of FloatType. Make sure to test it
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
} else {
  val predictUDF = udf((vector: Seq[_]) =>
    predict(Vectors.dense(vector.asInstanceOf[Seq[Double]].toArray)))
This may not work with arrays of FloatType.
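The concern about FloatType arrays can be illustrated in plain Scala, with no Spark needed. Due to type erasure, casting a `Seq[Float]` with `asInstanceOf[Seq[Double]]` does not fail at the cast itself; a `ClassCastException` only surfaces later, when elements are unboxed. A minimal sketch (the helper name is hypothetical, not part of the PR):

```scala
// Hypothetical helper illustrating the reviewer's concern: convert
// element-by-element rather than casting the whole Seq, so both
// Float and Double array elements are handled safely.
object SafeConversion {
  def toDoubleArray(xs: Seq[Any]): Array[Double] = xs.map {
    case d: Double => d
    case f: Float  => f.toDouble
  }.toArray
}
```

In the UDFs above, such a conversion would be applied before building the dense vector.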
Test build #89410 has finished for PR 21081 at commit
Test build #89417 has finished for PR 21081 at commit
Thanks for the updates!
// val predictUDF = udf((vector: Vector) => predict(vector))
val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
  udf((vector: Vector) => predict(vector))
}
Scala style: } else {
udf((vector: Vector) => predict(vector))
}
else {
  udf((vector: Seq[_]) => {
Scala style: remove the unnecessary { at the end of the line (IntelliJ should warn you about this).
@@ -90,7 +90,12 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
 * @return output schema
 */
protected def validateAndTransformSchema(schema: StructType): StructType = {
  SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
  val typeCandidates = List(new VectorUDT,
    new ArrayType(DoubleType, true),
Thinking about this, let's actually disallow nullable columns. KMeans won't handle nulls properly.
Also, IntelliJ may warn you about passing boolean arguments as named arguments; that'd be nice to fix here.
}
else {
  udf((vector: Seq[_]) => {
    val featureArray = Array.fill[Double](vector.size)(0.0)
You shouldn't have to do the conversion in this convoluted (and less efficient) way. I'd recommend doing a match-case statement on dataset.schema; I think that will be the most succinct. Then you can handle Vector, Seq of Float, and Seq of Double separately, without conversions to strings.
Same for the similar cases below.
Here's what I meant:
val predictUDF = featuresDataType match {
case _: VectorUDT =>
udf((vector: Vector) => predict(vector))
case fdt: ArrayType => fdt.elementType match {
case _: FloatType =>
???
case _: DoubleType =>
???
}
}
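One hedged way to fill in the ??? branches of that sketch: the element-wise conversions are plain Scala and can stand alone below, while the Spark udf/predict wiring (which needs a live pipeline) is kept in comments. The object name is illustrative, not from the PR:

```scala
// Pure-Scala cores for the two ??? branches: Float vs Double elements.
object ArrayBranches {
  // FloatType branch: widen each element explicitly before building the array.
  def floatsToDoubles(v: Seq[Float]): Array[Double] = v.map(_.toDouble).toArray
  // DoubleType branch: a direct copy suffices.
  def doublesToArray(v: Seq[Double]): Array[Double] = v.toArray
}

// In the match sketch these would plug in roughly as:
//   case _: FloatType  => udf((v: Seq[Float])  => predict(Vectors.dense(ArrayBranches.floatsToDoubles(v))))
//   case _: DoubleType => udf((v: Seq[Double]) => predict(Vectors.dense(ArrayBranches.doublesToArray(v))))
```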
val predictUDF = udf((vector: Vector) => predict(vector))
// val predictUDF = udf((vector: Vector) => predict(vector))
val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) {
  udf((vector: Vector) => predict(vector))
Side note: I realized that "predict" will cause the whole model to be serialized and sent to workers. But that's actually OK, since we do need to send most of the model data to make predictions, and there's not a clean way to send just the model weights. So I think my previous comment about copying "numClasses" to a local variable was not necessary. Don't bother reverting the change though.
@jkbradley Will this be applied to other algorithms besides clustering algorithms? And how would we support sparse float features?
I hope we can apply it to other algorithms too. @ludatabricks is doing some refactoring which should make that easier, but we're not going for a completely general approach right away. I don't think we need to worry about sparse FloatType features; users have no way to pass those in.
So why not design a generic vector class, and then implement Vector[Double] and Vector[Float] via specialization? Then it could support everything, both sparse and dense.
Test build #89585 has finished for PR 21081 at commit
@WeichenXu123 A generic vector class would be interesting, but that would be a big project, way out of scope of this PR. You could bring it up if that person on the dev list sends a SPIP about linear algebra.
A few more comments. (Also, remember to clean up the commented-out code.)
@@ -120,11 +123,32 @@ class KMeansModel private[ml] (
@Since("2.0.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {
Make this private. In general, we try to keep APIs as private as possible since that allows us more flexibility to make changes in the future.
Also, add a Scala docstring saying what this does.
def featureToVector(dataset: Dataset[_], col: Column): Column = {
  val featuresDataType = dataset.schema(getFeaturesCol).dataType
  val transferUDF = featuresDataType match {
    case _: VectorUDT => udf((vector: Vector) => vector)
Just return col(getFeaturesCol), since that will be more efficient. (Calling a UDF requires data serialization overhead.)
@@ -305,15 +344,45 @@ class KMeans @Since("1.5.0") (
@Since("1.5.0")
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.4.0")
def featureToVector(dataset: Dataset[_], col: Column): Column = {
Is this a copy of the same method? It should be shared, either in KMeansParams or in a static (object) method.
@@ -144,8 +168,23 @@ class KMeansModel private[ml] (
// TODO: Replace the temp fix when we have proper evaluators defined for clustering.
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
  SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
  val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
  val typeCandidates = List(new VectorUDT,
You can reuse validateAndTransformSchema here.
Add validateSchema and use it in computeCost; addressed the comments from @jkbradley.
Test build #89660 has finished for PR 21081 at commit
Test build #89738 has finished for PR 21081 at commit
Test build #89743 has finished for PR 21081 at commit
Thanks for the updates! After these small fixes, this should be ready, and then we can continue with other algorithms.
new ArrayType(DoubleType, false),
new ArrayType(FloatType, false))
SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
}
/**
Scala style: always put a newline between methods.
Ping: There needs to be a newline between the "}" of the previous method and the "/**" Scaladoc of the next method. Please start checking for this.
* @param colName column name for features
* @return Vector feature column
*/
@Since("2.4.0")
Don't add Since annotations to private APIs. They can get Since annotations when they are made public.
private[spark] object DatasetUtils {

/**
 * preprocessing the input feature column to Vector
This is a bit unclear. How about: "Cast a column in a Dataset to a Vector type."
Also, this isn't specific to features, so please clarify that below.
Finally, the key thing to document is the list of supported input types, so I'd add that.
* preprocessing the input feature column to Vector
* @param dataset DataFrame with columns for features
* @param colName column name for features
* @return Vector feature column
Add a note that this returned Column does not have Metadata
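Taken together, the docstring suggestions above might yield a doc comment along these lines (a hypothetical sketch of the requested wording, not the merged text):

```scala
/**
 * Cast a column in a Dataset to a Vector type.
 *
 * Supported input types: VectorUDT, ArrayType(DoubleType), ArrayType(FloatType).
 * Note that the returned Column does not carry the original column's Metadata.
 *
 * @param dataset input Dataset
 * @param colName name of the column to cast
 * @return the column cast to Vector type
 */
```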
assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))

val kmeansD = new KMeans().setK(k).setFeaturesCol(featuresColNameD).setSeed(1)
Also do setMaxIter(1) to make this a little faster.
assert(predictDifference.count() == 0)

assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF))
nit: remove unnecessary newline
Test build #89736 has finished for PR 21081 at commit
Test build #89749 has finished for PR 21081 at commit
Test build #4157 has finished for PR 21081 at commit
Just the one style comment. Can you please fix it in the follow-up PR? I'll go ahead and merge this with master.
LGTM
Thanks @ludatabricks!
  })
case _: DoubleType => udf((vector: Seq[Double]) => {
  Vectors.dense(vector.toArray)
})
case other =>
Thanks! I forgot about this since this was generalized.
What changes were proposed in this pull request?
- Multiple possible input types are now accepted in validateAndTransformSchema() and computeCost() when checking the column type.
- An if statement is added in transform() to support array types as featuresCol.
- A case statement is added in fit() when selecting columns from the dataset.
These changes are applied to KMeans first, and will then be extended to other clustering methods.
How was this patch tested?
A unit test is added.