Skip to content

Commit cf8e08f

Browse files
authored
Merge pull request apache#15340 from zhoufek/gbk_233
2 parents 5563e46 + 706e20f commit cf8e08f

File tree

3 files changed

+18
-20
lines changed

3 files changed

+18
-20
lines changed

CHANGES.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363

6464
## Breaking Changes
6565

66-
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
66+
* Python GBK by defualt will fail on unbounded PCollections that have global windowing and a default trigger. The `--allow_unsafe_triggers` flag can be used to override this. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
67+
* Python GBK will fail if it detects an unsafe trigger unless the `--allow_unsafe_triggers` flag is set. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
6768

6869
## Deprecations
6970

@@ -123,7 +124,8 @@
123124

124125
## Deprecations
125126

126-
* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
127+
* Python GBK will stop supporting unbounded PCollections that have global windowing and a default trigger in Beam 2.33. This can be overriden with `--allow_unsafe_triggers`. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
128+
* Python GBK will start requiring safe triggers or the `--allow_unsafe_triggers` flag starting with Beam 2.33. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
127129

128130
## Known Issues
129131

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,8 +534,7 @@ def _add_argparse_args(cls, parser):
534534
'compatibility. See BEAM-11719.')
535535
parser.add_argument(
536536
'--allow_unsafe_triggers',
537-
# TODO(BEAM-9487): Set to False for Beam 2.33
538-
default=True,
537+
default=False,
539538
action='store_true',
540539
help='Allow the use of unsafe triggers. Unsafe triggers have the '
541540
'potential to cause data loss due to finishing and/or never having '

sdks/python/apache_beam/transforms/core.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2320,10 +2320,10 @@ def expand(self, pcoll):
23202320
if pcoll.pipeline.allow_unsafe_triggers:
23212321
# TODO(BEAM-9487) Change comment for Beam 2.33
23222322
_LOGGER.warning(
2323-
'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
2324-
'global window, and uses a default trigger. This will no longer '
2325-
'be allowed starting with Beam 2.33 unless '
2326-
'--allow_unsafe_triggers is set.',
2323+
'%s: PCollection passed to GroupByKey is unbounded, has a global '
2324+
'window, and uses a default trigger. This is being allowed '
2325+
'because --allow_unsafe_triggers is set, but it may prevent '
2326+
'data from making it through the pipeline.',
23272327
self.label)
23282328
else:
23292329
raise ValueError(
@@ -2332,22 +2332,19 @@ def expand(self, pcoll):
23322332

23332333
unsafe_reason = trigger.may_lose_data(windowing)
23342334
if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
2335+
reason_msg = str(unsafe_reason).replace('DataLossReason.', '')
23352336
if pcoll.pipeline.allow_unsafe_triggers:
2336-
# TODO(BEAM-9487): Switch back to this log for Beam 2.33.
2337-
# _LOGGER.warning(
2338-
# 'Skipping trigger safety check. '
2339-
# 'This could lead to incomplete or missing groups.')
23402337
_LOGGER.warning(
2341-
'%s: Unsafe trigger type (%s) detected. Starting with '
2342-
'Beam 2.33, this will raise an error by default. '
2343-
'Either change the pipeline to use a safe trigger or '
2344-
'set the --allow_unsafe_triggers flag.',
2338+
'%s: Unsafe trigger `%s` detected (reason: %s). This is '
2339+
'being allowed because --allow_unsafe_triggers is set. This could '
2340+
'lead to missing or incomplete groups.',
23452341
self.label,
2346-
unsafe_reason)
2342+
trigger,
2343+
reason_msg)
23472344
else:
2348-
msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
2349-
msg += 'Reason: {}. '.format(
2350-
str(unsafe_reason).replace('DataLossReason.', ''))
2345+
msg = '{}: Unsafe trigger: `{}` may lose data. '.format(
2346+
self.label, trigger)
2347+
msg += 'Reason: {}. '.format(reason_msg)
23512348
msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
23522349
raise ValueError(msg)
23532350

0 commit comments

Comments
 (0)