
Commit 6598590

viirya authored and mengxr committed
[SPARK-12363][MLLIB][BACKPORT-1.4] Remove setRuns and fix PowerIterationClustering failed test

JIRA: https://issues.apache.org/jira/browse/SPARK-12363

## What changes were proposed in this pull request?

Backport SPARK-12363 to branch-1.4.

## How was this patch tested?

Unit test.

cc mengxr

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#11264 from viirya/backport-12363.
1 parent c961c27 commit 6598590
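The heart of the patch is small: `KMeans.setRuns` is deprecated, so the k-means step that PIC uses to cluster the pseudo-eigenvector now does a single seeded run, and the test fixture is replaced so the result stays deterministic. A minimal sketch of that post-processing step as it stands after this patch (the wrapper object and method name are illustrative, not part of the patch; the body mirrors the diff below):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

object PICPostprocessSketch {
  // After this patch, PIC clusters the one-dimensional pseudo-eigenvector
  // entries with a single seeded k-means run; the removed setRuns(5) call
  // previously repeated k-means with different random initializations.
  def clusterEigenvector(v: RDD[(Long, Double)], k: Int): RDD[(Long, Int)] = {
    val points = v.mapValues(x => Vectors.dense(x)).cache()
    val model = new KMeans()
      .setK(k)
      .setSeed(0L)
      .run(points.values)
    points.mapValues(p => model.predict(p))
  }
}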

File tree: 3 files changed (+59, -58 lines)

- examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
- mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
- mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala

examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
21 additions, 32 deletions

@@ -39,27 +39,23 @@ import org.apache.spark.{SparkConf, SparkContext}
  * n: Number of sampled points on innermost circle.. There are proportionally more points
  * within the outer/larger circles
  * maxIterations: Number of Power Iterations
- * outerRadius: radius of the outermost of the concentric circles
  * }}}
  *
  * Here is a sample run and output:
  *
- * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
- *
- * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
  *
+ * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
+ * 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
  *
  * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
  */
 object PowerIterationClusteringExample {
 
   case class Params(
-      input: String = null,
-      k: Int = 3,
-      numPoints: Int = 5,
-      maxIterations: Int = 10,
-      outerRadius: Double = 3.0
+      k: Int = 2,
+      numPoints: Int = 10,
+      maxIterations: Int = 15
     ) extends AbstractParams[Params]
 
   def main(args: Array[String]) {
@@ -68,17 +64,14 @@ object PowerIterationClusteringExample {
     val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .text(s"number of circles (clusters), default: ${defaultParams.k}")
         .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
         .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
         .action((x, c) => c.copy(numPoints = x))
       opt[Int]("maxIterations")
         .text(s"number of iterations, default: ${defaultParams.maxIterations}")
         .action((x, c) => c.copy(maxIterations = x))
-      opt[Double]('r', "r")
-        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
-        .action((x, c) => c.copy(outerRadius = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -96,20 +89,21 @@ object PowerIterationClusteringExample {
 
     Logger.getRootLogger.setLevel(Level.WARN)
 
-    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
     val model = new PowerIterationClustering()
       .setK(params.k)
       .setMaxIterations(params.maxIterations)
+      .setInitializationMode("degree")
       .run(circlesRdd)
 
     val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
-    val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+    val assignments = clusters.toList.sortBy { case (k, v) => v.length }
     val assignmentsStr = assignments
       .map { case (k, v) =>
         s"$k -> ${v.sorted.mkString("[", ",", "]")}"
-      }.mkString(",")
+      }.mkString(", ")
     val sizesStr = assignments.map {
-      _._2.size
+      _._2.length
     }.sorted.mkString("(", ",", ")")
     println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
 
@@ -123,20 +117,17 @@ object PowerIterationClusteringExample {
     }
   }
 
-  def generateCirclesRdd(sc: SparkContext,
-      nCircles: Int = 3,
-      nPoints: Int = 30,
-      outerRadius: Double): RDD[(Long, Long, Double)] = {
-
-    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
-    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
-    val points = (0 until nCircles).flatMap { cx =>
-      generateCircle(radii(cx), groupSizes(cx))
+  def generateCirclesRdd(
+      sc: SparkContext,
+      nCircles: Int,
+      nPoints: Int): RDD[(Long, Long, Double)] = {
+    val points = (1 to nCircles).flatMap { i =>
+      generateCircle(i, i * nPoints)
     }.zipWithIndex
     val rdd = sc.parallelize(points)
     val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
       if (i0 < i1) {
-        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
      } else {
        None
      }
@@ -147,11 +138,9 @@ object PowerIterationClusteringExample {
   /**
    * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
    */
-  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
-    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
-    val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
+  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
     val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
-    coeff * math.exp(expCoeff * ssquares)
+    math.exp(-ssquares / 2.0)
   }
 }
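A note on the last hunk: the old gaussianSimilarity scaled every similarity by a 1/(sqrt(2*pi)*sigma) density coefficient and multiplied the exponent by sigma squared, where the RBF kernel divides by it; with sigma pinned at 1, the patched version reduces to s(p, q) = exp(-||p - q||^2 / 2). A self-contained sanity check of that function (the wrapper object and sample points are illustrative, not part of the patch):

import scala.math

object GaussianSimilarityCheck {
  // Patched similarity: an RBF kernel with sigma fixed at 1 and no
  // normalizing coefficient, so identical points score exactly 1.0.
  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
    math.exp(-ssquares / 2.0)
  }

  def main(args: Array[String]): Unit = {
    println(gaussianSimilarity((0.0, 0.0), (0.0, 0.0))) // 1.0: identical points
    println(gaussianSimilarity((0.0, 0.0), (1.0, 0.0))) // exp(-0.5) ~ 0.6065
    println(gaussianSimilarity((0.0, 0.0), (3.0, 4.0))) // exp(-12.5) ~ 3.7e-6
  }
}

The sample run line in the updated scaladoc matches the new defaults: ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15.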

mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
10 additions, 8 deletions

@@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
 import org.apache.spark.rdd.RDD
@@ -235,10 +234,12 @@
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    GraphImpl.fromExistingRDDs(vD, gA.edges)
+    Graph(vD, gA.edges)
       .mapTriplets(
         e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        TripletFields.Src)
+        new TripletFields(/* useSrc */ true,
+          /* useDst */ false,
+          /* useEdge */ true))
   }
 
   /**
@@ -259,7 +260,7 @@
     }, preservesPartitioning = true).cache()
     val sum = r.values.map(math.abs).sum()
     val v0 = r.mapValues(x => x / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -274,7 +275,7 @@
   def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
     val sum = g.vertices.values.sum()
     val v0 = g.vertices.mapValues(_ / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -299,7 +300,9 @@
     val v = curG.aggregateMessages[Double](
       sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
       mergeMsg = _ + _,
-      TripletFields.Dst).cache()
+      new TripletFields(/* useSrc */ false,
+        /* useDst */ true,
+        /* useEdge */ true)).cache()
     // normalize v
     val norm = v.values.map(math.abs).sum()
     logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -312,7 +315,7 @@
     diffDelta = math.abs(delta - prevDelta)
     logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
     // update v
-    curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
+    curG = Graph(VertexRDD(v1), g.edges)
     prevDelta = delta
   }
   curG.vertices
@@ -329,7 +332,6 @@
     val points = v.mapValues(x => Vectors.dense(x)).cache()
     val model = new KMeans()
       .setK(k)
-      .setRuns(5)
       .setSeed(0L)
       .run(points.values)
     points.mapValues(p => model.predict(p)).cache()
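Two patterns recur in this file and are easy to miss in diff form: the private GraphImpl.fromExistingRDDs is replaced by the public Graph(vertices, edges) factory, and the TripletFields.Src / TripletFields.Dst constants are replaced by explicit new TripletFields(useSrc, useDst, useEdge) instances that also request the edge attribute. A minimal self-contained sketch of both patterns (the toy graph, object name, and local master are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object TripletFieldsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
    val vertices = sc.parallelize(Seq((1L, 1.0), (2L, 2.0), (3L, 3.0)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 0.5), Edge(2L, 3L, 0.25)))
    // Public factory instead of the private GraphImpl.fromExistingRDDs.
    val g: Graph[Double, Double] = Graph(vertices, edges)

    // Sum attr * dstAttr into each source vertex; sendMsg reads the dst
    // attribute and the edge attribute, but never the src attribute.
    val msgs: VertexRDD[Double] = g.aggregateMessages[Double](
      sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
      mergeMsg = _ + _,
      new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true))
    msgs.collect().foreach(println)
    sc.stop()
  }
}

Passing the narrowest TripletFields that covers what sendMsg reads lets GraphX avoid shipping vertex attributes it does not need, which is the point of spelling the flags out here.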

mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
28 additions, 18 deletions

@@ -30,42 +30,52 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import org.apache.spark.mllib.clustering.PowerIterationClustering._
 
+  /** Generates a circle of points. */
+  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+    Array.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (r * math.cos(theta), r * math.sin(theta))
+    }
+  }
+
+  /** Computes Gaussian similarity. */
+  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+    math.exp(-dist2 / 2.0)
+  }
+
   test("power iteration clustering") {
-    /*
-     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
-     edge (3, 4).
-
-     15-14 -13 -12
-     |           |
-     4 . 3 - 2  11
-     | | x |     |
-     5  0 - 1  10
-     |           |
-     6 - 7 - 8 - 9
-     */
+    // Generate two circles following the example in the PIC paper.
+    val r1 = 1.0
+    val n1 = 10
+    val r2 = 4.0
+    val n2 = 40
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
 
-    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
-      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
-      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
-      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
     val model = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(40)
       .run(sc.parallelize(similarities, 2))
     val predictions = Array.fill(2)(mutable.Set.empty[Long])
     model.assignments.collect().foreach { a =>
       predictions(a.cluster) += a.id
     }
-    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
 
     val model2 = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(10)
       .setInitializationMode("degree")
       .run(sc.parallelize(similarities, 2))
     val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
     model2.assignments.collect().foreach { a =>
       predictions2(a.cluster) += a.id
     }
-    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }
 
   test("normalize and powerIter") {
