# Patch summary (descriptive header only; placed before the first `diff --git` line).
#
# NOTE(review): the line breaks of this patch appear to have been collapsed -- the
# hunk headers (e.g. `@@ -20,17 +20,7 @@`) describe many more lines than are
# physically present here -- so it presumably needs its original line structure
# restored before it can be applied. Confirm against the source commit.
#
# bigframes/core/bigframe_node.py
#   * The multi-line `from typing import (...)` is collapsed to a single line and
#     `Set` is dropped from the imported names.
#   * `edges()` is removed.
#   * `iter_nodes_topo()` is rewritten:
#       - it now yields `(node, parents)` tuples instead of bare nodes;
#       - `child_to_parents` maps each child to a *list* of parents (previously a
#         set), appended once per entry in the parent's `child_nodes`;
#       - `out_degree[node]` is initialised to `len(node.child_nodes)`; nodes with
#         zero children seed the queue;
#       - after a node is yielded, each entry in its parents list has its counter
#         decremented, and a parent is enqueued when its counter reaches zero.
#         This replaces the old `set(parent.child_nodes).issubset(yielded)` check.
#       - the new docstring describes this as reverse topological order using
#         Kahn's algorithm.
#   * `bottom_up()` and `reduce_up()` unpack the new tuple as `node, _` and ignore
#     the parents element.
#
# bigframes/core/compile/sqlglot/sqlglot_ir.py
#   * In `from_union`, the `this=` argument of each `sge.Alias` changes from the raw
#     `expr.alias_or_name` to `sge.to_identifier(expr.alias_or_name, quoted=cls.quoted)`.
#
# tests/unit/core/compile/sqlglot/snapshots/test_compile_concat/test_compile_concat/out.sql
#   * The source columns in both UNION ALL branches are now backtick-quoted
#     (e.g. `bfcol_17` AS `bfcol_46`).
#
# tests/unit/core/compile/sqlglot/test_compile_concat.py
#   * One TODO comment is removed; the remaining TODO is tagged `b/425739511`.
#
diff --git a/bigframes/core/bigframe_node.py b/bigframes/core/bigframe_node.py index 9054ab9ba0..b101cb79fc 100644 --- a/bigframes/core/bigframe_node.py +++ b/bigframes/core/bigframe_node.py @@ -20,17 +20,7 @@ import functools import itertools import typing -from typing import ( - Callable, - Dict, - Generator, - Iterable, - Mapping, - Sequence, - Set, - Tuple, - Union, -) +from typing import Callable, Dict, Generator, Iterable, Mapping, Sequence, Tuple, Union from bigframes.core import expression, field, identifiers import bigframes.core.schema as schemata @@ -309,33 +299,31 @@ def unique_nodes( seen.add(item) stack.extend(item.child_nodes) - def edges( + def iter_nodes_topo( self: BigFrameNode, - ) -> Generator[Tuple[BigFrameNode, BigFrameNode], None, None]: - for item in self.unique_nodes(): - for child in item.child_nodes: - yield (item, child) - - def iter_nodes_topo(self: BigFrameNode) -> Generator[BigFrameNode, None, None]: - """Returns nodes from bottom up.""" - queue = collections.deque( - [node for node in self.unique_nodes() if not node.child_nodes] - ) - + ) -> Generator[Tuple[BigFrameNode, Sequence[BigFrameNode]], None, None]: + """Returns nodes in reverse topological order, using Kahn's algorithm.""" child_to_parents: Dict[ - BigFrameNode, Set[BigFrameNode] - ] = collections.defaultdict(set) - for parent, child in self.edges(): - child_to_parents[child].add(parent) - - yielded = set() + BigFrameNode, list[BigFrameNode] + ] = collections.defaultdict(list) + out_degree: Dict[BigFrameNode, int] = collections.defaultdict(int) + + queue: collections.deque["BigFrameNode"] = collections.deque() + for node in list(self.unique_nodes()): + num_children = len(node.child_nodes) + out_degree[node] = num_children + if num_children == 0: + queue.append(node) + for child in node.child_nodes: + child_to_parents[child].append(node) while queue: item = queue.popleft() - yield item - yielded.add(item) - for parent in child_to_parents[item]: - if 
set(parent.child_nodes).issubset(yielded): + parents = child_to_parents.get(item, []) + yield item, parents + for parent in parents: + out_degree[parent] -= 1 + if out_degree[parent] == 0: queue.append(parent) def top_down( @@ -376,7 +364,7 @@ def bottom_up( Returns the transformed root node. """ results: dict[BigFrameNode, BigFrameNode] = {} - for node in list(self.iter_nodes_topo()): + for node, _ in list(self.iter_nodes_topo()): # child nodes have already been transformed result = node.transform_children(lambda x: results[x]) result = transform(result) @@ -387,7 +375,7 @@ def bottom_up( def reduce_up(self, reduction: Callable[[BigFrameNode, Tuple[T, ...]], T]) -> T: """Apply a bottom-up reduction to the tree.""" results: dict[BigFrameNode, T] = {} - for node in list(self.iter_nodes_topo()): + for node, _ in list(self.iter_nodes_topo()): # child nodes have already been transformed child_results = tuple(results[child] for child in node.child_nodes) result = reduction(node, child_results) diff --git a/bigframes/core/compile/sqlglot/sqlglot_ir.py b/bigframes/core/compile/sqlglot/sqlglot_ir.py index 47dab209d0..cbdd57b691 100644 --- a/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -182,7 +182,7 @@ def from_union( selections = [ sge.Alias( - this=expr.alias_or_name, + this=sge.to_identifier(expr.alias_or_name, quoted=cls.quoted), alias=sge.to_identifier(output_id, quoted=cls.quoted), ) for expr, output_id in zip(select_expr.expressions, output_ids) diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_concat/test_compile_concat/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_concat/test_compile_concat/out.sql index 8da545b8fa..62e22a6a19 100644 --- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_concat/test_compile_concat/out.sql +++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_concat/test_compile_concat/out.sql @@ -49,21 +49,21 @@ WITH `bfcte_1` AS ( * FROM ( 
SELECT - bfcol_17 AS `bfcol_46`, - bfcol_18 AS `bfcol_47`, - bfcol_19 AS `bfcol_48`, - bfcol_20 AS `bfcol_49`, - bfcol_21 AS `bfcol_50`, - bfcol_22 AS `bfcol_51` + `bfcol_17` AS `bfcol_46`, + `bfcol_18` AS `bfcol_47`, + `bfcol_19` AS `bfcol_48`, + `bfcol_20` AS `bfcol_49`, + `bfcol_21` AS `bfcol_50`, + `bfcol_22` AS `bfcol_51` FROM `bfcte_6` UNION ALL SELECT - bfcol_40 AS `bfcol_46`, - bfcol_41 AS `bfcol_47`, - bfcol_42 AS `bfcol_48`, - bfcol_43 AS `bfcol_49`, - bfcol_44 AS `bfcol_50`, - bfcol_45 AS `bfcol_51` + `bfcol_40` AS `bfcol_46`, + `bfcol_41` AS `bfcol_47`, + `bfcol_42` AS `bfcol_48`, + `bfcol_43` AS `bfcol_49`, + `bfcol_44` AS `bfcol_50`, + `bfcol_45` AS `bfcol_51` FROM `bfcte_7` ) ) diff --git a/tests/unit/core/compile/sqlglot/test_compile_concat.py b/tests/unit/core/compile/sqlglot/test_compile_concat.py index ec7e83a4b0..ba85d6f70f 100644 --- a/tests/unit/core/compile/sqlglot/test_compile_concat.py +++ b/tests/unit/core/compile/sqlglot/test_compile_concat.py @@ -24,8 +24,7 @@ def test_compile_concat( scalars_types_pandas_df: pd.DataFrame, compiler_session: bigframes.Session, snapshot ): - # TODO: concat two same dataframes, which SQL does not get reused. - # TODO: concat dataframes from a gbq table but trigger a windows compiler. + # TODO(b/425739511): concat dataframes from a gbq table but trigger a windows compiler. df1 = bpd.DataFrame(scalars_types_pandas_df, session=compiler_session) df1 = df1[["rowindex", "int64_col", "string_col"]] concat_df = bpd.concat([df1, df1])