diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index a9bdb1b7ac..dd98abbdee 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:fac304457974bb530cc5396abd4ab25d26a469cd3bc97cbfb18c8d4324c584eb -# created: 2023-10-02T21:31:03.517640371Z + digest: sha256:08e34975760f002746b1d8c86fdc90660be45945ee6d9db914d1508acdf9a547 +# created: 2023-10-09T14:06:13.397766266Z diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 96d593c8c8..0332d3267e 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -467,9 +467,9 @@ typing-extensions==4.4.0 \ --hash=sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa \ --hash=sha256:16fa4864408f655d35ec496218b85f79b3437c829e93320c7c9215ccfd92489e # via -r requirements.in -urllib3==1.26.12 \ - --hash=sha256:3fa96cf423e6987997fc326ae8df396db2a8b7c667747d47ddd8ecba91f4a74e \ - --hash=sha256:b930dd878d5a8afb066a637fbb35144fe7901e3b209d1cd4f524bd0e9deee997 +urllib3==1.26.17 \ + --hash=sha256:24d6a242c28d29af46c3fae832c36db3bbebcc533dd1bb549172cd739c82df21 \ + --hash=sha256:94a757d178c9be92ef5539b8840d48dc9cf1b2709c9d6b588232a055c524458b # via # requests # twine diff --git a/CHANGELOG.md b/CHANGELOG.md index 880f791625..8d405d06ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,28 @@ [1]: https://pypi.org/project/bigframes/#history +## [0.7.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.6.0...v0.7.0) (2023-10-11) + + +### Features + +* Add aliases for several series properties ([#80](https://github.com/googleapis/python-bigquery-dataframes/issues/80)) ([c0efec8](https://github.com/googleapis/python-bigquery-dataframes/commit/c0efec8956198247b27904345a795f09c80d3502)) +* Add equals methods to series/dataframe ([#76](https://github.com/googleapis/python-bigquery-dataframes/issues/76)) ([636a209](https://github.com/googleapis/python-bigquery-dataframes/commit/636a209e0853501abd50784a11a87cf7f2282ee5)) +* Add iat and iloc accessing by tuples of integers ([#90](https://github.com/googleapis/python-bigquery-dataframes/issues/90)) ([228aeba](https://github.com/googleapis/python-bigquery-dataframes/commit/228aeba09782ae2421040c7601c15d4af92790b6)) +* Add level param to DataFrame.stack ([#88](https://github.com/googleapis/python-bigquery-dataframes/issues/88)) ([97b8bec](https://github.com/googleapis/python-bigquery-dataframes/commit/97b8bec1175499c74448a4fd46b4888c4b4c35c1)) +* Allow df.drop to take an index object ([#68](https://github.com/googleapis/python-bigquery-dataframes/issues/68)) ([740c451](https://github.com/googleapis/python-bigquery-dataframes/commit/740c45176f79d4d2f7f28cb5f6c9eeb1327c8397)) +* Use default session connection ([#87](https://github.com/googleapis/python-bigquery-dataframes/issues/87)) ([4ae4ef9](https://github.com/googleapis/python-bigquery-dataframes/commit/4ae4ef995348b95521c4988a8cfb3b5ac792fd69)) + + +### Bug Fixes + +* Change the invalid url in docs ([#93](https://github.com/googleapis/python-bigquery-dataframes/issues/93)) ([969800d](https://github.com/googleapis/python-bigquery-dataframes/commit/969800d669204de4d0f2e5e61da521217e55668b)) + + +### Documentation + +* Add more preprocessing models into the docs menu. 
([#97](https://github.com/googleapis/python-bigquery-dataframes/issues/97)) ([1592315](https://github.com/googleapis/python-bigquery-dataframes/commit/159231505f339173560cd802dae3fed3e63a663b)) + ## [0.6.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.5.0...v0.6.0) (2023-10-04) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index ea1864ed5f..eb56de826a 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -83,12 +83,14 @@ def project(self, value: Optional[str]): @property def bq_connection(self) -> Optional[str]: - """Name of the BigQuery connection to use. + """Name of the BigQuery connection to use. Should be of the form ... You should either have the connection already created in the location you have chosen, or you should have the Project IAM Admin role to enable the service to create the connection for you if you need it. + + If this option isn't provided, or project or location aren't provided, session will use its default project/location/connection_id as default connection. """ return self._bq_connection diff --git a/bigframes/clients.py b/bigframes/clients.py index b60fcba04a..dcac611e8c 100644 --- a/bigframes/clients.py +++ b/bigframes/clients.py @@ -29,6 +29,8 @@ ) logger = logging.getLogger(__name__) +_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection" + class BqConnectionManager: """Manager to handle operations with BQ connections.""" @@ -162,3 +164,25 @@ def _get_service_account_if_connection_exists( pass return service_account + + +def get_connection_name_full( + connection_name: Optional[str], default_project: str, default_location: str +) -> str: + """Retrieve the full connection name of the form ... + Use default project, location or connection_id when any of them are missing.""" + if connection_name is None: + return ( + f"{default_project}.{default_location}.{_BIGFRAMES_DEFAULT_CONNECTION_ID}" + ) + + if connection_name.count(".") == 2: + return connection_name + + if connection_name.count(".") == 1: + return f"{default_project}.{connection_name}" + + if connection_name.count(".") == 0: + return f"{default_project}.{default_location}.{connection_name}" + + raise ValueError(f"Invalid connection name format: {connection_name}.") diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 8008c1189a..ccfd682215 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -16,8 +16,9 @@ from dataclasses import dataclass import functools import math +import textwrap import typing -from typing import Collection, Dict, Iterable, Literal, Optional, Sequence, Tuple +from typing import Collection, Iterable, Literal, Optional, Sequence, Tuple from google.cloud import bigquery import ibis @@ -201,31 +202,27 @@ def mem_expr_from_pandas( hidden_ordering_columns=(keys_memtable[ORDER_ID_COLUMN],), ) - @property - def table(self) -> ibis_types.Table: - return self._table - - @property - def reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: - """Returns the frame's predicates as an equivalent boolean value, useful where a single predicate value is preferred.""" - return ( - _reduce_predicate_list(self._predicates).name(PREDICATE_COLUMN) - if self._predicates - else None - ) - @property def columns(self) -> typing.Tuple[ibis_types.Value, ...]: return self._columns @property - def column_names(self) -> Dict[str, ibis_types.Value]: - return self._column_names + def column_ids(self) -> typing.Sequence[str]: + return 
tuple(self._column_names.keys()) @property def hidden_ordering_columns(self) -> typing.Tuple[ibis_types.Value, ...]: return self._hidden_ordering_columns + @property + def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: + """Returns the frame's predicates as an equivalent boolean value, useful where a single predicate value is preferred.""" + return ( + _reduce_predicate_list(self._predicates).name(PREDICATE_COLUMN) + if self._predicates + else None + ) + @property def _ibis_order(self) -> Sequence[ibis_types.Value]: """Returns a sequence of ibis values which can be directly used to order a table expression. Has direction modifiers applied.""" @@ -265,24 +262,22 @@ def drop_columns(self, columns: Iterable[str]) -> ArrayValue: def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: ibis_type = typing.cast( - bigframes.dtypes.IbisDtype, self.get_any_column(key).type() + bigframes.dtypes.IbisDtype, self._get_any_column(key).type() ) return typing.cast( bigframes.dtypes.Dtype, bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), ) - def get_column(self, key: str) -> ibis_types.Value: + def _get_ibis_column(self, key: str) -> ibis_types.Value: """Gets the Ibis expression for a given column.""" - if key not in self._column_names.keys(): + if key not in self.column_ids: raise ValueError( - "Column name {} not in set of values: {}".format( - key, self._column_names.keys() - ) + "Column name {} not in set of values: {}".format(key, self.column_ids) ) return typing.cast(ibis_types.Value, self._column_names[key]) - def get_any_column(self, key: str) -> ibis_types.Value: + def _get_any_column(self, key: str) -> ibis_types.Value: """Gets the Ibis expression for a given column. Will also get hidden columns.""" all_columns = {**self._column_names, **self._hidden_ordering_column_names} if key not in all_columns.keys(): @@ -303,26 +298,11 @@ def _get_hidden_ordering_column(self, key: str) -> ibis_types.Column: ) return typing.cast(ibis_types.Column, self._hidden_ordering_column_names[key]) - def apply_limit(self, max_results: int) -> ArrayValue: - table = self._to_ibis_expr( - ordering_mode="order_by", - expose_hidden_cols=True, - ).limit(max_results) - columns = [table[column_name] for column_name in self._column_names] - hidden_ordering_columns = [ - table[column_name] for column_name in self._hidden_ordering_column_names - ] - return ArrayValue( - self._session, - table, - columns=columns, - hidden_ordering_columns=hidden_ordering_columns, - ordering=self._ordering, - ) - def filter(self, predicate_id: str, keep_null: bool = False) -> ArrayValue: """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" - condition = typing.cast(ibis_types.BooleanValue, self.get_column(predicate_id)) + condition = typing.cast( + ibis_types.BooleanValue, self._get_ibis_column(predicate_id) + ) if keep_null: condition = typing.cast( ibis_types.BooleanValue, @@ -358,7 +338,7 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue: The row numbers of result is non-deterministic, avoid to use. 
""" table = self._to_ibis_expr( - ordering_mode="order_by", expose_hidden_cols=True, fraction=fraction + "unordered", expose_hidden_cols=True, fraction=fraction ) columns = [table[column_name] for column_name in self._column_names] hidden_ordering_columns = [ @@ -373,7 +353,7 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue: ) @property - def offsets(self): + def _offsets(self) -> ibis_types.IntegerColumn: if not self._ordering.is_sequential: raise ValueError( "Expression does not have offsets. Generate them first using project_offsets." @@ -382,9 +362,10 @@ def offsets(self): raise ValueError( "Ordering is invalid. Marked as sequential but no total order columns." ) - return self.get_any_column(self._ordering.total_order_col.column_id) + column = self._get_any_column(self._ordering.total_order_col.column_id) + return typing.cast(ibis_types.IntegerColumn, column) - def project_offsets(self) -> ArrayValue: + def _project_offsets(self) -> ArrayValue: """Create a new expression that contains offsets. Should only be executed when offsets are needed for an operations. Has no effect on expression semantics.""" if self._ordering.is_sequential: return self @@ -414,7 +395,7 @@ def _hide_column(self, column_id) -> ArrayValue: new_name = bigframes.core.guid.generate_guid(prefix="bigframes_hidden_") expr_builder.hidden_ordering_columns = [ *self._hidden_ordering_columns, - self.get_column(column_id).name(new_name), + self._get_ibis_column(column_id).name(new_name), ] expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name}) return expr_builder.build() @@ -427,26 +408,28 @@ def promote_offsets(self) -> typing.Tuple[ArrayValue, str]: ordering = self._ordering if (not ordering.is_sequential) or (not ordering.total_order_col): - return self.project_offsets().promote_offsets() + return self._project_offsets().promote_offsets() col_id = bigframes.core.guid.generate_guid() expr_builder = self.builder() expr_builder.columns = [ - self.get_any_column(ordering.total_order_col.column_id).name(col_id), + self._get_any_column(ordering.total_order_col.column_id).name(col_id), *self.columns, ] return expr_builder.build(), col_id def select_columns(self, column_ids: typing.Sequence[str]): - return self.projection([self.get_column(col_id) for col_id in column_ids]) + return self._projection( + [self._get_ibis_column(col_id) for col_id in column_ids] + ) - def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue: + def _projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue: """Creates a new expression based on this expression with new columns.""" # TODO(swast): We might want to do validation here that columns derive # from the same table expression instead of (in addition to?) at # construction time. expr = self - for ordering_column in set(self.column_names.keys()).intersection( + for ordering_column in set(self.column_ids).intersection( [col_ref.column_id for col_ref in self._ordering.ordering_value_columns] ): # Need to hide ordering columns that are being dropped. Alternatively, could project offsets @@ -459,7 +442,7 @@ def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue: def shape(self) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" width = len(self.columns) - count_expr = self._to_ibis_expr(ordering_mode="unordered").count() + count_expr = self._to_ibis_expr("unordered").count() sql = self._session.ibis_client.compile(count_expr) # Support in-memory engines for hermetic unit tests. 
@@ -527,7 +510,7 @@ def project_unary_op( self, column_name: str, op: ops.UnaryOp, output_name=None ) -> ArrayValue: """Creates a new expression based on this expression with unary operation applied to one column.""" - value = op._as_ibis(self.get_column(column_name)).name( + value = op._as_ibis(self._get_ibis_column(column_name)).name( output_name or column_name ) return self._set_or_replace_by_id(output_name or column_name, value) @@ -541,7 +524,8 @@ def project_binary_op( ) -> ArrayValue: """Creates a new expression based on this expression with binary operation applied to two columns.""" value = op( - self.get_column(left_column_id), self.get_column(right_column_id) + self._get_ibis_column(left_column_id), + self._get_ibis_column(right_column_id), ).name(output_column_id) return self._set_or_replace_by_id(output_column_id, value) @@ -555,9 +539,9 @@ def project_ternary_op( ) -> ArrayValue: """Creates a new expression based on this expression with ternary operation applied to three columns.""" value = op( - self.get_column(col_id_1), - self.get_column(col_id_2), - self.get_column(col_id_3), + self._get_ibis_column(col_id_1), + self._get_ibis_column(col_id_2), + self._get_ibis_column(col_id_3), ).name(output_column_id) return self._set_or_replace_by_id(output_column_id, value) @@ -574,7 +558,7 @@ def aggregate( by_column_id: column id of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped """ - table = self._to_ibis_expr(ordering_mode="unordered") + table = self._to_ibis_expr("unordered") stats = { col_out: agg_op._as_ibis(table[col_in]) for col_in, agg_op, col_out in aggregations @@ -594,10 +578,10 @@ def aggregate( if dropna: for column_id in by_column_ids: expr = expr._filter( - ops.notnull_op._as_ibis(expr.get_column(column_id)) + ops.notnull_op._as_ibis(expr._get_ibis_column(column_id)) ) # Can maybe remove this as Ordering id is redundant as by_column is unique after aggregation - return expr.project_offsets() + return expr._project_offsets() else: aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} result = table.aggregate(**aggregates) @@ -624,7 +608,7 @@ def corr_aggregate( Arguments: corr_aggregations: left_column_id, right_column_id, output_column_id tuples """ - table = self._to_ibis_expr(ordering_mode="unordered") + table = self._to_ibis_expr("unordered") stats = { col_out: table[col_left].corr(table[col_right], how="pop") for col_left, col_right, col_out in corr_aggregations @@ -664,7 +648,7 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ - column = typing.cast(ibis_types.Column, self.get_column(column_name)) + column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) window = self._ibis_window_from_spec(window_spec, allow_ties=op.handles_ties) window_op = op._as_ibis(column, window) @@ -700,26 +684,34 @@ def project_window_op( def to_sql( self, - ordering_mode: Literal[ - "order_by", "string_encoded", "offset_col", "unordered" - ] = "order_by", - order_col_name: Optional[str] = ORDER_ID_COLUMN, + offset_column: typing.Optional[str] = None, col_id_overrides: typing.Mapping[str, str] = {}, + sorted: bool = False, ) -> str: + offsets_id = offset_column or ORDER_ID_COLUMN + sql = 
self._session.ibis_client.compile( self._to_ibis_expr( - ordering_mode=ordering_mode, - order_col_name=order_col_name, + ordering_mode="offset_col" + if (offset_column or sorted) + else "unordered", + order_col_name=offsets_id, col_id_overrides=col_id_overrides, ) ) + if sorted: + sql = textwrap.dedent( + f""" + SELECT * EXCEPT (`{offsets_id}`) + FROM ({sql}) + ORDER BY `{offsets_id}` + """ + ) return typing.cast(str, sql) def _to_ibis_expr( self, - ordering_mode: Literal[ - "order_by", "string_encoded", "offset_col", "unordered" - ] = "order_by", + ordering_mode: Literal["string_encoded", "offset_col", "unordered"], order_col_name: Optional[str] = ORDER_ID_COLUMN, expose_hidden_cols: bool = False, fraction: Optional[float] = None, @@ -731,8 +723,6 @@ def _to_ibis_expr( ArrayValue objects are sorted, so the following options are available to reflect this in the ibis expression. - * "order_by" (Default): The output table will not have an ordering - column, however there will be an order_by clause applied to the ouput. * "offset_col": Zero-based offsets are generated as a column, this will not sort the rows however. * "string_encoded": An ordered string column is provided in output table. @@ -760,7 +750,6 @@ def _to_ibis_expr( An ibis expression representing the data help by the ArrayValue object. """ assert ordering_mode in ( - "order_by", "string_encoded", "offset_col", "unordered", @@ -775,18 +764,16 @@ def _to_ibis_expr( str ] = [] # Ordering/Filtering columns that will be dropped at end - if self.reduced_predicate is not None: - columns.append(self.reduced_predicate) + if self._reduced_predicate is not None: + columns.append(self._reduced_predicate) # Usually drop predicate as it is will be all TRUE after filtering if not expose_hidden_cols: - columns_to_drop.append(self.reduced_predicate.get_name()) + columns_to_drop.append(self._reduced_predicate.get_name()) order_columns = self._create_order_columns( ordering_mode, order_col_name, expose_hidden_cols ) columns.extend(order_columns) - if (ordering_mode == "order_by") and not expose_hidden_cols: - columns_to_drop.extend(col.get_name() for col in order_columns) # Special case for empty tables, since we can't create an empty # projection. 
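For reference, the `sorted=True` path introduced in `to_sql` above wraps the compiled query so that the offset column drives row ordering but is excluded from the output. A minimal, self-contained sketch of that wrapping; the inner query and column name are placeholders:

```python
import textwrap

# Placeholder compiled inner query and offset column name, mirroring the
# SELECT * EXCEPT (...) / ORDER BY wrapper added in to_sql(sorted=True).
offsets_id = "bigframes_ordering_id"
inner_sql = "SELECT `a`, `bigframes_ordering_id` FROM `project.dataset.table`"

sql = textwrap.dedent(
    f"""
    SELECT * EXCEPT (`{offsets_id}`)
    FROM ({inner_sql})
    ORDER BY `{offsets_id}`
    """
)
print(sql)
```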
@@ -799,15 +786,8 @@ def _to_ibis_expr( bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns ) base_table = table - if self.reduced_predicate is not None: + if self._reduced_predicate is not None: table = table.filter(base_table[PREDICATE_COLUMN]) - if ordering_mode == "order_by": - table = table.order_by( - _convert_ordering_to_table_values( - {col: base_table[col] for col in table.columns}, - self._ordering.all_ordering_columns, - ) # type: ignore - ) table = table.drop(*columns_to_drop) if col_id_overrides: table = table.relabel(col_id_overrides) @@ -826,24 +806,24 @@ def _create_order_columns( return (self._create_offset_column().name(order_col_name),) elif ordering_mode == "string_encoded": return (self._create_string_ordering_column().name(order_col_name),) - elif ordering_mode == "order_by" or expose_hidden_cols: + elif expose_hidden_cols: return self.hidden_ordering_columns return () def _create_offset_column(self) -> ibis_types.IntegerColumn: if self._ordering.total_order_col and self._ordering.is_sequential: - offsets = self.get_any_column(self._ordering.total_order_col.column_id) + offsets = self._get_any_column(self._ordering.total_order_col.column_id) return typing.cast(ibis_types.IntegerColumn, offsets) else: window = ibis.window(order_by=self._ibis_order) if self._predicates: - window = window.group_by(self.reduced_predicate) + window = window.group_by(self._reduced_predicate) offsets = ibis.row_number().over(window) return typing.cast(ibis_types.IntegerColumn, offsets) def _create_string_ordering_column(self) -> ibis_types.StringColumn: if self._ordering.total_order_col and self._ordering.is_string_encoded: - string_order_ids = self.get_any_column( + string_order_ids = self._get_any_column( self._ordering.total_order_col.column_id ) return typing.cast(ibis_types.StringColumn, string_order_ids) @@ -852,7 +832,7 @@ def _create_string_ordering_column(self) -> ibis_types.StringColumn: and self._ordering.integer_encoding.is_encoded ): # Special case: non-negative integer ordering id can be converted directly to string without regenerating row numbers - int_values = self.get_any_column(self._ordering.total_order_col.column_id) + int_values = self._get_any_column(self._ordering.total_order_col.column_id) return encode_order_string( typing.cast(ibis_types.IntegerColumn, int_values), ) @@ -860,7 +840,7 @@ def _create_string_ordering_column(self) -> ibis_types.StringColumn: # Have to build string from scratch window = ibis.window(order_by=self._ibis_order) if self._predicates: - window = window.group_by(self.reduced_predicate) + window = window.group_by(self._reduced_predicate) row_nums = typing.cast( ibis_types.IntegerColumn, ibis.row_number().over(window) ) @@ -870,7 +850,8 @@ def start_query( self, job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, - expose_extra_columns: bool = False, + *, + sorted: bool = True, ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """Execute a query and return metadata about the results.""" # TODO(swast): Cache the job ID so we can look it up again if they ask @@ -883,8 +864,7 @@ def start_query( # a LocalSession for unit testing. # TODO(swast): Add a timeout here? If the query is taking a long time, # maybe we just print the job metadata that we have so far? 
- table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns) - sql = self._session.ibis_client.compile(table) # type:ignore + sql = self.to_sql(sorted=True) # type:ignore return self._session._start_query( sql=sql, job_config=job_config, @@ -903,7 +883,7 @@ def _reproject_to_table(self) -> ArrayValue: recursively in projections. """ table = self._to_ibis_expr( - ordering_mode="unordered", + "unordered", expose_hidden_cols=True, ) columns = [table[column_name] for column_name in self._column_names] @@ -926,14 +906,16 @@ def _reproject_to_table(self) -> ArrayValue: def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = False): group_by: typing.List[ibis_types.Value] = ( [ - typing.cast(ibis_types.Column, _as_identity(self.get_column(column))) + typing.cast( + ibis_types.Column, _as_identity(self._get_ibis_column(column)) + ) for column in window_spec.grouping_keys ] if window_spec.grouping_keys else [] ) - if self.reduced_predicate is not None: - group_by.append(self.reduced_predicate) + if self._reduced_predicate is not None: + group_by.append(self._reduced_predicate) if window_spec.ordering: order_by = _convert_ordering_to_table_values( {**self._column_names, **self._hidden_ordering_column_names}, @@ -984,7 +966,7 @@ def unpivot( """ if how not in ("left", "right"): raise ValueError("'how' must be 'left' or 'right'") - table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) + table = self._to_ibis_expr("unordered", expose_hidden_cols=True) row_n = len(row_labels) hidden_col_ids = self._hidden_ordering_column_names.keys() if not all( @@ -1107,7 +1089,9 @@ def unpivot( ) def assign(self, source_id: str, destination_id: str) -> ArrayValue: - return self._set_or_replace_by_id(destination_id, self.get_column(source_id)) + return self._set_or_replace_by_id( + destination_id, self._get_ibis_column(source_id) + ) def assign_constant( self, @@ -1134,74 +1118,25 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> ArrayVa return self._hide_column(id)._set_or_replace_by_id(id, new_value) builder = self.builder() - if id in self.column_names: + if id in self.column_ids: builder.columns = [ val if (col_id != id) else new_value.name(id) - for col_id, val in self.column_names.items() + for col_id, val in zip(self.column_ids, self._columns) ] else: builder.columns = [*self.columns, new_value.name(id)] return builder.build() - def slice( - self, - start: typing.Optional[int] = None, - stop: typing.Optional[int] = None, - step: typing.Optional[int] = None, - ) -> ArrayValue: - if step == 0: - raise ValueError("slice step cannot be zero") - - if not step: - step = 1 - - expr_with_offsets = self.project_offsets() - - # start with True and reduce with start, stop, and step conditions - cond_list = [expr_with_offsets.offsets == expr_with_offsets.offsets] - - last_offset = expr_with_offsets.offsets.max() - - # Convert negative indexes to positive indexes - if start and start < 0: - start = last_offset + start + 1 - if stop and stop < 0: - stop = last_offset + stop + 1 - - if start is not None: - if step >= 1: - cond_list.append(expr_with_offsets.offsets >= start) - else: - cond_list.append(expr_with_offsets.offsets <= start) - if stop is not None: - if step >= 1: - cond_list.append(expr_with_offsets.offsets < stop) - else: - cond_list.append(expr_with_offsets.offsets > stop) - if step > 1: - start = start if (start is not None) else 0 - cond_list.append((expr_with_offsets.offsets - start) % step == 0) - if step < 0: - start = start if 
(start is not None) else last_offset - cond_list.append((start - expr_with_offsets.offsets) % (-step) == 0) - - sliced_expr = expr_with_offsets._filter( - functools.reduce(lambda x, y: x & y, cond_list) - ) - return sliced_expr if step > 0 else sliced_expr.reversed() - def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue: """Write the ArrayValue to a session table and create a new block object that references it.""" - ibis_expr = self._to_ibis_expr( - ordering_mode="unordered", expose_hidden_cols=True - ) + ibis_expr = self._to_ibis_expr("unordered", expose_hidden_cols=True) destination = self._session._ibis_to_session_table( ibis_expr, cluster_cols=cluster_cols, api_name="cache" ) table_expression = self._session.ibis_client.table( f"{destination.project}.{destination.dataset_id}.{destination.table_id}" ) - new_columns = [table_expression[column] for column in self.column_names] + new_columns = [table_expression[column] for column in self.column_ids] new_hidden_columns = [ table_expression[column] for column in self._hidden_ordering_column_names ] diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 30c7902981..904da7f312 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -25,6 +25,39 @@ import bigframes.operations.aggregations as agg_ops +def equals(block1: blocks.Block, block2: blocks.Block) -> bool: + if not block1.column_labels.equals(block2.column_labels): + return False + if block1.dtypes != block2.dtypes: + return False + # TODO: More advanced expression tree traversals to short circuit actually querying data + + block1 = block1.reset_index(drop=False) + block2 = block2.reset_index(drop=False) + + joined, (lmap, rmap) = block1.index.join(block2.index, how="outer") + joined_block = joined._block + + equality_ids = [] + for lcol, rcol in zip(block1.value_columns, block2.value_columns): + lcolmapped = lmap(lcol) + rcolmapped = rmap(rcol) + joined_block, result_id = joined_block.apply_binary_op( + lcolmapped, rcolmapped, ops.eq_nulls_match_op + ) + joined_block, result_id = joined_block.apply_unary_op( + result_id, ops.partial_right(ops.fillna_op, False) + ) + equality_ids.append(result_id) + + joined_block = joined_block.select_columns(equality_ids).with_column_labels( + list(range(len(equality_ids))) + ) + stacked_block = joined_block.stack() + result = stacked_block.get_stat(stacked_block.value_columns[0], agg_ops.all_op) + return typing.cast(bool, result) + + def indicate_duplicates( block: blocks.Block, columns: typing.Sequence[str], keep: str = "first" ) -> typing.Tuple[blocks.Block, str]: diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0161d17361..9b49645c71 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -152,7 +152,7 @@ def value_columns(self) -> Sequence[str]: """All value columns, mutually exclusive with index columns.""" return [ column - for column in self._expr.column_names + for column in self._expr.column_ids if column not in self.index_columns ] @@ -444,9 +444,7 @@ def _compute_and_count( # TODO(swast): Allow for dry run and timeout. 
expr = self._apply_value_keys_to_expr(value_keys=value_keys) - results_iterator, query_job = expr.start_query( - max_results=max_results, expose_extra_columns=True - ) + results_iterator, query_job = expr.start_query(max_results=max_results) table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES fraction = ( @@ -483,12 +481,6 @@ def _compute_and_count( if self.index_columns: df.set_index(list(self.index_columns), inplace=True) df.index.names = self.index.names # type: ignore - - df.drop( - [col for col in df.columns if col not in self.value_columns], - axis=1, - inplace=True, - ) elif (sampling_method == _UNIFORM) and (random_state is None): filtered_expr = self.expr._uniform_sampling(fraction) block = Block( @@ -520,12 +512,6 @@ def _compute_and_count( df.set_index(list(self.index_columns), inplace=True) df.index.names = self.index.names # type: ignore - df.drop( - [col for col in df.columns if col not in self.value_columns], - axis=1, - inplace=True, - ) - return df, total_rows, query_job def _split( @@ -1087,7 +1073,7 @@ def _normalize_expression( ): """Normalizes expression by moving index columns to left.""" value_columns = [ - col_id for col_id in expr.column_names.keys() if col_id not in index_columns + col_id for col_id in expr.column_ids if col_id not in index_columns ] if (assert_value_size is not None) and ( len(value_columns) != assert_value_size @@ -1096,20 +1082,92 @@ def _normalize_expression( return expr.select_columns([*index_columns, *value_columns]) def slice( - self: bigframes.core.blocks.Block, + self, start: typing.Optional[int] = None, stop: typing.Optional[int] = None, step: typing.Optional[int] = None, ) -> bigframes.core.blocks.Block: - sliced_expr = self.expr.slice(start=start, stop=stop, step=step) - # since this is slice, return a copy even if unchanged - block = Block( - sliced_expr, - index_columns=self.index_columns, - column_labels=self.column_labels, - index_labels=self._index_labels, + if step is None: + step = 1 + if step == 0: + raise ValueError("slice step cannot be zero") + if step < 0: + reverse_start = (-start - 1) if start else 0 + reverse_stop = (-stop - 1) if stop else None + reverse_step = -step + return self.reversed()._forward_slice( + reverse_start, reverse_stop, reverse_step + ) + return self._forward_slice(start or 0, stop, step) + + def _forward_slice(self, start: int = 0, stop=None, step: int = 1): + """Performs slice but only for positive step size.""" + if step <= 0: + raise ValueError("forward_slice only supports positive step size") + + use_postive_offsets = ( + (start > 0) + or ((stop is not None) and (stop >= 0)) + or ((step > 1) and (start >= 0)) ) - return block + use_negative_offsets = ( + (start < 0) or (stop and (stop < 0)) or ((step > 1) and (start < 0)) + ) + + block = self + + # only generate offsets that are used + positive_offsets = None + negative_offsets = None + + if use_postive_offsets: + block, positive_offsets = self.promote_offsets() + if use_negative_offsets: + block, negative_offsets = block.reversed().promote_offsets() + block = block.reversed() + + conditions = [] + if start != 0: + if start > 0: + op = ops.partial_right(ops.ge_op, start) + assert positive_offsets + block, start_cond = block.apply_unary_op(positive_offsets, op) + else: + op = ops.partial_right(ops.le_op, -start - 1) + assert negative_offsets + block, start_cond = block.apply_unary_op(negative_offsets, op) + conditions.append(start_cond) + if stop is not None: + if stop >= 0: + op = ops.partial_right(ops.lt_op, stop) + 
assert positive_offsets + block, stop_cond = block.apply_unary_op(positive_offsets, op) + else: + op = ops.partial_right(ops.gt_op, -stop - 1) + assert negative_offsets + block, stop_cond = block.apply_unary_op(negative_offsets, op) + conditions.append(stop_cond) + + if step > 1: + op = ops.partial_right(ops.mod_op, step) + if start >= 0: + op = ops.partial_right(ops.sub_op, start) + assert positive_offsets + block, start_diff = block.apply_unary_op(positive_offsets, op) + else: + op = ops.partial_right(ops.sub_op, -start + 1) + assert negative_offsets + block, start_diff = block.apply_unary_op(negative_offsets, op) + modulo_op = ops.partial_right(ops.mod_op, step) + block, mod = block.apply_unary_op(start_diff, modulo_op) + is_zero_op = ops.partial_right(ops.eq_op, 0) + block, step_cond = block.apply_unary_op(mod, is_zero_op) + conditions.append(step_cond) + + for cond in conditions: + block = block.filter(cond) + + return block.select_columns(self.value_columns) # Using cache to optimize for Jupyter Notebook's behavior where both '__repr__' # and '__repr_html__' are called in a single display action, reducing redundant @@ -1226,20 +1284,20 @@ def pivot( return result_block.with_column_labels(column_index) - def stack(self, how="left", dropna=True, sort=True, levels: int = 1): + def stack(self, how="left", levels: int = 1): """Unpivot last column axis level into row axis""" + if levels == 0: + return self + # These are the values that will be turned into rows col_labels, row_labels = utils.split_index(self.column_labels, levels=levels) - if dropna: - row_labels = row_labels.drop_duplicates() - if sort: - row_labels = row_labels.sort_values() + row_labels = row_labels.drop_duplicates() row_label_tuples = utils.index_as_tuples(row_labels) if col_labels is not None: - result_index = col_labels.drop_duplicates().sort_values().dropna(how="all") + result_index = col_labels.drop_duplicates().dropna(how="all") result_col_labels = utils.index_as_tuples(result_index) else: result_index = pd.Index([None]) @@ -1396,7 +1454,7 @@ def concat( ) result_block = Block( result_expr, - index_columns=list(result_expr.column_names.keys())[:index_nlevels], + index_columns=list(result_expr.column_ids)[:index_nlevels], column_labels=aligned_blocks[0].column_labels, index_labels=result_labels, ) @@ -1530,9 +1588,7 @@ def to_sql_query( # the BigQuery unicode column name feature? 
substitutions[old_id] = new_id - sql = array_value.to_sql( - ordering_mode="unordered", col_id_overrides=substitutions - ) + sql = array_value.to_sql(col_id_overrides=substitutions) return ( sql, new_ids[: len(idx_labels)], diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 9be7f22a71..db0843fcbc 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -426,10 +426,6 @@ def __init__( self._value_name = value_name self._dropna = dropna # Applies to aggregations but not windowing - @property - def _value(self): - return self._block.expr.get_column(self._value_column) - def all(self) -> series.Series: return self._aggregate(agg_ops.all_op) diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index 1a88b2abd6..a74880041c 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -97,6 +97,16 @@ def __getitem__( return _iloc_getitem_series_or_dataframe(self._series, key) +class IatSeriesIndexer: + def __init__(self, series: bigframes.series.Series): + self._series = series + + def __getitem__(self, key: int) -> bigframes.core.scalar.Scalar: + if not isinstance(key, int): + raise ValueError("Series iAt based indexing can only have integer indexers") + return self._series.iloc[key] + + class LocDataFrameIndexer: def __init__(self, dataframe: bigframes.dataframe.DataFrame): self._dataframe = dataframe @@ -188,6 +198,28 @@ def __getitem__(self, key) -> Union[bigframes.dataframe.DataFrame, pd.Series]: return _iloc_getitem_series_or_dataframe(self._dataframe, key) +class IatDataFrameIndexer: + def __init__(self, dataframe: bigframes.dataframe.DataFrame): + self._dataframe = dataframe + + def __getitem__(self, key: tuple) -> bigframes.core.scalar.Scalar: + error_message = "DataFrame.iat should be indexed by a tuple of exactly 2 ints" + # we raise TypeError or ValueError under the same conditions that pandas does + if isinstance(key, int): + raise TypeError(error_message) + if not isinstance(key, tuple): + raise ValueError(error_message) + key_values_are_ints = [isinstance(key_value, int) for key_value in key] + if not all(key_values_are_ints): + raise ValueError(error_message) + if len(key) != 2: + raise TypeError(error_message) + block = self._dataframe._block + column_block = block.select_columns([block.value_columns[key[1]]]) + column = bigframes.series.Series(column_block) + return column.iloc[key[0]] + + @typing.overload def _loc_getitem_series_or_dataframe( series_or_dataframe: bigframes.series.Series, key @@ -356,6 +388,18 @@ def _iloc_getitem_series_or_dataframe( return result_pd_df.iloc[0] elif isinstance(key, slice): return series_or_dataframe._slice(key.start, key.stop, key.step) + elif isinstance(key, tuple) and len(key) == 0: + return series_or_dataframe + elif isinstance(key, tuple) and len(key) == 1: + return _iloc_getitem_series_or_dataframe(series_or_dataframe, key[0]) + elif ( + isinstance(key, tuple) + and isinstance(series_or_dataframe, bigframes.dataframe.DataFrame) + and len(key) == 2 + ): + return series_or_dataframe.iat[key] + elif isinstance(key, tuple): + raise pd.errors.IndexingError("Too many indexers") elif pd.api.types.is_list_like(key): if len(key) == 0: return typing.cast( diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index f211afe4d5..7d15e67649 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -398,9 +398,7 @@ def to_pandas(self) -> pandas.Index: """Executes deferred operations and downloads 
the results.""" # Project down to only the index column. So the query can be cached to visualize other data. index_columns = list(self._block.index_columns) - expr = self._expr.projection( - [self._expr.get_any_column(col) for col in index_columns] - ) + expr = self._expr.select_columns(index_columns) results, _ = expr.start_query() df = expr._session._rows_to_dataframe(results) df = df.set_index(index_columns) diff --git a/bigframes/core/joins/row_identity.py b/bigframes/core/joins/row_identity.py index 66eb223990..156e7aef40 100644 --- a/bigframes/core/joins/row_identity.py +++ b/bigframes/core/joins/row_identity.py @@ -38,11 +38,11 @@ def join_by_row_identity( f"Only how='outer','left','inner' currently supported. {constants.FEEDBACK_LINK}" ) - if not left.table.equals(right.table): + if not left._table.equals(right._table): raise ValueError( "Cannot combine objects without an explicit join/merge key. " - f"Left based on: {left.table.compile()}, but " - f"right based on: {right.table.compile()}" + f"Left based on: {left._table.compile()}, but " + f"right based on: {right._table.compile()}" ) left_predicates = left._predicates @@ -63,11 +63,11 @@ def join_by_row_identity( left_mask = left_relative_predicates if how in ["right", "outer"] else None right_mask = right_relative_predicates if how in ["left", "outer"] else None joined_columns = [ - _mask_value(left.get_column(key), left_mask).name(map_left_id(key)) - for key in left.column_names.keys() + _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id(key)) + for key in left.column_ids ] + [ - _mask_value(right.get_column(key), right_mask).name(map_right_id(key)) - for key in right.column_names.keys() + _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id(key)) + for key in right.column_ids ] # If left isn't being masked, can just use left ordering @@ -108,7 +108,7 @@ def join_by_row_identity( joined_expr = core.ArrayValue( left._session, - left.table, + left._table, columns=joined_columns, hidden_ordering_columns=hidden_ordering_columns, ordering=new_ordering, diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/joins/single_column.py index 2d616fc3f0..f194b8f8c4 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/joins/single_column.py @@ -74,14 +74,14 @@ def join_by_column( if ( allow_row_identity_join and how in bigframes.core.joins.row_identity.SUPPORTED_ROW_IDENTITY_HOW - and left.table.equals(right.table) + and left._table.equals(right._table) # Make sure we're joining on exactly the same column(s), at least with # regards to value its possible that they both have the same names but # were modified in different ways. Ignore differences in the names. 
and all( - left.get_any_column(lcol) + left._get_any_column(lcol) .name("index") - .equals(right.get_any_column(rcol).name("index")) + .equals(right._get_any_column(rcol).name("index")) for lcol, rcol in zip(left_column_ids, right_column_ids) ) ): @@ -90,14 +90,16 @@ def join_by_column( get_column_right, ) = bigframes.core.joins.row_identity.join_by_row_identity(left, right, how=how) left_join_keys = [ - combined_expr.get_column(get_column_left(col)) for col in left_column_ids + combined_expr._get_ibis_column(get_column_left(col)) + for col in left_column_ids ] right_join_keys = [ - combined_expr.get_column(get_column_right(col)) for col in right_column_ids + combined_expr._get_ibis_column(get_column_right(col)) + for col in right_column_ids ] join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how) join_key_ids = [col.get_name() for col in join_key_cols] - combined_expr = combined_expr.projection( + combined_expr = combined_expr._projection( [*join_key_cols, *combined_expr.columns] ) if sort: @@ -119,13 +121,13 @@ def join_by_column( lmapping = { col_id: guid.generate_guid() for col_id in itertools.chain( - left.column_names, left._hidden_ordering_column_names + left.column_ids, left._hidden_ordering_column_names ) } rmapping = { col_id: guid.generate_guid() for col_id in itertools.chain( - right.column_names, right._hidden_ordering_column_names + right.column_ids, right._hidden_ordering_column_names ) } @@ -136,12 +138,12 @@ def get_column_right(col_id): return rmapping[col_id] left_table = left._to_ibis_expr( - ordering_mode="unordered", + "unordered", expose_hidden_cols=True, col_id_overrides=lmapping, ) right_table = right._to_ibis_expr( - ordering_mode="unordered", + "unordered", expose_hidden_cols=True, col_id_overrides=rmapping, ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index eea8beb130..3ec1b4b617 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -254,6 +254,10 @@ def loc(self) -> indexers.LocDataFrameIndexer: def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) + @property + def iat(self) -> indexers.IatDataFrameIndexer: + return indexers.IatDataFrameIndexer(self) + @property def dtypes(self) -> pandas.Series: return pandas.Series(data=self._block.dtypes, index=self._block.column_labels) @@ -554,7 +558,7 @@ def _apply_series_binop( other._block.index, how=how ) - series_column_id = other._value.get_name() + series_column_id = other._value_column series_col = get_column_right(series_column_id) block = joined_index._block for column_id, label in zip( @@ -937,7 +941,7 @@ def drop( columns = labels block = self._block - if index: + if index is not None: level_id = self._resolve_levels(level or 0)[0] if utils.is_list_like(index): @@ -947,6 +951,8 @@ def drop( block, condition_id = block.apply_unary_op( inverse_condition_id, ops.invert_op ) + elif isinstance(index, indexes.Index): + return self._drop_by_index(index) else: block, condition_id = block.apply_unary_op( level_id, ops.partial_right(ops.ne_op, index) @@ -956,10 +962,31 @@ def drop( ) if columns: block = block.drop_columns(self._sql_names(columns)) - if not index and not columns: + if index is None and not columns: raise ValueError("Must specify 'labels' or 'index'/'columns") return DataFrame(block) + def _drop_by_index(self, index: indexes.Index) -> DataFrame: + block = index._data._get_block() + block, ordering_col = block.promote_offsets() + joined_index, (get_column_left, get_column_right) = self._block.index.join( + 
block.index + ) + + new_ordering_col = get_column_right(ordering_col) + drop_block = joined_index._block + drop_block, drop_col = drop_block.apply_unary_op( + new_ordering_col, + ops.isnull_op, + ) + + drop_block = drop_block.filter(drop_col) + original_columns = [ + get_column_left(column) for column in self._block.value_columns + ] + drop_block = drop_block.select_columns(original_columns) + return DataFrame(drop_block) + def droplevel(self, level: LevelsType, axis: int | str = 0): axis_n = utils.get_axis_number(axis) if axis_n == 0: @@ -1043,6 +1070,12 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) + def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: + # Must be same object type, same column dtypes, and same label values + if not isinstance(other, DataFrame): + return False + return block_ops.equals(self._block, other._block) + def assign(self, **kwargs) -> DataFrame: # TODO(garrettwu) Support list-like values. Requires ordering. # TODO(garrettwu) Support callable values. @@ -1708,24 +1741,49 @@ def pivot( ) return DataFrame(pivot_block) - def stack(self): - # TODO: support 'level' param by simply reordering levels such that selected level is last before passing to Block.stack. - # TODO: match impl to pandas future_stack as described in pandas 2.1 release notes - stack_block = self._block.stack() - result_block = block_ops.dropna( - stack_block, stack_block.value_columns, how="all" - ) + def stack(self, level: LevelsType = -1): if not isinstance(self.columns, pandas.MultiIndex): - return bigframes.series.Series(result_block) - return DataFrame(result_block) + if level not in [0, -1, self.columns.name]: + raise IndexError(f"Invalid level {level} for single-level index") + return self._stack_mono() + return self._stack_multi(level) + + def _stack_mono(self): + result_block = self._block.stack() + return bigframes.series.Series(result_block) + + def _stack_multi(self, level: LevelsType = -1): + n_levels = self.columns.nlevels + if isinstance(level, int) or isinstance(level, str): + level = [level] + level_indices = [] + for level_ref in level: + if isinstance(level_ref, int): + if level_ref < 0: + level_indices.append(n_levels + level_ref) + else: + level_indices.append(level_ref) + else: # str + level_indices.append(self.columns.names.index(level_ref)) + + new_order = [ + *[i for i in range(n_levels) if i not in level_indices], + *level_indices, + ] + + original_columns = typing.cast(pandas.MultiIndex, self.columns) + new_columns = original_columns.reorder_levels(new_order) + + block = self._block.with_column_labels(new_columns) + + block = block.stack(levels=len(level)) + return DataFrame(block) def unstack(self): block = self._block # Special case, unstack with mono-index transpose into a series if self.index.nlevels == 1: - block = block.stack( - how="right", dropna=False, sort=False, levels=self.columns.nlevels - ) + block = block.stack(how="right", levels=self.columns.nlevels) return bigframes.series.Series(block) # Pivot by last level of index @@ -2359,13 +2417,11 @@ def _create_io_query(self, index: bool, ordering_id: Optional[str]) -> str: if ordering_id is not None: return array_value.to_sql( - ordering_mode="offset_col", + offset_column=ordering_id, col_id_overrides=id_overrides, - order_col_name=ordering_id, ) else: return array_value.to_sql( - ordering_mode="unordered", col_id_overrides=id_overrides, ) diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py index f899ac7119..f2478b1ce2 100644 --- 
a/bigframes/ml/base.py +++ b/bigframes/ml/base.py @@ -104,7 +104,8 @@ def predict(self, X): def register(self: _T, vertex_ai_model_id: Optional[str] = None) -> _T: """Register the model to Vertex AI. - After register, go to https://pantheon.corp.google.com/vertex-ai/models to manage the model registries. + After register, go to Google Cloud Console (https://console.cloud.google.com/vertex-ai/models) + to manage the model registries. Refer to https://cloud.google.com/vertex-ai/docs/model-registry/introduction for more options. Args: diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index c86e5fb3b6..a61dd34e6d 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -38,8 +38,9 @@ class PaLM2TextGenerator(base.Predictor): session (bigframes.Session or None): BQ session to create the model. If None, use the global default session. connection_name (str or None): - connection to connect with remote service. str of the format ... - if None, use default connection in session context. + connection to connect with remote service. str of the format ... + if None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach + permission if the connection isn't fully setup. """ def __init__( @@ -48,7 +49,14 @@ def __init__( connection_name: Optional[str] = None, ): self.session = session or bpd.get_global_session() - self.connection_name = connection_name or self.session._bq_connection + + connection_name = connection_name or self.session._bq_connection + self.connection_name = clients.get_connection_name_full( + connection_name, + default_project=self.session._project, + default_location=self.session._location, + ) + self._bq_connection_manager = clients.BqConnectionManager( self.session.bqconnectionclient, self.session.resourcemanagerclient ) @@ -180,7 +188,14 @@ def __init__( connection_name: Optional[str] = None, ): self.session = session or bpd.get_global_session() - self.connection_name = connection_name or self.session._bq_connection + + connection_name = connection_name or self.session._bq_connection + self.connection_name = clients.get_connection_name_full( + connection_name, + default_project=self.session._project, + default_location=self.session._location, + ) + self._bq_connection_manager = clients.BqConnectionManager( self.session.bqconnectionclient, self.session.resourcemanagerclient ) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index bc08298eb7..f330a703b2 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -705,6 +705,16 @@ def eq_op( return x == y +def eq_nulls_match_op( + x: ibis_types.Value, + y: ibis_types.Value, +): + """Variant of eq_op where nulls match each other. 
Only use where dtypes are known to be same.""" + left = x.cast(ibis_dtypes.str).fillna(ibis_types.literal("$NULL_SENTINEL$")) + right = y.cast(ibis_dtypes.str).fillna(ibis_types.literal("$NULL_SENTINEL$")) + return left == right + + def ne_op( x: ibis_types.Value, y: ibis_types.Value, diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 51eaad18b9..fc76d07edb 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -16,7 +16,6 @@ import typing -import ibis.expr.types as ibis_types import pandas as pd import bigframes.constants as constants @@ -106,11 +105,6 @@ def __init__( if pd_series.name is None: self._block = self._block.with_column_labels([None]) - @property - def _value(self) -> ibis_types.Value: - """Private property to get Ibis expression for the value column.""" - return self._block.expr.get_column(self._value_column) - @property def _value_column(self) -> str: return self._block.value_columns[0] diff --git a/bigframes/remote_function.py b/bigframes/remote_function.py index 6fc2f8e59f..37c7a2fc64 100644 --- a/bigframes/remote_function.py +++ b/bigframes/remote_function.py @@ -695,9 +695,12 @@ def remote_function( persistent name. """ + import bigframes.pandas as bpd + + session = session or bpd.get_global_session() # A BigQuery client is required to perform BQ operations - if not bigquery_client and session: + if not bigquery_client: bigquery_client = session.bqclient if not bigquery_client: raise ValueError( @@ -706,7 +709,7 @@ def remote_function( ) # A BigQuery connection client is required to perform BQ connection operations - if not bigquery_connection_client and session: + if not bigquery_connection_client: bigquery_connection_client = session.bqconnectionclient if not bigquery_connection_client: raise ValueError( @@ -716,8 +719,7 @@ def remote_function( # A cloud functions client is required to perform cloud functions operations if not cloud_functions_client: - if session: - cloud_functions_client = session.cloudfunctionsclient + cloud_functions_client = session.cloudfunctionsclient if not cloud_functions_client: raise ValueError( "A cloud functions client must be provided, either directly or via session. " @@ -726,8 +728,7 @@ def remote_function( # A resource manager client is required to get/set IAM operations if not resource_manager_client: - if session: - resource_manager_client = session.resourcemanagerclient + resource_manager_client = session.resourcemanagerclient if not resource_manager_client: raise ValueError( "A resource manager client must be provided, either directly or via session. " @@ -740,15 +741,10 @@ def remote_function( dataset_ref = bigquery.DatasetReference.from_string( dataset, default_project=bigquery_client.project ) - elif session: + else: dataset_ref = bigquery.DatasetReference.from_string( session._session_dataset_id, default_project=bigquery_client.project ) - else: - raise ValueError( - "Project and dataset must be provided, either directly or via session. 
" - f"{constants.FEEDBACK_LINK}" - ) bq_location, cloud_function_region = get_remote_function_locations( bigquery_client.location @@ -756,40 +752,30 @@ def remote_function( # A connection is required for BQ remote function # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function - if not bigquery_connection and session: - bigquery_connection = session._bq_connection # type: ignore if not bigquery_connection: + bigquery_connection = session._bq_connection # type: ignore + + bigquery_connection = clients.get_connection_name_full( + bigquery_connection, + default_project=dataset_ref.project, + default_location=bq_location, + ) + # Guaranteed to be the form of .. + ( + gcp_project_id, + bq_connection_location, + bq_connection_id, + ) = bigquery_connection.split(".") + if gcp_project_id.casefold() != dataset_ref.project.casefold(): raise ValueError( - "BigQuery connection must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" + "The project_id does not match BigQuery connection gcp_project_id: " + f"{dataset_ref.project}." + ) + if bq_connection_location.casefold() != bq_location.casefold(): + raise ValueError( + "The location does not match BigQuery connection location: " + f"{bq_location}." ) - - # Check connection_id with `LOCATION.CONNECTION_ID` or `PROJECT_ID.LOCATION.CONNECTION_ID` format. - if bigquery_connection.count(".") == 1: - bq_connection_location, bq_connection_id = bigquery_connection.split(".") - if bq_connection_location.casefold() != bq_location.casefold(): - raise ValueError( - "The location does not match BigQuery connection location: " - f"{bq_location}." - ) - bigquery_connection = bq_connection_id - elif bigquery_connection.count(".") == 2: - ( - gcp_project_id, - bq_connection_location, - bq_connection_id, - ) = bigquery_connection.split(".") - if gcp_project_id.casefold() != dataset_ref.project.casefold(): - raise ValueError( - "The project_id does not match BigQuery connection gcp_project_id: " - f"{dataset_ref.project}." - ) - if bq_connection_location.casefold() != bq_location.casefold(): - raise ValueError( - "The location does not match BigQuery connection location: " - f"{bq_location}." 
- ) - bigquery_connection = bq_connection_id def wrapper(f): if not callable(f): @@ -808,7 +794,7 @@ def wrapper(f): dataset_ref.dataset_id, bigquery_client, bigquery_connection_client, - bigquery_connection, + bq_connection_id, resource_manager_client, ) diff --git a/bigframes/series.py b/bigframes/series.py index 8815a6abde..56e1b43a03 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -87,6 +87,10 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) + @property + def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: + return bigframes.core.indexers.IatSeriesIndexer(self) + @property def name(self) -> blocks.Label: return self._name @@ -99,6 +103,10 @@ def shape(self) -> typing.Tuple[int]: def size(self) -> int: return self.shape[0] + @property + def ndim(self) -> int: + return 1 + @property def empty(self) -> bool: return self.shape[0] == 0 @@ -123,6 +131,13 @@ def query_job(self) -> Optional[bigquery.QueryJob]: def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) + @property + def T(self) -> Series: + return self.transpose() + + def transpose(self) -> Series: + return self + def _set_internal_query_job(self, query_job: bigquery.QueryJob): self._query_job = query_job @@ -198,6 +213,14 @@ def rename_axis( labels = [mapper] return Series(self._block.with_index_labels(labels)) + def equals( + self, other: typing.Union[Series, bigframes.dataframe.DataFrame] + ) -> bool: + # Must be same object type, same column dtypes, and same label values + if not isinstance(other, Series): + return False + return block_ops.equals(self._block, other._block) + def reset_index( self, *, @@ -362,6 +385,8 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.WindowSpec(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) + pad = ffill + def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.WindowSpec(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -743,6 +768,8 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: agg_ops.lookup_agg_func(typing.cast(str, func)) ) + aggregate = agg + def skew(self): count = self.count() if count < 3: @@ -1135,7 +1162,11 @@ def _groupby_values( def apply(self, func) -> Series: # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs # is actually a ternary op - return self._apply_unary_op(ops.RemoteFunctionOp(func)) + # Reproject as workaround to applying filter too late. This forces the filter + # to be applied before passing data to remote function, protecting from bad + # inputs causing errors. 
+ reprojected_series = Series(self._block._force_reproject()) + return reprojected_series._apply_unary_op(ops.RemoteFunctionOp(func)) def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series: return Series(self._get_block().add_prefix(prefix)) diff --git a/bigframes/session.py b/bigframes/session.py index ac48c977cb..a7cb78e3ff 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -350,10 +350,14 @@ def resourcemanagerclient(self): @property def _session_dataset_id(self): """A dataset for storing temporary objects local to the session - This is a workaround for BQML models and remote functions that do not + This is a workaround for remote functions that do not yet support session-temporary instances.""" return self._session_dataset.dataset_id + @property + def _project(self): + return self.bqclient.project + def _create_and_bind_bq_session(self): """Create a BQ session and bind the session id with clients to capture BQ activities: go/bigframes-transient-data""" diff --git a/bigframes/version.py b/bigframes/version.py index 238b64473a..e91e27ff76 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.6.0" +__version__ = "0.7.0" diff --git a/docs/templates/toc.yml b/docs/templates/toc.yml index 891f15a51b..0758bb41d8 100644 --- a/docs/templates/toc.yml +++ b/docs/templates/toc.yml @@ -121,10 +121,18 @@ - items: - name: Overview uid: bigframes.ml.preprocessing - - name: OneHotEncoder - uid: bigframes.ml.preprocessing.OneHotEncoder + - name: KBinsDiscretizer + uid: bigframes.ml.preprocessing.KBinsDiscretizer + - name: LabelEncoder + uid: bigframes.ml.preprocessing.LabelEncoder + - name: MaxAbsScaler + uid: bigframes.ml.preprocessing.MaxAbsScaler + - name: MinMaxScaler + uid: bigframes.ml.preprocessing.MinMaxScaler - name: StandardScaler uid: bigframes.ml.preprocessing.StandardScaler + - name: OneHotEncoder + uid: bigframes.ml.preprocessing.OneHotEncoder name: preprocessing name: bigframes.ml name: BigQuery DataFrames diff --git a/notebooks/getting_started/bq_dataframes_llm_code_generation.ipynb b/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb similarity index 95% rename from notebooks/getting_started/bq_dataframes_llm_code_generation.ipynb rename to notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb index 39e2ef535c..2e4ce3e510 100644 --- a/notebooks/getting_started/bq_dataframes_llm_code_generation.ipynb +++ b/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb @@ -34,18 +34,18 @@ "\n", "\n", " \n", " \n", "
\n", - " \n", + " \n", " \"Colab Run in Colab\n", " \n", " \n", - " \n", + " \n", " \"GitHub\n", " View on GitHub\n", " \n", " \n", - " \n", + " \n", " \"Vertex\n", " Open in Vertex AI Workbench\n", " \n", @@ -162,6 +162,9 @@ }, { "cell_type": "markdown", + "metadata": { + "id": "Wbr2aVtFQBcg" + }, "source": [ "### Set up your Google Cloud project\n", "\n", @@ -183,10 +186,7 @@ " * Vertex AI API\n", "\n", "4. If you are running this notebook locally, install the [Cloud SDK](https://cloud.google.com/sdk)." - ], - "metadata": { - "id": "Wbr2aVtFQBcg" - } + ] }, { "cell_type": "markdown", @@ -350,39 +350,44 @@ }, { "cell_type": "markdown", - "source": [ - "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.reset_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." - ], "metadata": { "id": "DTVtFlqeFbrU" - } + }, + "source": [ + "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.reset_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." + ] }, { "cell_type": "markdown", + "metadata": { + "id": "6eytf4xQHzcF" + }, "source": [ "# Define the LLM model\n", "\n", "BigQuery DataFrames provides integration with [`text-bison` model of the PaLM API](https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/text) via Vertex AI.\n", "\n", "This section walks through a few steps required in order to use the model in your notebook." - ], - "metadata": { - "id": "6eytf4xQHzcF" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "rS4VO1TGiO4G" + }, "source": [ "## Create a BigQuery Cloud resource connection\n", "\n", "You need to create a [Cloud resource connection](https://cloud.google.com/bigquery/docs/create-cloud-resource-connection) to enable BigQuery DataFrames to interact with Vertex AI services." 
- ], - "metadata": { - "id": "rS4VO1TGiO4G" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "KFPjDM4LVh96" + }, + "outputs": [], "source": [ "CONN_NAME = \"bqdf-llm\"\n", "\n", @@ -412,15 +417,13 @@ " f\"serviceAccount:{response.cloud_resource.service_account_id}\"\n", " )\n", "print(CONN_SERVICE_ACCOUNT)" - ], - "metadata": { - "id": "KFPjDM4LVh96" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "W6l6Ol2biU9h" + }, "source": [ "## Set permissions for the service account\n", "\n", @@ -429,52 +432,52 @@ " - `roles/run.invoker`: This role is required for the connection to have read-only access to Cloud Run services that back custom/remote functions ([documentation](https://cloud.google.com/bigquery/docs/remote-functions#grant_permission_on_function)).\n", "\n", "Set these permissions by running the following `gcloud` commands:" - ], - "metadata": { - "id": "W6l6Ol2biU9h" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "d8wja24SVq6s" + }, + "outputs": [], "source": [ "!gcloud projects add-iam-policy-binding {PROJECT_ID} --condition=None --no-user-output-enabled --member={CONN_SERVICE_ACCOUNT} --role='roles/bigquery.connectionUser'\n", "!gcloud projects add-iam-policy-binding {PROJECT_ID} --condition=None --no-user-output-enabled --member={CONN_SERVICE_ACCOUNT} --role='roles/aiplatform.user'\n", "!gcloud projects add-iam-policy-binding {PROJECT_ID} --condition=None --no-user-output-enabled --member={CONN_SERVICE_ACCOUNT} --role='roles/run.invoker'" - ], - "metadata": { - "id": "d8wja24SVq6s" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "qUjT8nw-jIXp" + }, "source": [ "## Define the model\n", "\n", "Use `bigframes.ml.llm` to define the model:" - ], - "metadata": { - "id": "qUjT8nw-jIXp" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "sdjeXFwcHfl7" + }, + "outputs": [], "source": [ "from bigframes.ml.llm import PaLM2TextGenerator\n", "\n", "session = bf.get_global_session()\n", "connection = f\"{PROJECT_ID}.{REGION}.{CONN_NAME}\"\n", "model = PaLM2TextGenerator(session=session, connection_name=connection)" - ], - "metadata": { - "id": "sdjeXFwcHfl7" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "GbW0oCnU1s1N" + }, "source": [ "# Read data from Cloud Storage into BigQuery DataFrames\n", "\n", @@ -486,80 +489,82 @@ "* An in-memory pandas DataFrame\n", "\n", "In this tutorial, you create BigQuery DataFrames DataFrames by reading two CSV files stored in Cloud Storage, one containing a list of DataFrame API names and one containing a list of Series API names." 
- ], - "metadata": { - "id": "GbW0oCnU1s1N" - } + ] }, { "cell_type": "code", - "source": [ - "df_api = bf.read_csv(\"gs://cloud-samples-data/vertex-ai/bigframe/df.csv\")\n", - "series_api = bf.read_csv(\"gs://cloud-samples-data/vertex-ai/bigframe/series.csv\")" - ], + "execution_count": null, "metadata": { "id": "SchiTkQGIJog" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "df_api = bf.read_csv(\"gs://cloud-samples-data/vertex-ai/bigframe/df.csv\")\n", + "series_api = bf.read_csv(\"gs://cloud-samples-data/vertex-ai/bigframe/series.csv\")" + ] }, { "cell_type": "markdown", - "source": [ - "Take a peek at a few rows of data for each file:" - ], "metadata": { "id": "7OBjw2nmQY3-" - } + }, + "source": [ + "Take a peek at a few rows of data for each file:" + ] }, { "cell_type": "code", - "source": [ - "df_api.head(2)" - ], + "execution_count": null, "metadata": { "id": "QCqgVCIsGGuv" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "df_api.head(2)" + ] }, { "cell_type": "code", - "source": [ - "series_api.head(2)" - ], + "execution_count": null, "metadata": { "id": "BGJnZbgEGS5-" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "series_api.head(2)" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "m3ZJEsi7SUKV" + }, "source": [ "# Generate code using the LLM model\n", "\n", "Prepare the prompts and send them to the LLM model for prediction." - ], - "metadata": { - "id": "m3ZJEsi7SUKV" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "9EMAqR37AfLS" + }, "source": [ "## Prompt design in BigQuery DataFrames\n", "\n", "Designing prompts for LLMs is a fast growing area and you can read more in [this documentation](https://cloud.google.com/vertex-ai/docs/generative-ai/learn/introduction-prompt-design).\n", "\n", "For this tutorial, you use a simple prompt to ask the LLM model for sample code for each of the API methods (or rows) from the last step's DataFrames. The output is the new DataFrames `df_prompt` and `series_prompt`, which contain the full prompt text." - ], - "metadata": { - "id": "9EMAqR37AfLS" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "EDAaIwHpQCDZ" + }, + "outputs": [], "source": [ "df_prompt_prefix = \"Generate Pandas sample code for DataFrame.\"\n", "series_prompt_prefix = \"Generate Pandas sample code for Series.\"\n", @@ -568,83 +573,83 @@ "series_prompt = (series_prompt_prefix + series_api['API'])\n", "\n", "df_prompt.head(2)" - ], - "metadata": { - "id": "EDAaIwHpQCDZ" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "rwPLjqW2Ajzh" + }, "source": [ "## Make predictions using the LLM model\n", "\n", "Use the BigQuery DataFrames DataFrame containing the full prompt text as the input to the `predict` method. The `predict` method calls the LLM model and returns its generated text output back to two new BigQuery DataFrames DataFrames, `df_pred` and `series_pred`.\n", "\n", "Note: The predictions might take a few minutes to run." 
- ], - "metadata": { - "id": "rwPLjqW2Ajzh" - } + ] }, { "cell_type": "code", - "source": [ - "df_pred = model.predict(df_prompt.to_frame(), max_output_tokens=1024)\n", - "series_pred = model.predict(series_prompt.to_frame(), max_output_tokens=1024)" - ], + "execution_count": null, "metadata": { "id": "6i6HkFJZa8na" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "df_pred = model.predict(df_prompt.to_frame(), max_output_tokens=1024)\n", + "series_pred = model.predict(series_prompt.to_frame(), max_output_tokens=1024)" + ] }, { "cell_type": "markdown", - "source": [ - "Once the predictions are processed, take a look at the sample output from the LLM, which provides code samples for the API names listed in the DataFrames dataset." - ], "metadata": { "id": "89cB8MW4UIdV" - } + }, + "source": [ + "Once the predictions are processed, take a look at the sample output from the LLM, which provides code samples for the API names listed in the DataFrames dataset." + ] }, { "cell_type": "code", - "source": [ - "print(df_pred['ml_generate_text_llm_result'].iloc[0])" - ], + "execution_count": null, "metadata": { "id": "9A2gw6hP_2nX" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "print(df_pred['ml_generate_text_llm_result'].iloc[0])" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "Fx4lsNqMorJ-" + }, "source": [ "# Manipulate LLM output using a remote function\n", "\n", "The output that the LLM provides often contains additional text beyond the code sample itself. Using BigQuery DataFrames, you can deploy custom Python functions that process and transform this output.\n", "\n" - ], - "metadata": { - "id": "Fx4lsNqMorJ-" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "d8L7SN03VByG" + }, "source": [ "Running the cell below creates a custom function that you can use to process the LLM output data in two ways:\n", "1. Strip the LLM text output to include only the code block.\n", "2. Substitute `import pandas as pd` with `import bigframes.pandas as bf` so that the resulting code block works with BigQuery DataFrames." - ], - "metadata": { - "id": "d8L7SN03VByG" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "GskyyUQPowBT" + }, + "outputs": [], "source": [ "@bf.remote_function([str], str, bigquery_connection=CONN_NAME)\n", "def extract_code(text: str):\n", @@ -656,166 +661,161 @@ " return res\n", " except:\n", " return \"\"" - ], - "metadata": { - "id": "GskyyUQPowBT" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "The custom function is deployed as a Cloud Function, and then integrated with BigQuery as a [remote function](https://cloud.google.com/bigquery/docs/remote-functions). Save both of the function names so that you can clean them up at the end of this notebook." - ], "metadata": { "id": "hVQAoqBUOJQf" - } + }, + "source": [ + "The custom function is deployed as a Cloud Function, and then integrated with BigQuery as a [remote function](https://cloud.google.com/bigquery/docs/remote-functions). Save both of the function names so that you can clean them up at the end of this notebook." 
+ ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "PBlp-C-DOHRO" + }, + "outputs": [], "source": [ "CLOUD_FUNCTION_NAME = format(extract_code.bigframes_cloud_function)\n", "print(\"Cloud Function Name \" + CLOUD_FUNCTION_NAME)\n", "REMOTE_FUNCTION_NAME = format(extract_code.bigframes_remote_function)\n", "print(\"Remote Function Name \" + REMOTE_FUNCTION_NAME)" - ], - "metadata": { - "id": "PBlp-C-DOHRO" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Apply the custom function to each LLM output DataFrame to get the processed results:" - ], "metadata": { "id": "4FEucaiqVs3H" - } + }, + "source": [ + "Apply the custom function to each LLM output DataFrame to get the processed results:" + ] }, { "cell_type": "code", - "source": [ - "df_code = df_pred.assign(code=df_pred['ml_generate_text_llm_result'].apply(extract_code))\n", - "series_code = series_pred.assign(code=series_pred['ml_generate_text_llm_result'].apply(extract_code))" - ], + "execution_count": null, "metadata": { "id": "bsQ9cmoWo0Ps" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "df_code = df_pred.assign(code=df_pred['ml_generate_text_llm_result'].apply(extract_code))\n", + "series_code = series_pred.assign(code=series_pred['ml_generate_text_llm_result'].apply(extract_code))" + ] }, { "cell_type": "markdown", - "source": [ - "You can see the differences by inspecting the first row of data:" - ], "metadata": { "id": "ujQVVuhfWA3y" - } + }, + "source": [ + "You can see the differences by inspecting the first row of data:" + ] }, { "cell_type": "code", - "source": [ - "print(df_code['code'].iloc[0])" - ], + "execution_count": null, "metadata": { "id": "7yWzjhGy_zcy" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "print(df_code['code'].iloc[0])" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "GTRdUw-Ro5R1" + }, "source": [ "# Save the results to Cloud Storage\n", "\n", "BigQuery DataFrames lets you save a BigQuery DataFrames DataFrame as a CSV file in Cloud Storage for further use. Try that now with your processed LLM output data." 
- ], - "metadata": { - "id": "GTRdUw-Ro5R1" - } + ] }, { "cell_type": "markdown", - "source": [ - "Create a new Cloud Storage bucket with a unique name:" - ], "metadata": { "id": "9DQ7eiQxPTi3" - } + }, + "source": [ + "Create a new Cloud Storage bucket with a unique name:" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "-J5LHgS6LLZ0" + }, + "outputs": [], "source": [ "import uuid\n", "BUCKET_ID = \"code-samples-\" + str(uuid.uuid1())\n", "\n", "!gsutil mb gs://{BUCKET_ID}" - ], - "metadata": { - "id": "-J5LHgS6LLZ0" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Use `to_csv` to write each BigQuery DataFrames DataFrame as a CSV file in the Cloud Storage bucket:" - ], "metadata": { "id": "tyxZXj0UPYUv" - } + }, + "source": [ + "Use `to_csv` to write each BigQuery DataFrames DataFrame as a CSV file in the Cloud Storage bucket:" + ] }, { "cell_type": "code", - "source": [ - "df_code[[\"code\"]].to_csv(f\"gs://{BUCKET_ID}/df_code*.csv\")\n", - "series_code[[\"code\"]].to_csv(f\"gs://{BUCKET_ID}/series_code*.csv\")" - ], + "execution_count": null, "metadata": { "id": "Zs_b5L-4IvER" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "df_code[[\"code\"]].to_csv(f\"gs://{BUCKET_ID}/df_code*.csv\")\n", + "series_code[[\"code\"]].to_csv(f\"gs://{BUCKET_ID}/series_code*.csv\")" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "UDBtDlrTuuh8" + }, "source": [ "You can navigate to the Cloud Storage bucket browser to download the two files and view them.\n", "\n", "Run the following cell, and then follow the link to your Cloud Storage bucket browser:" - ], - "metadata": { - "id": "UDBtDlrTuuh8" - } + ] }, { "cell_type": "code", - "source": [ - "print(f'https://console.developers.google.com/storage/browser/{BUCKET_ID}/')" - ], + "execution_count": null, "metadata": { "id": "PspCXu-qu_ND" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "print(f'https://console.developers.google.com/storage/browser/{BUCKET_ID}/')" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "RGSvUk48RK20" + }, "source": [ "# Summary and next steps\n", "\n", "You've used BigQuery DataFrames' integration with LLM models (`bigframes.ml.llm`) to generate code samples, and have tranformed LLM output by creating and using a custom function in BigQuery DataFrames.\n", "\n", "Learn more about BigQuery DataFrames in the [documentation](https://cloud.google.com/python/docs/reference/bigframes/latest) and find more sample notebooks in the [GitHub repo](https://github.com/googleapis/python-bigquery-dataframes/tree/main/notebooks)." 
- ], - "metadata": { - "id": "RGSvUk48RK20" - } + ] }, { "cell_type": "markdown", @@ -833,6 +833,11 @@ }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "yw7A461XLjvW" + }, + "outputs": [], "source": [ "# # Delete the BigQuery Connection\n", "# from google.cloud import bigquery_connection_v1 as bq_connection\n", @@ -840,12 +845,7 @@ "# CONNECTION_ID = f\"projects/{PROJECT_ID}/locations/{REGION}/connections/{CONN_NAME}\"\n", "# client.delete_connection(name=CONNECTION_ID)\n", "# print(f\"Deleted connection '{CONNECTION_ID}'.\")" - ], - "metadata": { - "id": "yw7A461XLjvW" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "code", @@ -864,22 +864,22 @@ }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "iQFo6OUBLmi3" + }, + "outputs": [], "source": [ "# # Delete the Google Cloud Storage bucket and files\n", "# ! gsutil rm -r gs://{BUCKET_ID}\n", "# print(f\"Deleted bucket '{BUCKET_ID}'.\")" - ], - "metadata": { - "id": "iQFo6OUBLmi3" - }, - "execution_count": null, - "outputs": [] + ] } ], "metadata": { "colab": { - "toc_visible": true, - "provenance": [] + "provenance": [], + "toc_visible": true }, "kernelspec": { "display_name": "Python 3", diff --git a/notebooks/getting_started/bq_dataframes_ml_linear_regression.ipynb b/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb similarity index 98% rename from notebooks/getting_started/bq_dataframes_ml_linear_regression.ipynb rename to notebooks/regression/bq_dataframes_ml_linear_regression.ipynb index d317217810..338d6edf4f 100644 --- a/notebooks/getting_started/bq_dataframes_ml_linear_regression.ipynb +++ b/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb @@ -35,18 +35,18 @@ "\n", "\n", " \n", " \n", "
\n", - " \n", + " \n", " \"Colab Run in Colab\n", " \n", " \n", - " \n", + " \n", " \"GitHub\n", " View on GitHub\n", " \n", " \n", - " \n", + " \n", " \"Vertex\n", " Open in Vertex AI Workbench\n", " \n", diff --git a/noxfile.py b/noxfile.py index a113e1fcde..84e5ab11bb 100644 --- a/noxfile.py +++ b/noxfile.py @@ -607,8 +607,8 @@ def notebook(session): # appropriate values and omitting cleanup logic that may break # our test infrastructure. "notebooks/getting_started/getting_started_bq_dataframes.ipynb", - "notebooks/getting_started/bq_dataframes_llm_code_generation.ipynb", - "notebooks/getting_started/bq_dataframes_ml_linear_regression.ipynb", + "notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb", + "notebooks/regression/bq_dataframes_ml_linear_regression.ipynb", "notebooks/generative_ai/bq_dataframes_ml_drug_name_generation.ipynb", "notebooks/vertex_sdk/sdk2_bigframes_pytorch.ipynb", "notebooks/vertex_sdk/sdk2_bigframes_sklearn.ipynb", diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index 7486277487..e546c09f97 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -26,7 +26,8 @@ def test_create_text_generator_model(palm2_text_generator_model): assert palm2_text_generator_model._bqml_model is not None -def test_create_text_generator_model_defaults(bq_connection): +@pytest.mark.flaky(retries=2, delay=120) +def test_create_text_generator_model_default_session(bq_connection, llm_text_pandas_df): import bigframes.pandas as bpd bpd.reset_session() @@ -36,6 +37,40 @@ def test_create_text_generator_model_defaults(bq_connection): model = llm.PaLM2TextGenerator() assert model is not None assert model._bqml_model is not None + assert model.connection_name.casefold() == "bigframes-dev.us.bigframes-rf-conn" + + llm_text_df = bpd.read_pandas(llm_text_pandas_df) + + df = model.predict(llm_text_df).to_pandas() + TestCase().assertSequenceEqual(df.shape, (3, 1)) + assert "ml_generate_text_llm_result" in df.columns + series = df["ml_generate_text_llm_result"] + assert all(series.str.len() > 20) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_create_text_generator_model_default_connection(llm_text_pandas_df): + from bigframes import _config + import bigframes.pandas as bpd + + bpd.reset_session() + _config.options = _config.Options() # reset configs + + llm_text_df = bpd.read_pandas(llm_text_pandas_df) + + model = llm.PaLM2TextGenerator() + assert model is not None + assert model._bqml_model is not None + assert ( + model.connection_name.casefold() + == "bigframes-dev.us.bigframes-default-connection" + ) + + df = model.predict(llm_text_df).to_pandas() + TestCase().assertSequenceEqual(df.shape, (3, 1)) + assert "ml_generate_text_llm_result" in df.columns + series = df["ml_generate_text_llm_result"] + assert all(series.str.len() > 20) # Marked as flaky only because BQML LLM is in preview, the service only has limited capacity, not stable enough. 
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b8616a54d6..9f1092d09d 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -258,6 +258,61 @@ def test_drop_index(scalars_dfs): pd.testing.assert_frame_equal(pd_result, bf_result) +def test_drop_pandas_index(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + drop_index = scalars_pandas_df.iloc[[4, 1, 2]].index + + pd_result = scalars_pandas_df.drop(index=drop_index) + bf_result = scalars_df.drop(index=drop_index).to_pandas() + + pd.testing.assert_frame_equal(pd_result, bf_result) + + +def test_drop_bigframes_index(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + drop_index = scalars_df.loc[[4, 1, 2]].index + drop_pandas_index = scalars_pandas_df.loc[[4, 1, 2]].index + + pd_result = scalars_pandas_df.drop(index=drop_pandas_index) + bf_result = scalars_df.drop(index=drop_index).to_pandas() + + pd.testing.assert_frame_equal(pd_result, bf_result) + + +def test_drop_bigframes_index_with_na(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + scalars_df = scalars_df.copy() + scalars_pandas_df = scalars_pandas_df.copy() + scalars_df = scalars_df.set_index("bytes_col") + scalars_pandas_df = scalars_pandas_df.set_index("bytes_col") + drop_index = scalars_df.iloc[[3, 5]].index + drop_pandas_index = scalars_pandas_df.iloc[[3, 5]].index + + pd_result = scalars_pandas_df.drop(index=drop_pandas_index) # drop_pandas_index) + bf_result = scalars_df.drop(index=drop_index).to_pandas() + + pd.testing.assert_frame_equal(pd_result, bf_result) + + +def test_drop_bigframes_multiindex(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + scalars_df = scalars_df.copy() + scalars_pandas_df = scalars_pandas_df.copy() + sub_df = scalars_df.iloc[[4, 1, 2]] + sub_pandas_df = scalars_pandas_df.iloc[[4, 1, 2]] + sub_df = sub_df.set_index(["bytes_col", "numeric_col"]) + sub_pandas_df = sub_pandas_df.set_index(["bytes_col", "numeric_col"]) + drop_index = sub_df.index + drop_pandas_index = sub_pandas_df.index + + scalars_df = scalars_df.set_index(["bytes_col", "numeric_col"]) + scalars_pandas_df = scalars_pandas_df.set_index(["bytes_col", "numeric_col"]) + bf_result = scalars_df.drop(index=drop_index).to_pandas() + pd_result = scalars_pandas_df.drop(index=drop_pandas_index) + + pd.testing.assert_frame_equal(pd_result, bf_result) + + def test_drop_labels_axis_0(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs @@ -1830,6 +1885,8 @@ def test_df_describe(scalars_dfs): def test_df_stack(scalars_dfs): + if pandas.__version__.startswith("1.") or pandas.__version__.startswith("2.0"): + pytest.skip("pandas <2.1 uses different stack implementation") scalars_df, scalars_pandas_df = scalars_dfs # To match bigquery dataframes scalars_pandas_df = scalars_pandas_df.copy() @@ -1838,7 +1895,7 @@ def test_df_stack(scalars_dfs): columns = ["int64_col", "int64_too", "rowindex_2"] bf_result = scalars_df[columns].stack().to_pandas() - pd_result = scalars_pandas_df[columns].stack() + pd_result = scalars_pandas_df[columns].stack(future_stack=True) # Pandas produces NaN, where bq dataframes produces pd.NA pd.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @@ -2022,7 +2079,7 @@ def test_iloc_slice_nested(scalars_df_index, scalars_pandas_df_index): @pytest.mark.parametrize( "index", - [0, 5, -2], + [0, 5, -2, (2,)], ) def test_iloc_single_integer(scalars_df_index, scalars_pandas_df_index, index): bf_result = scalars_df_index.iloc[index] @@ 
-2034,6 +2091,59 @@ def test_iloc_single_integer(scalars_df_index, scalars_pandas_df_index, index): ) +@pytest.mark.parametrize( + "index", + [(2, 5), (5, 0), (0, 0)], +) +def test_iloc_tuple(scalars_df_index, scalars_pandas_df_index, index): + bf_result = scalars_df_index.iloc[index] + pd_result = scalars_pandas_df_index.iloc[index] + + assert bf_result == pd_result + + +@pytest.mark.parametrize( + ("index", "error"), + [ + ((1, 1, 1), pd.errors.IndexingError), + (("asd", "asd", "asd"), pd.errors.IndexingError), + (("asd"), TypeError), + ], +) +def test_iloc_tuple_errors(scalars_df_index, scalars_pandas_df_index, index, error): + with pytest.raises(error): + scalars_df_index.iloc[index] + with pytest.raises(error): + scalars_pandas_df_index.iloc[index] + + +@pytest.mark.parametrize( + "index", + [(2, 5), (5, 0), (0, 0)], +) +def test_iat(scalars_df_index, scalars_pandas_df_index, index): + bf_result = scalars_df_index.iat[index] + pd_result = scalars_pandas_df_index.iat[index] + + assert bf_result == pd_result + + +@pytest.mark.parametrize( + ("index", "error"), + [ + (0, TypeError), + ("asd", ValueError), + ((1, 2, 3), TypeError), + (("asd", "asd"), ValueError), + ], +) +def test_iat_errors(scalars_df_index, scalars_pandas_df_index, index, error): + with pytest.raises(error): + scalars_pandas_df_index.iat[index] + with pytest.raises(error): + scalars_df_index.iat[index] + + def test_iloc_single_integer_out_of_bound_error( scalars_df_index, scalars_pandas_df_index ): @@ -2496,6 +2606,74 @@ def test_df_reindex_columns(scalars_df_index, scalars_pandas_df_index): ) +def test_df_equals_identical(scalars_df_index, scalars_pandas_df_index): + unsupported = [ + "geography_col", + ] + scalars_df_index = scalars_df_index.drop(columns=unsupported) + scalars_pandas_df_index = scalars_pandas_df_index.drop(columns=unsupported) + + bf_result = scalars_df_index.equals(scalars_df_index) + pd_result = scalars_pandas_df_index.equals(scalars_pandas_df_index) + + assert pd_result == bf_result + + +def test_df_equals_series(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index[["int64_col"]].equals(scalars_df_index["int64_col"]) + pd_result = scalars_pandas_df_index[["int64_col"]].equals( + scalars_pandas_df_index["int64_col"] + ) + + assert pd_result == bf_result + + +def test_df_equals_different_dtype(scalars_df_index, scalars_pandas_df_index): + columns = ["int64_col", "int64_too"] + scalars_df_index = scalars_df_index[columns] + scalars_pandas_df_index = scalars_pandas_df_index[columns] + + bf_modified = scalars_df_index.copy() + bf_modified = bf_modified.astype("Float64") + + pd_modified = scalars_pandas_df_index.copy() + pd_modified = pd_modified.astype("Float64") + + bf_result = scalars_df_index.equals(bf_modified) + pd_result = scalars_pandas_df_index.equals(pd_modified) + + assert pd_result == bf_result + + +def test_df_equals_different_values(scalars_df_index, scalars_pandas_df_index): + columns = ["int64_col", "int64_too"] + scalars_df_index = scalars_df_index[columns] + scalars_pandas_df_index = scalars_pandas_df_index[columns] + + bf_modified = scalars_df_index.copy() + bf_modified["int64_col"] = bf_modified.int64_col + 1 + + pd_modified = scalars_pandas_df_index.copy() + pd_modified["int64_col"] = pd_modified.int64_col + 1 + + bf_result = scalars_df_index.equals(bf_modified) + pd_result = scalars_pandas_df_index.equals(pd_modified) + + assert pd_result == bf_result + + +def test_df_equals_extra_column(scalars_df_index, scalars_pandas_df_index): + columns = ["int64_col", 
"int64_too"] + more_columns = ["int64_col", "int64_too", "float64_col"] + + bf_result = scalars_df_index[columns].equals(scalars_df_index[more_columns]) + pd_result = scalars_pandas_df_index[columns].equals( + scalars_pandas_df_index[more_columns] + ) + + assert pd_result == bf_result + + def test_df_reindex_like(scalars_df_index, scalars_pandas_df_index): reindex_target_bf = scalars_df_index.reindex( columns=["not_a_col", "int64_col", "int64_too"], index=[5, 1, 3, 99, 1] diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index 19f1c557ef..a132676770 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -718,25 +718,37 @@ def test_column_multi_index_cumsum(scalars_df_index, scalars_pandas_df_index): pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) -def test_column_multi_index_stack(scalars_df_index, scalars_pandas_df_index): - columns = ["int64_too", "int64_col", "rowindex_2"] +@pytest.mark.parametrize( + ("level",), + [(["l3", "l1"],), ([-2, -1],), (["l3"],), ("l2",), (-3,)], +) +def test_column_multi_index_stack(level): + if pandas.__version__.startswith("1.") or pandas.__version__.startswith("2.0"): + pytest.skip("pandas <2.1 uses different stack implementation") + level1 = pandas.Index(["b", "a", "b"]) - # Need resulting column to be pyarrow string rather than object dtype - level2 = pandas.Index(["a", "b", "b"], dtype="string[pyarrow]") - multi_columns = pandas.MultiIndex.from_arrays([level1, level2]) - bf_df = scalars_df_index[columns].copy() - bf_df.columns = multi_columns - pd_df = scalars_pandas_df_index[columns].copy() - pd_df.columns = multi_columns + level2 = pandas.Index(["a", "b", "b"]) + level3 = pandas.Index(["b", "b", "a"]) - bf_result = bf_df.stack().to_pandas() - # Shifting sort behavior in stack - pd_result = pd_df.stack() + multi_columns = pandas.MultiIndex.from_arrays( + [level1, level2, level3], names=["l1", "l2", "l3"] + ) + pd_df = pandas.DataFrame( + [[1, 2, 3], [4, 5, 6], [7, 8, 9]], + index=[5, 2, None], + columns=multi_columns, + dtype="Int64", + ) + bf_df = bpd.DataFrame(pd_df) + + bf_result = bf_df.stack(level=level).to_pandas() + # BigFrames emulates future_stack impl + pd_result = pd_df.stack(level=level, future_stack=True) # Pandas produces NaN, where bq dataframes produces pd.NA # Column ordering seems to depend on pandas version pandas.testing.assert_frame_equal( - bf_result.sort_index(axis=1), pd_result.sort_index(axis=1), check_dtype=False + bf_result, pd_result, check_dtype=False, check_index_type=False ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 77fb81d2c9..d024a57ded 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -20,6 +20,7 @@ import bigframes from bigframes import remote_function as rf +import bigframes.pandas as bpd from tests.system.utils import assert_pandas_df_equal_ignore_ordering @@ -465,6 +466,36 @@ def square(x): assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_default_connection(scalars_dfs, dataset_id): + @bpd.remote_function([int], int, dataset=dataset_id) + def square(x): + return x * x + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = 
bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) + + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pd.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + + assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) + + @pytest.mark.flaky(retries=2, delay=120) def test_dataframe_applymap(session_with_bq_connection, scalars_dfs): def add_one(x): diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 8c1c36720b..802425510a 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -112,6 +112,44 @@ def test_series_get_column_default(scalars_dfs): assert result == "default_val" +def test_series_equals_identical(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index.int64_col.equals(scalars_df_index.int64_col) + pd_result = scalars_pandas_df_index.int64_col.equals( + scalars_pandas_df_index.int64_col + ) + + assert pd_result == bf_result + + +def test_series_equals_df(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index["int64_col"].equals(scalars_df_index[["int64_col"]]) + pd_result = scalars_pandas_df_index["int64_col"].equals( + scalars_pandas_df_index[["int64_col"]] + ) + + assert pd_result == bf_result + + +def test_series_equals_different_dtype(scalars_df_index, scalars_pandas_df_index): + bf_series = scalars_df_index["int64_col"] + pd_series = scalars_pandas_df_index["int64_col"] + + bf_result = bf_series.equals(bf_series.astype("Float64")) + pd_result = pd_series.equals(pd_series.astype("Float64")) + + assert pd_result == bf_result + + +def test_series_equals_different_values(scalars_df_index, scalars_pandas_df_index): + bf_series = scalars_df_index["int64_col"] + pd_series = scalars_pandas_df_index["int64_col"] + + bf_result = bf_series.equals(bf_series + 1) + pd_result = pd_series.equals(pd_series + 1) + + assert pd_result == bf_result + + def test_series_get_with_default_index(scalars_dfs): col_name = "float64_col" key = 2 @@ -1915,6 +1953,20 @@ def test_series_iloc(scalars_df_index, scalars_pandas_df_index, start, stop, ste ) +def test_iat(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index["int64_too"].iat[3] + pd_result = scalars_pandas_df_index["int64_too"].iat[3] + + assert bf_result == pd_result + + +def test_iat_error(scalars_df_index, scalars_pandas_df_index): + with pytest.raises(ValueError): + scalars_pandas_df_index["int64_too"].iat["asd"] + with pytest.raises(ValueError): + scalars_df_index["int64_too"].iat["asd"] + + def test_series_add_prefix(scalars_df_index, scalars_pandas_df_index): bf_result = scalars_df_index["int64_too"].add_prefix("prefix_").to_pandas() diff --git a/tests/unit/test_clients.py b/tests/unit/test_clients.py new file mode 100644 index 0000000000..a90e5b0320 --- /dev/null +++ b/tests/unit/test_clients.py @@ -0,0 +1,57 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this 
file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from bigframes import clients + + +def test_get_connection_name_full_none(): + connection_name = clients.get_connection_name_full( + None, default_project="default-project", default_location="us" + ) + assert connection_name == "default-project.us.bigframes-default-connection" + + +def test_get_connection_name_full_connection_id(): + connection_name = clients.get_connection_name_full( + "connection-id", default_project="default-project", default_location="us" + ) + assert connection_name == "default-project.us.connection-id" + + +def test_get_connection_name_full_location_connection_id(): + connection_name = clients.get_connection_name_full( + "eu.connection-id", default_project="default-project", default_location="us" + ) + assert connection_name == "default-project.eu.connection-id" + + +def test_get_connection_name_full_all(): + connection_name = clients.get_connection_name_full( + "my-project.eu.connection-id", + default_project="default-project", + default_location="us", + ) + assert connection_name == "my-project.eu.connection-id" + + +def test_get_connection_name_full_raise_value_error(): + + with pytest.raises(ValueError): + clients.get_connection_name_full( + "my-project.eu.connection-id.extra_field", + default_project="default-project", + default_location="us", + ) diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index ee0cefb3d2..69b9e79807 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import ibis import ibis.expr.types as ibis_types import pandas @@ -45,7 +44,7 @@ def test_arrayvalue_constructor_from_ibis_table_adds_all_columns(): actual = core.ArrayValue( session=session, table=ibis_table, columns=columns, ordering=ordering ) - assert actual.table is ibis_table + assert actual._table is ibis_table assert len(actual.columns) == 3 @@ -79,37 +78,12 @@ def test_arrayvalue_with_get_column(): ), total_ordering_columns=["col1"], ) - col1 = value.get_column("col1") + col1 = value._get_ibis_column("col1") assert isinstance(col1, ibis_types.Value) assert col1.get_name() == "col1" assert col1.type().is_int64() -def test_arrayvalue_to_ibis_expr_with_projection(): - value = resources.create_arrayvalue( - pandas.DataFrame( - { - "col1": [1, 2, 3], - "col2": ["a", "b", "c"], - "col3": [0.1, 0.2, 0.3], - } - ), - total_ordering_columns=["col1"], - ) - expr = value.projection( - [ - (value.table["col1"] + ibis.literal(-1)).name("int64_col"), - ibis.literal(123456789).name("literals"), - value.table["col2"].name("string_col"), - ] - ) - actual = expr._to_ibis_expr() - assert len(actual.columns) == 3 - assert actual.columns[0] == "int64_col" - assert actual.columns[1] == "literals" - assert actual.columns[2] == "string_col" - - def test_arrayvalues_to_ibis_expr_with_get_column(): value = resources.create_arrayvalue( pandas.DataFrame( @@ -121,7 +95,7 @@ def test_arrayvalues_to_ibis_expr_with_get_column(): ), total_ordering_columns=["col1"], ) - expr = value.get_column("col1") + expr = value._get_ibis_column("col1") assert expr.get_name() == "col1" assert expr.type().is_int64() @@ -138,7 +112,7 @@ def test_arrayvalues_to_ibis_expr_with_concat(): total_ordering_columns=["col1"], ) expr = value.concat([value]) - actual = expr._to_ibis_expr() + actual = expr._to_ibis_expr("unordered") assert len(actual.columns) == 3 # TODO(ashleyxu, b/299631930): test out the union expression assert actual.columns[0] == "column_0" @@ -175,7 +149,7 @@ def test_arrayvalues_to_ibis_expr_with_project_binary_op(): ) expr = value.project_binary_op("col2", "col3", ops.add_op, "col4") assert expr.columns[3].type().is_float64() - actual = expr._to_ibis_expr() + actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 4 assert actual.columns[3] == "col4" @@ -194,7 +168,7 @@ def test_arrayvalues_to_ibis_expr_with_project_ternary_op(): ) expr = value.project_ternary_op("col2", "col3", "col4", ops.where_op, "col5") assert expr.columns[4].type().is_float64() - actual = expr._to_ibis_expr() + actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 5 assert actual.columns[4] == "col5" @@ -215,7 +189,7 @@ def test_arrayvalue_to_ibis_expr_with_aggregate(): by_column_ids=["col1"], dropna=False, ) - actual = expr._to_ibis_expr() + actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 2 assert actual.columns[0] == "col1" assert actual.columns[1] == "col4" @@ -234,7 +208,7 @@ def test_arrayvalue_to_ibis_expr_with_corr_aggregate(): total_ordering_columns=["col1"], ) expr = value.corr_aggregate(corr_aggregations=[("col1", "col3", "col4")]) - actual = expr._to_ibis_expr() + actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 1 assert actual.columns[0] == "col4" assert expr.columns[0].type().is_float64() diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 17d941fbdd..80a5428b36 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ 
-399,6 +399,28 @@ def to_orc(self, path=None, **kwargs) -> bytes | None: # ---------------------------------------------------------------------- # Unsorted + def equals(self, other) -> bool: + """ + Test whether two objects contain the same elements. + + This function allows two Series or DataFrames to be compared against + each other to see if they have the same shape and elements. NaNs in + the same location are considered equal. + + The row/column index do not need to have the same type, as long + as the values are considered equal. Corresponding columns must be of + the same dtype. + + Args: + other (Series or DataFrame): + The other Series or DataFrame to be compared with the first. + + Returns: + bool: True if all elements are the same in both objects, False + otherwise. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def assign(self, **kwargs) -> DataFrame: r""" Assign new columns to a DataFrame. @@ -2075,3 +2097,13 @@ def fillna(self, value): DataFrame: Object with missing values filled """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def iloc(self): + """Purely integer-location based indexing for selection by position.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def iat(self): + """Access a single value for a row/column pair by integer position.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index a41a3454ca..03729922d5 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -64,6 +64,20 @@ def name(self) -> Hashable: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + @property + def T(self) -> Series: + """Return the transpose, which is by definition self.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def transpose(self) -> Series: + """ + Return the transpose, which is by definition self. + + Returns: + Series + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def reset_index( self, *, @@ -1809,3 +1823,13 @@ def map( Series: Same index as caller. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def iloc(self): + """Purely integer-location based indexing for selection by position.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def iat(self): + """Access a single value for a row/column pair by integer position.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/preprocessing/_data.py b/third_party/bigframes_vendored/sklearn/preprocessing/_data.py index 58e16e135b..5ce102d573 100644 --- a/third_party/bigframes_vendored/sklearn/preprocessing/_data.py +++ b/third_party/bigframes_vendored/sklearn/preprocessing/_data.py @@ -29,7 +29,7 @@ class StandardScaler(BaseEstimator, TransformerMixin): individual features do not more or less look like standard normally distributed data (e.g. Gaussian with 0 mean and unit variance). - Examples: + **Examples:** .. 
code-block:: diff --git a/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py b/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py index cf660ece5d..8da9a98c53 100644 --- a/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py +++ b/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py @@ -17,6 +17,21 @@ class OneHotEncoder(BaseEstimator): Note that this method deviates from Scikit-Learn; instead of producing sparse binary columns, the encoding is a single column of `STRUCT`. + **Examples:** + + Given a dataset with two features, we let the encoder find the unique + values per feature and transform the data to a binary one-hot encoding. + + .. code-block:: + + from bigframes.ml.preprocessing import OneHotEncoder + import bigframes.pandas as bpd + + enc = OneHotEncoder() + X = bpd.DataFrame({"a": ["Male", "Female", "Female"], "b": ["1", "3", "2"]}) + enc.fit(X) + print(enc.transform(bpd.DataFrame({"a": ["Female", "Male"], "b": ["1", "4"]}))) + Args: drop (Optional[Literal["most_frequent"]], default None): Specifies a methodology to use to drop one of the categories per feature. @@ -37,21 +52,6 @@ class OneHotEncoder(BaseEstimator): when considering infrequent categories. If there are infrequent categories, max_categories includes the category representing the infrequent categories along with the frequent categories. Default None, set limit to 1,000,000. - - Examples: - - Given a dataset with two features, we let the encoder find the unique - values per feature and transform the data to a binary one-hot encoding. - - .. code-block:: - - from bigframes.ml.preprocessing import OneHotEncoder - import bigframes.pandas as bpd - - enc = OneHotEncoder() - X = bpd.DataFrame({"a": ["Male", "Female", "Female"], "b": ["1", "3", "2"]}) - enc.fit(X) - print(enc.transform(bpd.DataFrame({"a": ["Female", "Male"], "b": ["1", "4"]}))) """ def fit(self, X, y=None):
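The vendored docstrings above add `T`, `transpose`, `iloc`, and `iat` stubs without usage examples; the concrete behavior comes from the `bigframes/series.py` additions earlier in this change (`iat`, `ndim`, `T`/`transpose`, `equals`, and the `pad`/`aggregate` aliases). A short illustrative sketch, assuming an active session; the column name and data here are hypothetical:

```python
import pandas as pd
import bigframes.pandas as bpd

df = bpd.read_pandas(pd.DataFrame({"values": [1, None, 3]}, dtype="Int64"))
s = df["values"]

s.ndim              # 1 for a Series
s.T is s            # True: the transpose of a Series is the Series itself
s.iat[0]            # positional scalar access, mirroring pandas iat
s.pad()             # alias for ffill()
s.aggregate("sum")  # alias for agg()

# equals() requires matching dtypes as well as matching values, so a
# dtype change makes two otherwise identical Series compare unequal.
s.equals(s)                    # True
s.equals(s.astype("Float64"))  # False
```

The corresponding DataFrame-level behavior (`iat`, tuple `iloc`, `equals`) is exercised by the tests in `tests/system/small/test_dataframe.py` above.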