Skip to content

Commit 16a9682

Browse files
committed
Fix JavaRDDLike.flatMap(PairFlatMapFunction) (SPARK-668).
This workaround is easier than rewriting JavaRDDLike in Java.
1 parent c2490a2 commit 16a9682

File tree

3 files changed

+51
-4
lines changed

3 files changed

+51
-4
lines changed

core/src/main/scala/spark/api/java/JavaRDDLike.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import scala.collection.JavaConversions._
1212
import java.{util, lang}
1313
import scala.Tuple2
1414

15-
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
15+
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
1616
def wrapRDD(rdd: RDD[T]): This
1717

1818
implicit val classManifest: ClassManifest[T]
@@ -81,10 +81,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
8181
}
8282

8383
/**
84-
* Return a new RDD by first applying a function to all elements of this
85-
* RDD, and then flattening the results.
84+
* Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
8685
*/
87-
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
86+
private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
8887
import scala.collection.JavaConverters._
8988
def fn = (x: T) => f.apply(x).asScala
9089
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
package spark.api.java;

import spark.api.java.function.PairFlatMapFunction;

import java.io.Serializable;

/**
 * Workaround for SPARK-668: JavaRDDLike cannot declare an overload of
 * flatMap taking a PairFlatMapFunction directly in Scala, so this Java
 * superclass supplies the overload and delegates to
 * {@link JavaRDDLike#doFlatMap}.
 *
 * Note: JavaPairRDD and JavaRDDLike live in this same package
 * (spark.api.java), so they need no import.
 */
class PairFlatMapWorkaround<T> implements Serializable {
    /**
     * Return a new RDD by first applying a function to all elements of this
     * RDD, and then flattening the results.
     */
    public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
        // Unchecked downcast is safe by construction: this class exists solely
        // to be extended by the JavaRDDLike trait, so `this` is always a
        // JavaRDDLike at runtime.
        return ((JavaRDDLike<T, ?>) this).doFlatMap(f);
    }
}

core/src/test/scala/spark/JavaAPISuite.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,34 @@ public Iterable<Double> call(String s) {
343343
Assert.assertEquals(11, pairs.count());
344344
}
345345

346+
@Test
347+
public void mapsFromPairsToPairs() {
348+
List<Tuple2<Integer, String>> pairs = Arrays.asList(
349+
new Tuple2<Integer, String>(1, "a"),
350+
new Tuple2<Integer, String>(2, "aa"),
351+
new Tuple2<Integer, String>(3, "aaa")
352+
);
353+
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
354+
355+
// Regression test for SPARK-668:
356+
JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
357+
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
358+
@Override
359+
public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
360+
return Collections.singletonList(item.swap());
361+
}
362+
});
363+
swapped.collect();
364+
365+
// There was never a bug here, but it's worth testing:
366+
pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
367+
@Override
368+
public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
369+
return item.swap();
370+
}
371+
}).collect();
372+
}
373+
346374
@Test
347375
public void mapPartitions() {
348376
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);

0 commit comments

Comments
 (0)