Skip to content

Commit 41e7853

Browse files
Erik Selinmateiz
authored andcommitted
[SPARK-1468] Modify the partition function used by partitionBy.
Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes. Associated JIRA at https://issues.apache.org/jira/browse/SPARK-1468 Author: Erik Selin <erik.selin@jadedpixel.com> Closes apache#371 from tyro89/consistent_hashing and squashes the following commits: 201c301 [Erik Selin] Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes. (cherry picked from commit 8edc9d0) Signed-off-by: Matei Zaharia <matei@databricks.com>
1 parent e03af41 commit 41e7853

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

python/pyspark/rdd.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ def rightOuterJoin(self, other, numPartitions=None):
926926
return python_right_outer_join(self, other, numPartitions)
927927

928928
# TODO: add option to control map-side combining
929-
def partitionBy(self, numPartitions, partitionFunc=hash):
929+
def partitionBy(self, numPartitions, partitionFunc=None):
930930
"""
931931
Return a copy of the RDD partitioned using the specified partitioner.
932932
@@ -937,6 +937,9 @@ def partitionBy(self, numPartitions, partitionFunc=hash):
937937
"""
938938
if numPartitions is None:
939939
numPartitions = self.ctx.defaultParallelism
940+
941+
if partitionFunc is None:
942+
partitionFunc = lambda x: 0 if x is None else hash(x)
940943
# Transferring O(n) objects to Java is too expensive. Instead, we'll
941944
# form the hash buckets in Python, transferring O(numPartitions) objects
942945
# to Java. Each object is a (splitNumber, [objects]) pair.

0 commit comments

Comments
 (0)