Skip to content

Commit e9ae1fd

Browse files
zsxwingtdas
authored andcommitted
[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, it just return None. This will cause some weird NPE and confuse people. Author: Shixiong Zhu <shixiong@databricks.com> Closes apache#9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cf) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 9a906c1 commit e9ae1fd

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

python/pyspark/streaming/tests.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,22 @@ def func(dstream):
391391
expected = [[('k', v)] for v in expected]
392392
self._test_func(input, func, expected)
393393

394+
def test_failed_func(self):
395+
input = [self.sc.parallelize([d], 1) for d in range(4)]
396+
input_stream = self.ssc.queueStream(input)
397+
398+
def failed_func(i):
399+
raise ValueError("failed")
400+
401+
input_stream.map(failed_func).pprint()
402+
self.ssc.start()
403+
try:
404+
self.ssc.awaitTerminationOrTimeout(10)
405+
except:
406+
return
407+
408+
self.fail("a failed func should throw an error")
409+
394410

395411
class WindowFunctionTests(PySparkStreamingTestCase):
396412

python/pyspark/streaming/util.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def call(self, milliseconds, jrdds):
6464
return r._jrdd
6565
except Exception:
6666
traceback.print_exc()
67+
raise
6768

6869
def __repr__(self):
6970
return "TransformFunction(%s)" % self.func
@@ -95,13 +96,15 @@ def dumps(self, id):
9596
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
9697
except Exception:
9798
traceback.print_exc()
99+
raise
98100

99101
def loads(self, data):
100102
try:
101103
f, deserializers = self.serializer.loads(bytes(data))
102104
return TransformFunction(self.ctx, f, *deserializers)
103105
except Exception:
104106
traceback.print_exc()
107+
raise
105108

106109
def __repr__(self):
107110
return "TransformFunctionSerializer(%s)" % self.serializer

0 commit comments

Comments
 (0)