Skip to content

Commit 94789f3

Browse files
zsxwing authored and tdas committed
[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
TransformFunction and TransformFunctionSerializer do not rethrow exceptions, so when any exception happens they just return None. This causes weird NPEs (NullPointerExceptions) and confuses people. Author: Shixiong Zhu <shixiong@databricks.com> Closes apache#9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cf) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 5118abb commit 94789f3

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

python/pyspark/streaming/tests.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,22 @@ def func(dstream):
376376
expected = [[('k', v)] for v in expected]
377377
self._test_func(input, func, expected)
378378

379+
def test_failed_func(self):
380+
input = [self.sc.parallelize([d], 1) for d in range(4)]
381+
input_stream = self.ssc.queueStream(input)
382+
383+
def failed_func(i):
384+
raise ValueError("failed")
385+
386+
input_stream.map(failed_func).pprint()
387+
self.ssc.start()
388+
try:
389+
self.ssc.awaitTerminationOrTimeout(10)
390+
except:
391+
return
392+
393+
self.fail("a failed func should throw an error")
394+
379395

380396
class WindowFunctionTests(PySparkStreamingTestCase):
381397

python/pyspark/streaming/util.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def call(self, milliseconds, jrdds):
5959
return r._jrdd
6060
except Exception:
6161
traceback.print_exc()
62+
raise
6263

6364
def __repr__(self):
6465
return "TransformFunction(%s)" % self.func
@@ -90,13 +91,15 @@ def dumps(self, id):
9091
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
9192
except Exception:
9293
traceback.print_exc()
94+
raise
9395

9496
def loads(self, data):
9597
try:
9698
f, deserializers = self.serializer.loads(bytes(data))
9799
return TransformFunction(self.ctx, f, *deserializers)
98100
except Exception:
99101
traceback.print_exc()
102+
raise
100103

101104
def __repr__(self):
102105
return "TransformFunctionSerializer(%s)" % self.serializer

0 commit comments

Comments
 (0)