Skip to content

Commit 0c23dd5

Browse files
zsxwing authored and tdas committed
[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
TransformFunction and TransformFunctionSerializer don't rethrow exceptions, so when any exception happens, they just return None. This causes weird NPEs (NullPointerExceptions) and confuses people. Author: Shixiong Zhu <shixiong@databricks.com> Closes apache#9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cf) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 9c8e179 commit 0c23dd5

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

python/pyspark/streaming/tests.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,22 @@ def func(dstream):
403403
expected = [[('k', v)] for v in expected]
404404
self._test_func(input, func, expected)
405405

406+
def test_failed_func(self):
    """Verify that an error raised inside a mapped function is not swallowed.

    Four RDDs, each built from a single-element list, are fed through a
    queue stream and mapped with a function that always raises
    ``ValueError``. The test passes when waiting on the streaming context
    raises, and fails when the wait returns normally.
    """
    input = [self.sc.parallelize([d], 1) for d in range(4)]
    input_stream = self.ssc.queueStream(input)

    def failed_func(i):
        raise ValueError("failed")

    input_stream.map(failed_func).pprint()
    self.ssc.start()
    try:
        self.ssc.awaitTerminationOrTimeout(10)
    except Exception:
        # Only ordinary exceptions count as the expected failure. A bare
        # ``except:`` would also treat KeyboardInterrupt/SystemExit as a
        # passing result and hide an interrupted test run.
        return

    self.fail("a failed func should throw an error")
421+
406422

407423
class StreamingListenerTests(PySparkStreamingTestCase):
408424

python/pyspark/streaming/util.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def call(self, milliseconds, jrdds):
6464
return r._jrdd
6565
except Exception:
6666
traceback.print_exc()
67+
raise
6768

6869
def __repr__(self):
    """Return ``TransformFunction(<func>)`` using the ``%s`` form of ``self.func``."""
    template = "TransformFunction(%s)"
    return template % self.func
@@ -95,13 +96,15 @@ def dumps(self, id):
9596
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
9697
except Exception:
9798
traceback.print_exc()
99+
raise
98100

99101
def loads(self, data):
    """Rebuild a TransformFunction from its serialized form.

    ``data`` is converted to ``bytes`` and decoded with ``self.serializer``
    into a ``(func, deserializers)`` pair, which is wrapped in a
    ``TransformFunction`` bound to ``self.ctx``.

    Any exception is printed with ``traceback.print_exc()`` and then
    re-raised, so the caller sees the failure.
    """
    try:
        payload = self.serializer.loads(bytes(data))
        f, deserializers = payload
        return TransformFunction(self.ctx, f, *deserializers)
    except Exception:
        traceback.print_exc()
        raise
105108

106109
def __repr__(self):
    """Return ``TransformFunctionSerializer(<serializer>)`` using the ``%s`` form of ``self.serializer``."""
    template = "TransformFunctionSerializer(%s)"
    return template % self.serializer

0 commit comments

Comments
 (0)