Commit 09d4fab

Default to Runner v2 for Python Streaming jobs. (apache#15140)
* Default to Runner v2 for Python Streaming jobs.
* Fix test expectations.
* yapf
1 parent 61a884e

File tree

2 files changed: 16 additions, 3 deletions

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 7 additions & 1 deletion
@@ -594,9 +594,15 @@ def run_pipeline(self, pipeline, options):
     return result

   def _maybe_add_unified_worker_missing_options(self, options):
+    debug_options = options.view_as(DebugOptions)
+    # Streaming is always portable, default to runner v2.
+    if options.view_as(StandardOptions).streaming:
+      if not debug_options.lookup_experiment('disable_runner_v2'):
+        debug_options.add_experiment('beam_fn_api')
+        debug_options.add_experiment('use_runner_v2')
+        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
-    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
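
For context, a minimal self-contained sketch of what this default means for users. The helper name `maybe_default_to_runner_v2` is hypothetical and only mirrors the logic added above; in the SDK this happens inside `DataflowRunner._maybe_add_unified_worker_missing_options` when the job is submitted.

from apache_beam.options.pipeline_options import (
    DebugOptions, PipelineOptions, StandardOptions)

def maybe_default_to_runner_v2(options):
  # Hypothetical standalone copy of the logic added in this commit.
  debug_options = options.view_as(DebugOptions)
  # Streaming is always portable, default to runner v2.
  if options.view_as(StandardOptions).streaming:
    if not debug_options.lookup_experiment('disable_runner_v2'):
      debug_options.add_experiment('beam_fn_api')
      debug_options.add_experiment('use_runner_v2')
      debug_options.add_experiment('use_portable_job_submission')

# A streaming job now picks up the Runner v2 experiments by default.
streaming = PipelineOptions(['--streaming'])
maybe_default_to_runner_v2(streaming)
assert streaming.view_as(DebugOptions).lookup_experiment('use_runner_v2')

# Passing --experiments=disable_runner_v2 opts out and keeps the old behaviour.
opted_out = PipelineOptions(['--streaming', '--experiments=disable_runner_v2'])
maybe_default_to_runner_v2(opted_out)
assert not opted_out.view_as(DebugOptions).lookup_experiment('use_runner_v2')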

sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py

Lines changed: 9 additions & 2 deletions
@@ -256,6 +256,7 @@ def test_remote_runner_translation(self):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
+    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))

@@ -838,15 +839,21 @@ def test_group_into_batches_translation_non_unified_worker(self):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True, ['--enable_streaming_engine'])
+          True,
+          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])

     # JRH
     with self.assertRaisesRegex(
         ValueError,
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
+          True,
+          [
+              '--enable_streaming_engine',
+              '--experiments=beam_fn_api',
+              '--experiments=disable_runner_v2'
+          ])

   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):