diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 453b540c1e..9bee240971 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:caffe0a9277daeccc4d1de5c9b55ebba0901b57c2f713ec9c876b0d4ec064f61 -# created: 2023-11-08T19:46:45.022803742Z + digest: sha256:346ab2efb51649c5dde7756cbbdc60dd394852ba83b9bbffc292a63549f33c17 +# created: 2023-12-14T22:17:57.611773021Z diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 221806cedf..698fbc5c94 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -8,9 +8,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.9" - name: Install nox @@ -24,9 +24,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.10" - name: Install nox diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 16d5a9e90f..4866193af2 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -8,9 +8,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.8" - name: Install nox diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 465199fc9a..f059b5548a 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -11,9 +11,9 @@ jobs: python: ['3.9', '3.10', '3.11'] steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python }} - name: Install nox @@ -26,9 +26,9 @@ jobs: run: | nox -s unit-${{ matrix.python }} - name: Upload coverage results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: coverage-artifacts + name: coverage-artifact-${{ matrix.python }} path: .coverage-${{ matrix.python }} cover: @@ -37,9 +37,9 @@ jobs: - unit steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.8" - name: Install coverage @@ -47,11 +47,11 @@ jobs: python -m pip install --upgrade setuptools pip wheel python -m pip install coverage - name: Download coverage results - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: - name: coverage-artifacts path: .coverage-results/ - name: Report coverage results run: | - coverage combine .coverage-results/.coverage* + find .coverage-results -type f -name '*.zip' -exec unzip {} \; + coverage combine .coverage-results/**/.coverage* coverage report --show-missing --fail-under=35 diff --git a/.kokoro/release-nightly.sh b/.kokoro/release-nightly.sh index 0751cf2502..5624df3b8d 100755 --- a/.kokoro/release-nightly.sh +++ b/.kokoro/release-nightly.sh @@ -55,6 +55,9 @@ rm -rf build dist # internal issue b/261050975. git config --global --add safe.directory "${PROJECT_ROOT}" +# Workaround for older pip not able to resolve dependencies. 
See internal +# issue 316909553. +python3.10 -m pip install pip==23.3.2 python3.10 -m pip install --require-hashes -r .kokoro/requirements.txt # Disable buffering, so that the logs stream through. diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 8957e21104..e5c1ffca94 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -93,30 +93,30 @@ colorlog==6.7.0 \ # via # gcp-docuploader # nox -cryptography==41.0.5 \ - --hash=sha256:0c327cac00f082013c7c9fb6c46b7cc9fa3c288ca702c74773968173bda421bf \ - --hash=sha256:0d2a6a598847c46e3e321a7aef8af1436f11c27f1254933746304ff014664d84 \ - --hash=sha256:227ec057cd32a41c6651701abc0328135e472ed450f47c2766f23267b792a88e \ - --hash=sha256:22892cc830d8b2c89ea60148227631bb96a7da0c1b722f2aac8824b1b7c0b6b8 \ - --hash=sha256:392cb88b597247177172e02da6b7a63deeff1937fa6fec3bbf902ebd75d97ec7 \ - --hash=sha256:3be3ca726e1572517d2bef99a818378bbcf7d7799d5372a46c79c29eb8d166c1 \ - --hash=sha256:573eb7128cbca75f9157dcde974781209463ce56b5804983e11a1c462f0f4e88 \ - --hash=sha256:580afc7b7216deeb87a098ef0674d6ee34ab55993140838b14c9b83312b37b86 \ - --hash=sha256:5a70187954ba7292c7876734183e810b728b4f3965fbe571421cb2434d279179 \ - --hash=sha256:73801ac9736741f220e20435f84ecec75ed70eda90f781a148f1bad546963d81 \ - --hash=sha256:7d208c21e47940369accfc9e85f0de7693d9a5d843c2509b3846b2db170dfd20 \ - --hash=sha256:8254962e6ba1f4d2090c44daf50a547cd5f0bf446dc658a8e5f8156cae0d8548 \ - --hash=sha256:88417bff20162f635f24f849ab182b092697922088b477a7abd6664ddd82291d \ - --hash=sha256:a48e74dad1fb349f3dc1d449ed88e0017d792997a7ad2ec9587ed17405667e6d \ - --hash=sha256:b948e09fe5fb18517d99994184854ebd50b57248736fd4c720ad540560174ec5 \ - --hash=sha256:c707f7afd813478e2019ae32a7c49cd932dd60ab2d2a93e796f68236b7e1fbf1 \ - --hash=sha256:d38e6031e113b7421db1de0c1b1f7739564a88f1684c6b89234fbf6c11b75147 \ - --hash=sha256:d3977f0e276f6f5bf245c403156673db103283266601405376f075c849a0b936 \ - --hash=sha256:da6a0ff8f1016ccc7477e6339e1d50ce5f59b88905585f77193ebd5068f1e797 \ - --hash=sha256:e270c04f4d9b5671ebcc792b3ba5d4488bf7c42c3c241a3748e2599776f29696 \ - --hash=sha256:e886098619d3815e0ad5790c973afeee2c0e6e04b4da90b88e6bd06e2a0b1b72 \ - --hash=sha256:ec3b055ff8f1dce8e6ef28f626e0972981475173d7973d63f271b29c8a2897da \ - --hash=sha256:fba1e91467c65fe64a82c689dc6cf58151158993b13eb7a7f3f4b7f395636723 +cryptography==41.0.6 \ + --hash=sha256:068bc551698c234742c40049e46840843f3d98ad7ce265fd2bd4ec0d11306596 \ + --hash=sha256:0f27acb55a4e77b9be8d550d762b0513ef3fc658cd3eb15110ebbcbd626db12c \ + --hash=sha256:2132d5865eea673fe6712c2ed5fb4fa49dba10768bb4cc798345748380ee3660 \ + --hash=sha256:3288acccef021e3c3c10d58933f44e8602cf04dba96d9796d70d537bb2f4bbc4 \ + --hash=sha256:35f3f288e83c3f6f10752467c48919a7a94b7d88cc00b0668372a0d2ad4f8ead \ + --hash=sha256:398ae1fc711b5eb78e977daa3cbf47cec20f2c08c5da129b7a296055fbb22aed \ + --hash=sha256:422e3e31d63743855e43e5a6fcc8b4acab860f560f9321b0ee6269cc7ed70cc3 \ + --hash=sha256:48783b7e2bef51224020efb61b42704207dde583d7e371ef8fc2a5fb6c0aabc7 \ + --hash=sha256:4d03186af98b1c01a4eda396b137f29e4e3fb0173e30f885e27acec8823c1b09 \ + --hash=sha256:5daeb18e7886a358064a68dbcaf441c036cbdb7da52ae744e7b9207b04d3908c \ + --hash=sha256:60e746b11b937911dc70d164060d28d273e31853bb359e2b2033c9e93e6f3c43 \ + --hash=sha256:742ae5e9a2310e9dade7932f9576606836ed174da3c7d26bc3d3ab4bd49b9f65 \ + --hash=sha256:7e00fb556bda398b99b0da289ce7053639d33b572847181d6483ad89835115f6 \ + --hash=sha256:85abd057699b98fce40b41737afb234fef05c67e116f6f3650782c10862c43da \ + 
--hash=sha256:8efb2af8d4ba9dbc9c9dd8f04d19a7abb5b49eab1f3694e7b5a16a5fc2856f5c \ + --hash=sha256:ae236bb8760c1e55b7a39b6d4d32d2279bc6c7c8500b7d5a13b6fb9fc97be35b \ + --hash=sha256:afda76d84b053923c27ede5edc1ed7d53e3c9f475ebaf63c68e69f1403c405a8 \ + --hash=sha256:b27a7fd4229abef715e064269d98a7e2909ebf92eb6912a9603c7e14c181928c \ + --hash=sha256:b648fe2a45e426aaee684ddca2632f62ec4613ef362f4d681a9a6283d10e079d \ + --hash=sha256:c5a550dc7a3b50b116323e3d376241829fd326ac47bc195e04eb33a8170902a9 \ + --hash=sha256:da46e2b5df770070412c46f87bac0849b8d685c5f2679771de277a422c7d0b86 \ + --hash=sha256:f39812f70fc5c71a15aa3c97b2bbe213c3f2a460b79bd21c40d033bb34a9bf36 \ + --hash=sha256:ff369dd19e8fe0528b02e8df9f2aeb2479f89b1270d90f96a63500afe9af5cae # via # gcp-releasetool # secretstorage diff --git a/.kokoro/samples/python3.12/common.cfg b/.kokoro/samples/python3.12/common.cfg new file mode 100644 index 0000000000..abf83e196d --- /dev/null +++ b/.kokoro/samples/python3.12/common.cfg @@ -0,0 +1,40 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +# Build logs will be here +action { + define_artifacts { + regex: "**/*sponge_log.xml" + } +} + +# Specify which tests to run +env_vars: { + key: "RUN_TESTS_SESSION" + value: "py-3.12" +} + +# Declare build specific Cloud project. +env_vars: { + key: "BUILD_SPECIFIC_GCLOUD_PROJECT" + value: "python-docs-samples-tests-312" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-bigquery-dataframes/.kokoro/test-samples.sh" +} + +# Configure the docker image for kokoro-trampoline. +env_vars: { + key: "TRAMPOLINE_IMAGE" + value: "gcr.io/cloud-devrel-kokoro-resources/python-samples-testing-docker" +} + +# Download secrets for samples +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/python-docs-samples" + +# Download trampoline resources. +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/trampoline" + +# Use the trampoline script to run in docker. 
+build_file: "python-bigquery-dataframes/.kokoro/trampoline_v2.sh" \ No newline at end of file diff --git a/.kokoro/samples/python3.12/continuous.cfg b/.kokoro/samples/python3.12/continuous.cfg new file mode 100644 index 0000000000..a1c8d9759c --- /dev/null +++ b/.kokoro/samples/python3.12/continuous.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/.kokoro/samples/python3.12/periodic-head.cfg b/.kokoro/samples/python3.12/periodic-head.cfg new file mode 100644 index 0000000000..123a35fbd3 --- /dev/null +++ b/.kokoro/samples/python3.12/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-bigquery-dataframes/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/samples/python3.12/periodic.cfg b/.kokoro/samples/python3.12/periodic.cfg new file mode 100644 index 0000000000..71cd1e597e --- /dev/null +++ b/.kokoro/samples/python3.12/periodic.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "False" +} diff --git a/.kokoro/samples/python3.12/presubmit.cfg b/.kokoro/samples/python3.12/presubmit.cfg new file mode 100644 index 0000000000..a1c8d9759c --- /dev/null +++ b/.kokoro/samples/python3.12/presubmit.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c49c5b63b..77a6576ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,49 @@ [1]: https://pypi.org/project/bigframes/#history +## [0.18.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.17.0...v0.18.0) (2024-01-02) + + +### Features + +* Add dataframe.to_html ([#259](https://github.com/googleapis/python-bigquery-dataframes/issues/259)) ([2cd6489](https://github.com/googleapis/python-bigquery-dataframes/commit/2cd64891170dcd4f2a709024a2993e36db210976)) +* Add IntervalIndex support to bigframes.pandas.cut ([#254](https://github.com/googleapis/python-bigquery-dataframes/issues/254)) ([6c1969a](https://github.com/googleapis/python-bigquery-dataframes/commit/6c1969a35fe720cf3a804006bcc9046ba554fcc3)) +* Add replace method to DataFrame ([#261](https://github.com/googleapis/python-bigquery-dataframes/issues/261)) ([5092215](https://github.com/googleapis/python-bigquery-dataframes/commit/5092215767d77c90b132e9cd6b3e3749827ebe09)) +* Specific pyarrow mappings for decimal, bytes types ([#283](https://github.com/googleapis/python-bigquery-dataframes/issues/283)) ([a1c0631](https://github.com/googleapis/python-bigquery-dataframes/commit/a1c06319ab0e3697c3175112490488002bb344c0)) + + +### Bug Fixes + +* Dataframes to_gbq now creates dataset if it doesn't exist ([#222](https://github.com/googleapis/python-bigquery-dataframes/issues/222)) ([bac62f7](https://github.com/googleapis/python-bigquery-dataframes/commit/bac62f76af1af6ca8834c3690c7c79aeb12dd331)) +* Exclude pandas 2.2.0rc0 to unblock prerelease tests ([#292](https://github.com/googleapis/python-bigquery-dataframes/issues/292)) ([ac1a745](https://github.com/googleapis/python-bigquery-dataframes/commit/ac1a745ddce9865f4585777b43c2234b9bf2841d)) +* Fix DataFrameGroupby.agg() issue with as_index=False 
([#273](https://github.com/googleapis/python-bigquery-dataframes/issues/273)) ([ab49350](https://github.com/googleapis/python-bigquery-dataframes/commit/ab493506e71ed8970a11fe2f88b2145150e09291)) +* Make `Series.str.replace` work for simple strings ([#285](https://github.com/googleapis/python-bigquery-dataframes/issues/285)) ([ad67465](https://github.com/googleapis/python-bigquery-dataframes/commit/ad6746569b3af11be9d40805a1449ee1e89288dc)) +* Update dataframe.to_gbq to dedup column names. ([#286](https://github.com/googleapis/python-bigquery-dataframes/issues/286)) ([746115d](https://github.com/googleapis/python-bigquery-dataframes/commit/746115d5564c95bc3c4a5309c99e7a29e535e6fe)) +* Use setuptools.find_namespace_packages ([#246](https://github.com/googleapis/python-bigquery-dataframes/issues/246)) ([9ec352a](https://github.com/googleapis/python-bigquery-dataframes/commit/9ec352a338f11d82aee9cd665ffb0e6e97cb391b)) + + +### Dependencies + +* Migrate to `ibis-framework >= "7.1.0"` ([#53](https://github.com/googleapis/python-bigquery-dataframes/issues/53)) ([9798a2b](https://github.com/googleapis/python-bigquery-dataframes/commit/9798a2b14dffb20432f732343cac92341e42fe09)) + + +### Documentation + +* Add code snippets for explore query result page ([#278](https://github.com/googleapis/python-bigquery-dataframes/issues/278)) ([7cbbb7d](https://github.com/googleapis/python-bigquery-dataframes/commit/7cbbb7d4608d8b7d1a360b2fe2d39d89a52f9546)) +* Code samples for `astype` common to DataFrame and Series ([#280](https://github.com/googleapis/python-bigquery-dataframes/issues/280)) ([95b673a](https://github.com/googleapis/python-bigquery-dataframes/commit/95b673aeb1545744e4b1a353cf1f4d0202d8a1b2)) +* Code samples for `DataFrame.copy` and `Series.copy` ([#290](https://github.com/googleapis/python-bigquery-dataframes/issues/290)) ([7cbc2b0](https://github.com/googleapis/python-bigquery-dataframes/commit/7cbc2b0ba572d11778ba7caf7c95b7fb8f3a31a7)) +* Code samples for `drop` and `fillna` ([#284](https://github.com/googleapis/python-bigquery-dataframes/issues/284)) ([9c5012e](https://github.com/googleapis/python-bigquery-dataframes/commit/9c5012ec68275db83d1f6f7e743f5edaaaacd8cb)) +* Code samples for `isna`, `isnull`, `dropna`, `isin` ([#289](https://github.com/googleapis/python-bigquery-dataframes/issues/289)) ([ad51035](https://github.com/googleapis/python-bigquery-dataframes/commit/ad51035bcf80d6a49f134df26624b578010b5b12)) +* Code samples for `rename` , `size` ([#293](https://github.com/googleapis/python-bigquery-dataframes/issues/293)) ([eb69f60](https://github.com/googleapis/python-bigquery-dataframes/commit/eb69f60db52544882fb06c2d5fa0e41226dfe93f)) +* Code samples for `reset_index` and `sort_values` ([#282](https://github.com/googleapis/python-bigquery-dataframes/issues/282)) ([acc0eb7](https://github.com/googleapis/python-bigquery-dataframes/commit/acc0eb7010951c8cfb91aecc45268b041217dd09)) +* Code samples for `sample`, `get`, `Series.round` ([#295](https://github.com/googleapis/python-bigquery-dataframes/issues/295)) ([c2b1892](https://github.com/googleapis/python-bigquery-dataframes/commit/c2b1892825545a34ce4ed5b0ef99e99348466108)) +* Code samples for `Series.{add, replace, unique, T, transpose}` ([#287](https://github.com/googleapis/python-bigquery-dataframes/issues/287)) ([0e1bbfc](https://github.com/googleapis/python-bigquery-dataframes/commit/0e1bbfc1055aff9757b5138907c11caab2f3965a)) +* Code samples for `Series.{map, to_list, count}` 
([#290](https://github.com/googleapis/python-bigquery-dataframes/issues/290)) ([7cbc2b0](https://github.com/googleapis/python-bigquery-dataframes/commit/7cbc2b0ba572d11778ba7caf7c95b7fb8f3a31a7)) +* Code samples for `Series.{name, std, agg}` ([#293](https://github.com/googleapis/python-bigquery-dataframes/issues/293)) ([eb69f60](https://github.com/googleapis/python-bigquery-dataframes/commit/eb69f60db52544882fb06c2d5fa0e41226dfe93f)) +* Code samples for `Series.groupby` and `Series.{sum,mean,min,max}` ([#280](https://github.com/googleapis/python-bigquery-dataframes/issues/280)) ([95b673a](https://github.com/googleapis/python-bigquery-dataframes/commit/95b673aeb1545744e4b1a353cf1f4d0202d8a1b2)) +* Code samples for DataFrame `set_index`, `items` ([#295](https://github.com/googleapis/python-bigquery-dataframes/issues/295)) ([c2b1892](https://github.com/googleapis/python-bigquery-dataframes/commit/c2b1892825545a34ce4ed5b0ef99e99348466108)) +* Fix the rendering for `get_dummies` ([#291](https://github.com/googleapis/python-bigquery-dataframes/issues/291)) ([252f3a2](https://github.com/googleapis/python-bigquery-dataframes/commit/252f3a2a0e1296c7d786acdc0bdebe9e4a9ae1be)) + ## [0.17.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.16.0...v0.17.0) (2023-12-14) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index e19fec8f3f..e8ac8c1d0f 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -16,10 +16,8 @@ from dataclasses import dataclass import io import typing -from typing import Iterable, Literal, Optional, Sequence, Tuple +from typing import Iterable, Literal, Sequence -from google.cloud import bigquery -import ibis import ibis.expr.types as ibis_types import pandas @@ -86,7 +84,17 @@ def session(self) -> Session: required_session = self.node.session from bigframes import get_global_session - return self.node.session[0] if required_session else get_global_session() + return ( + required_session if (required_session is not None) else get_global_session() + ) + + def _try_evaluate_local(self): + """Use only for unit testing paths - not fully featured. Will throw exception if fails.""" + import ibis + + return ibis.pandas.connect({}).execute( + self._compile_ordered()._to_ibis_expr(ordering_mode="unordered") + ) def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: return self._compile_ordered().get_column_type(key) @@ -97,97 +105,9 @@ def _compile_ordered(self) -> compiled.OrderedIR: def _compile_unordered(self) -> compiled.UnorderedIR: return compiler.compile_unordered(self.node) - def shape(self) -> typing.Tuple[int, int]: - """Returns dimensions as (length, width) tuple.""" - width = len(self._compile_unordered().columns) - count_expr = self._compile_unordered()._to_ibis_expr().count() - - # Support in-memory engines for hermetic unit tests. 
- if not self.node.session: - try: - length = ibis.pandas.connect({}).execute(count_expr) - return (length, width) - except Exception: - # Not all cases can be handled by pandas engine - pass - - sql = self.session.ibis_client.compile(count_expr) - row_iterator, _ = self.session._start_query( - sql=sql, - max_results=1, - ) - length = next(row_iterator)[0] - return (length, width) - - def to_sql( - self, - offset_column: typing.Optional[str] = None, - col_id_overrides: typing.Mapping[str, str] = {}, - sorted: bool = False, - ) -> str: - array_value = self - if offset_column: - array_value = self.promote_offsets(offset_column) - if sorted: - return array_value._compile_ordered().to_sql( - col_id_overrides=col_id_overrides, - sorted=sorted, - ) - else: - return array_value._compile_unordered().to_sql( - col_id_overrides=col_id_overrides - ) - - def start_query( - self, - job_config: Optional[bigquery.job.QueryJobConfig] = None, - max_results: Optional[int] = None, - *, - sorted: bool = True, - ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: - """Execute a query and return metadata about the results.""" - # TODO(swast): Cache the job ID so we can look it up again if they ask - # for the results? We'd need a way to invalidate the cache if DataFrame - # becomes mutable, though. Or move this method to the immutable - # expression class. - # TODO(swast): We might want to move this method to Session and/or - # provide our own minimal metadata class. Tight coupling to the - # BigQuery client library isn't ideal, especially if we want to support - # a LocalSession for unit testing. - # TODO(swast): Add a timeout here? If the query is taking a long time, - # maybe we just print the job metadata that we have so far? - sql = self.to_sql(sorted=sorted) # type:ignore - return self.session._start_query( - sql=sql, - job_config=job_config, - max_results=max_results, - ) - - def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue: - """Write the ArrayValue to a session table and create a new block object that references it.""" - compiled_value = self._compile_ordered() - ibis_expr = compiled_value._to_ibis_expr( - ordering_mode="unordered", expose_hidden_cols=True - ) - tmp_table = self.session._ibis_to_temp_table( - ibis_expr, cluster_cols=cluster_cols, api_name="cached" - ) - - table_expression = self.session.ibis_client.table( - f"{tmp_table.project}.{tmp_table.dataset_id}.{tmp_table.table_id}" - ) - new_columns = [table_expression[column] for column in compiled_value.column_ids] - new_hidden_columns = [ - table_expression[column] - for column in compiled_value._hidden_ordering_column_names - ] - return ArrayValue.from_ibis( - self.session, - table_expression, - columns=new_columns, - hidden_ordering_columns=new_hidden_columns, - ordering=compiled_value._ordering, - ) + def row_count(self) -> ArrayValue: + """Get number of rows in ArrayValue as a single-entry ArrayValue.""" + return ArrayValue(nodes.RowCountNode(child=self.node)) # Operations diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index df84f70859..c6867c1a33 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -131,7 +131,7 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block: if len(index_columns) != 1: raise ValueError("only method 'linear' supports multi-index") xvalues = block.index_columns[0] - if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES: + if block.index_dtypes[0] not in 
dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: raise ValueError("Can only interpolate on numeric index.") for column in original_columns: @@ -332,7 +332,6 @@ def value_counts( by_column_ids=columns, aggregations=[(dummy, agg_ops.count_op)], dropna=dropna, - as_index=True, ) count_id = agg_ids[0] if normalize: diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 34913872e7..779d11b371 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -66,7 +66,7 @@ _MONOTONIC_DECREASING = "monotonic_decreasing" -LevelType = typing.Union[str, int] +LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] @@ -137,8 +137,19 @@ def index(self) -> indexes.IndexValue: @functools.cached_property def shape(self) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" - impl_length, _ = self._expr.shape() - return (impl_length, len(self.value_columns)) + row_count_expr = self.expr.row_count() + + # Support in-memory engines for hermetic unit tests. + if self.expr.node.session is None: + try: + row_count = row_count_expr._try_evaluate_local().squeeze() + return (row_count, len(self.value_columns)) + except Exception: + pass + + iter, _ = self.session._execute(row_count_expr, sorted=False) + row_count = next(iter)[0] + return (row_count, len(self.value_columns)) @property def index_columns(self) -> Sequence[str]: @@ -182,6 +193,10 @@ def index_dtypes( """Returns the dtypes of the index columns.""" return [self.expr.get_column_type(col) for col in self.index_columns] + @property + def session(self) -> core.Session: + return self._expr.session + @functools.cached_property def col_id_to_label(self) -> typing.Mapping[str, Label]: """Get column label for value columns, or index name for index columns""" @@ -376,7 +391,7 @@ def _to_dataframe(self, result) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" dtypes = dict(zip(self.index_columns, self.index_dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) - return self._expr.session._rows_to_dataframe(result, dtypes) + return self.session._rows_to_dataframe(result, dtypes) def to_pandas( self, @@ -404,9 +419,9 @@ def to_pandas_batches(self): """Download results one message at a time.""" dtypes = dict(zip(self.index_columns, self.index_dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) - results_iterator, _ = self._expr.start_query() + results_iterator, _ = self.session._execute(self.expr, sorted=True) for arrow_table in results_iterator.to_arrow_iterable( - bqstorage_client=self._expr.session.bqstoragereadclient + bqstorage_client=self.session.bqstoragereadclient ): df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) self._copy_index_to_pandas(df) @@ -460,12 +475,12 @@ def _compute_and_count( expr = self._apply_value_keys_to_expr(value_keys=value_keys) - results_iterator, query_job = expr.start_query( - max_results=max_results, sorted=ordered + results_iterator, query_job = self.session._execute( + expr, max_results=max_results, sorted=ordered ) table_size = ( - expr.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES + self.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES ) fraction = ( max_download_size / table_size @@ -607,7 +622,7 @@ def _compute_dry_run( ) -> bigquery.QueryJob: expr = self._apply_value_keys_to_expr(value_keys=value_keys) job_config = bigquery.QueryJobConfig(dry_run=True) - _, query_job = expr.start_query(job_config=job_config) + _, query_job = 
self.session._execute(expr, job_config=job_config, dry_run=True) return query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): @@ -926,7 +941,6 @@ def aggregate( by_column_ids: typing.Sequence[str] = (), aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (), *, - as_index: bool = True, dropna: bool = True, ) -> typing.Tuple[Block, typing.Sequence[str]]: """ @@ -947,40 +961,21 @@ def aggregate( aggregate_labels = self._get_labels_for_columns( [agg[0] for agg in aggregations] ) - if as_index: - names: typing.List[Label] = [] - for by_col_id in by_column_ids: - if by_col_id in self.value_columns: - names.append(self.col_id_to_label[by_col_id]) - else: - names.append(self.col_id_to_index_name[by_col_id]) - return ( - Block( - result_expr, - index_columns=by_column_ids, - column_labels=aggregate_labels, - index_labels=names, - ), - output_col_ids, - ) - else: # as_index = False - # If as_index=False, drop grouping levels, but keep grouping value columns - by_value_columns = [ - col for col in by_column_ids if col in self.value_columns - ] - by_column_labels = self._get_labels_for_columns(by_value_columns) - labels = (*by_column_labels, *aggregate_labels) - offsets_id = guid.generate_guid() - result_expr_pruned = result_expr.select_columns( - [*by_value_columns, *output_col_ids] - ).promote_offsets(offsets_id) - - return ( - Block( - result_expr_pruned, index_columns=[offsets_id], column_labels=labels - ), - output_col_ids, - ) + names: typing.List[Label] = [] + for by_col_id in by_column_ids: + if by_col_id in self.value_columns: + names.append(self.col_id_to_label[by_col_id]) + else: + names.append(self.col_id_to_index_name[by_col_id]) + return ( + Block( + result_expr, + index_columns=by_column_ids, + column_labels=aggregate_labels, + index_labels=names, + ), + output_col_ids, + ) def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): """Gets aggregates immediately, and caches it""" @@ -1068,7 +1063,7 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.AggregateOp]: stats: list[agg_ops.AggregateOp] = [agg_ops.count_op] if dtype not in bigframes.dtypes.UNORDERED_DTYPES: stats += [agg_ops.min_op, agg_ops.max_op] - if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES: + if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: # Notable exclusions: # prod op tends to cause overflows # Also, var_op is redundant as can be derived from std @@ -1309,7 +1304,6 @@ def pivot( result_block, _ = block.aggregate( by_column_ids=self.index_columns, aggregations=aggregations, - as_index=True, dropna=True, ) @@ -1668,7 +1662,7 @@ def to_sql_query( # the BigQuery unicode column name feature? 
substitutions[old_id] = new_id - sql = array_value.to_sql(col_id_overrides=substitutions) + sql = self.session._to_sql(array_value, col_id_overrides=substitutions) return ( sql, new_ids[: len(idx_labels)], @@ -1678,7 +1672,7 @@ def to_sql_query( def cached(self) -> Block: """Write the block to a session table and create a new block object that references it.""" return Block( - self.expr.cached(cluster_cols=self.index_columns), + self.session._execute_and_cache(self.expr, cluster_cols=self.index_columns), index_columns=self.index_columns, column_labels=self.column_labels, index_labels=self.index_labels, diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 461c2c005a..524699290b 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -21,6 +21,7 @@ import ibis import ibis.backends.bigquery as ibis_bigquery +import ibis.common.deferred # type: ignore import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types import pandas @@ -62,7 +63,16 @@ def __init__( self._columns = tuple(columns) # To allow for more efficient lookup by column name, create a # dictionary mapping names to column values. - self._column_names = {column.get_name(): column for column in self._columns} + self._column_names = { + ( + column.resolve(table) + # TODO(https://github.com/ibis-project/ibis/issues/7613): use + # public API to refer to Deferred type. + if isinstance(column, ibis.common.deferred.Deferred) + else column + ).get_name(): column + for column in self._columns + } @property def columns(self) -> typing.Tuple[ibis_types.Value, ...]: @@ -210,7 +220,10 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value: raise ValueError( "Column name {} not in set of values: {}".format(key, self.column_ids) ) - return typing.cast(ibis_types.Value, self._column_names[key]) + return typing.cast( + ibis_types.Value, + bigframes.dtypes.ibis_value_to_canonical_type(self._column_names[key]), + ) def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: ibis_type = typing.cast( @@ -258,6 +271,22 @@ def to_sql( ) return typing.cast(str, sql) + def row_count(self) -> OrderedIR: + original_table = self._to_ibis_expr() + ibis_table = original_table.agg( + [ + original_table.count().name("count"), + ] + ) + return OrderedIR( + ibis_table, + (ibis_table["count"],), + ordering=ExpressionOrdering( + ordering_value_columns=(OrderingColumnReference("count"),), + total_ordering_columns=frozenset(["count"]), + ), + ) + def _to_ibis_expr( self, *, @@ -317,7 +346,9 @@ def _to_ibis_expr( table = table.filter(base_table[PREDICATE_COLUMN]) table = table.drop(*columns_to_drop) if col_id_overrides: - table = table.relabel(col_id_overrides) + table = table.rename( + {value: key for key, value in col_id_overrides.items()} + ) if fraction is not None: table = table.filter(ibis.random() < ibis.literal(fraction)) return table @@ -643,7 +674,16 @@ def __init__( # To allow for more efficient lookup by column name, create a # dictionary mapping names to column values. - self._column_names = {column.get_name(): column for column in self._columns} + self._column_names = { + ( + column.resolve(table) + # TODO(https://github.com/ibis-project/ibis/issues/7613): use + # public API to refer to Deferred type. 
+ if isinstance(column, ibis.common.deferred.Deferred) + else column + ).get_name(): column + for column in self._columns + } self._hidden_ordering_column_names = { column.get_name(): column for column in self._hidden_ordering_columns } @@ -860,7 +900,7 @@ def project_window_op( case_statement = ibis.case() for clause in clauses: case_statement = case_statement.when(clause[0], clause[1]) - case_statement = case_statement.else_(window_op).end() + case_statement = case_statement.else_(window_op).end() # type: ignore window_op = case_statement result = self._set_or_replace_by_id(output_name or column_name, window_op) @@ -1142,14 +1182,23 @@ def _to_ibis_expr( # Make sure all dtypes are the "canonical" ones for BigFrames. This is # important for operations like UNION where the schema must match. table = self._table.select( - bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns + bigframes.dtypes.ibis_value_to_canonical_type( + column.resolve(self._table) + # TODO(https://github.com/ibis-project/ibis/issues/7613): use + # public API to refer to Deferred type. + if isinstance(column, ibis.common.deferred.Deferred) + else column + ) + for column in columns ) base_table = table if self._reduced_predicate is not None: table = table.filter(base_table[PREDICATE_COLUMN]) table = table.drop(*columns_to_drop) if col_id_overrides: - table = table.relabel(col_id_overrides) + table = table.rename( + {value: key for key, value in col_id_overrides.items()} + ) if fraction is not None: table = table.filter(ibis.random() < ibis.literal(fraction)) return table diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 39892635f1..17dcde638f 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -173,6 +173,12 @@ def compile_concat(node: nodes.ConcatNode, ordered: bool = True): return concat_impl.concat_unordered(compiled_unordered) +@_compile_node.register +def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True): + result = compile_unordered(node.child).row_count() + return result if ordered else result.to_unordered() + + @_compile_node.register def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): result = compile_unordered(node.child).aggregate( diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index a8b8afdae7..66ba901649 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -263,10 +263,10 @@ def _agg_string(self, func: str) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = [] @@ -285,7 +285,6 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) if want_aggfunc_level: @@ -297,7 +296,8 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: ) else: agg_block = agg_block.with_column_labels(pd.Index(column_labels)) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_list(self, func: 
typing.Sequence) -> df.DataFrame: aggregations = [ @@ -311,7 +311,6 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) agg_block = agg_block.with_column_labels( @@ -319,7 +318,8 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: column_labels, names=[*self._block.column_labels.names, None] ) ) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_named(self, **kwargs) -> df.DataFrame: aggregations = [] @@ -339,17 +339,28 @@ def _agg_named(self, **kwargs) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) agg_block = agg_block.with_column_labels(column_labels) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) + + def _convert_index(self, dataframe: df.DataFrame): + """Convert index levels to columns except where names conflict.""" + levels_to_drop = [ + level for level in dataframe.index.names if level in dataframe.columns + ] + + if len(levels_to_drop) == dataframe.index.nlevels: + return dataframe.reset_index(drop=True) + return dataframe.droplevel(levels_to_drop).reset_index(drop=False) aggregate = agg def _raise_on_non_numeric(self, op: str): if not all( - dtype in dtypes.NUMERIC_BIGFRAMES_TYPES for dtype in self._block.dtypes + dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE + for dtype in self._block.dtypes ): raise NotImplementedError( f"'{op}' does not support non-numeric columns. " @@ -361,7 +372,9 @@ def _raise_on_non_numeric(self, op: str): def _aggregated_columns(self, numeric_only: bool = False) -> typing.Sequence[str]: valid_agg_cols: list[str] = [] for col_id in self._selected_cols: - is_numeric = self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES + is_numeric = ( + self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE + ) if is_numeric or not numeric_only: valid_agg_cols.append(col_id) return valid_agg_cols @@ -379,10 +392,10 @@ def _aggregate_all( result_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) - return df.DataFrame(result_block) + dataframe = df.DataFrame(result_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _apply_window_op( self, diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index fc7cf167d4..6fc284403d 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -396,6 +396,10 @@ def dtypes( ) -> typing.Sequence[typing.Union[bf_dtypes.Dtype, np.dtype[typing.Any]]]: return self._block.index_dtypes + @property + def session(self) -> core.Session: + return self._expr.session + def __repr__(self) -> str: """Converts an Index to a string.""" # TODO(swast): Add a timeout here? 
If the query is taking a long time, @@ -411,7 +415,7 @@ def to_pandas(self) -> pandas.Index: index_columns = list(self._block.index_columns) dtypes = dict(zip(index_columns, self.dtypes)) expr = self._expr.select_columns(index_columns) - results, _ = expr.start_query() + results, _ = self.session._execute(expr) df = expr.session._rows_to_dataframe(results, dtypes) df = df.set_index(index_columns) index = df.index diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 82a869dac2..30444f5565 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -139,7 +139,7 @@ class ReadGbqNode(BigFrameNode): @property def session(self): - return (self.table_session,) + return self.table_session def __hash__(self): return self._node_hash @@ -229,6 +229,12 @@ def __hash__(self): return self._node_hash +# TODO: Merge RowCount and Corr into Aggregate Node +@dataclass(frozen=True) +class RowCountNode(UnaryNode): + pass + + @dataclass(frozen=True) class AggregateNode(UnaryNode): aggregations: typing.Tuple[typing.Tuple[str, agg_ops.AggregateOp, str], ...] diff --git a/bigframes/core/reshape/__init__.py b/bigframes/core/reshape/__init__.py index dc61c3baad..d9cc99a036 100644 --- a/bigframes/core/reshape/__init__.py +++ b/bigframes/core/reshape/__init__.py @@ -14,10 +14,13 @@ from __future__ import annotations import typing -from typing import Iterable, Literal, Optional, Union +from typing import Iterable, Literal, Optional, Tuple, Union + +import pandas as pd import bigframes.constants as constants import bigframes.core as core +import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dataframe import bigframes.operations as ops @@ -107,17 +110,29 @@ def concat( def cut( x: bigframes.series.Series, - bins: int, + bins: Union[ + int, + pd.IntervalIndex, + Iterable[Tuple[Union[int, float], Union[int, float]]], + ], *, labels: Optional[bool] = None, ) -> bigframes.series.Series: - if bins <= 0: + if isinstance(bins, int) and bins <= 0: raise ValueError("`bins` should be a positive integer.") + if isinstance(bins, Iterable): + if not isinstance(bins, pd.IntervalIndex): + bins = pd.IntervalIndex.from_tuples(list(bins)) + + if bins.is_overlapping: + raise ValueError("Overlapping IntervalIndex is not accepted.") + if labels is not False: raise NotImplementedError( f"Only labels=False is supported in BigQuery DataFrames so far. 
{constants.FEEDBACK_LINK}" ) + return x._apply_window_op(agg_ops.CutOp(bins), window_spec=core.WindowSpec()) @@ -145,7 +160,10 @@ def qcut( block, result = block.apply_window_op( x._value_column, agg_ops.QcutOp(q), - window_spec=core.WindowSpec(grouping_keys=(nullity_id,)), + window_spec=core.WindowSpec( + grouping_keys=(nullity_id,), + ordering=(order.OrderingColumnReference(x._value_column),), + ), ) block, result = block.apply_binary_op( result, nullity_id, ops.partial_arg3(ops.where_op, None), result_label=label diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3b0fd7008a..595670b0b6 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -34,6 +34,7 @@ Union, ) +import google.api_core.exceptions import google.cloud.bigquery as bigquery import numpy import pandas @@ -71,7 +72,7 @@ # TODO(tbergeron): Convert to bytes-based limit MAX_INLINE_DF_SIZE = 5000 -LevelType = typing.Union[str, int] +LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] SingleItemValue = Union[bigframes.series.Series, int, float, Callable] @@ -1561,6 +1562,21 @@ def interpolate(self, method: str = "linear") -> DataFrame: def fillna(self, value=None) -> DataFrame: return self._apply_binop(value, ops.fillna_op, how="left") + def replace( + self, to_replace: typing.Any, value: typing.Any = None, *, regex: bool = False + ): + if utils.is_dict_like(value): + return self.apply( + lambda x: x.replace( + to_replace=to_replace, value=value[x.name], regex=regex + ) + if (x.name in value) + else x + ) + return self.apply( + lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) + ) + def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = bigframes.core.WindowSpec(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -1784,7 +1800,7 @@ def agg( ) -> DataFrame | bigframes.series.Series: if utils.is_list_like(func): if any( - dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES + dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE for dtype in self.dtypes ): raise NotImplementedError( @@ -1851,7 +1867,7 @@ def melt( ) def describe(self) -> DataFrame: - df_numeric = self._drop_non_numeric(keep_bool=False) + df_numeric = self._drop_non_numeric(permissive=False) if len(df_numeric.columns) == 0: raise NotImplementedError( f"df.describe() currently only supports numeric values. 
{constants.FEEDBACK_LINK}" @@ -1940,7 +1956,7 @@ def _stack_mono(self): def _stack_multi(self, level: LevelsType = -1): n_levels = self.columns.nlevels - if isinstance(level, int) or isinstance(level, str): + if not utils.is_list_like(level): level = [level] level_indices = [] for level_ref in level: @@ -1950,7 +1966,7 @@ def _stack_multi(self, level: LevelsType = -1): else: level_indices.append(level_ref) else: # str - level_indices.append(self.columns.names.index(level_ref)) + level_indices.append(self.columns.names.index(level_ref)) # type: ignore new_order = [ *[i for i in range(n_levels) if i not in level_indices], @@ -1966,7 +1982,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) def unstack(self, level: LevelsType = -1): - if isinstance(level, int) or isinstance(level, str): + if not utils.is_list_like(level): level = [level] block = self._block @@ -1989,10 +2005,12 @@ def unstack(self, level: LevelsType = -1): ) return DataFrame(pivot_block) - def _drop_non_numeric(self, keep_bool=True) -> DataFrame: - types_to_keep = set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES) - if not keep_bool: - types_to_keep -= set(bigframes.dtypes.BOOL_BIGFRAMES_TYPES) + def _drop_non_numeric(self, permissive=True) -> DataFrame: + types_to_keep = ( + set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) + if permissive + else set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE) + ) non_numeric_cols = [ col_id for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) @@ -2010,7 +2028,7 @@ def _drop_non_bool(self) -> DataFrame: def _raise_on_non_numeric(self, op: str): if not all( - dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES + dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE for dtype in self._block.dtypes ): raise NotImplementedError( @@ -2285,7 +2303,7 @@ def notna(self) -> DataFrame: def cumsum(self): is_numeric_types = [ - (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES) + (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) for _, dtype in self.dtypes.items() ] if not all(is_numeric_types): @@ -2297,7 +2315,7 @@ def cumsum(self): def cumprod(self) -> DataFrame: is_numeric_types = [ - (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES) + (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) for _, dtype in self.dtypes.items() ] if not all(is_numeric_types): @@ -2508,7 +2526,14 @@ def to_gbq( ) if_exists = "replace" - if "." not in destination_table: + table_parts = destination_table.split(".") + default_project = self._block.expr.session.bqclient.project + + if len(table_parts) == 2: + destination_dataset = f"{default_project}.{table_parts[0]}" + elif len(table_parts) == 3: + destination_dataset = f"{table_parts[0]}.{table_parts[1]}" + else: raise ValueError( f"Got invalid value for destination_table {repr(destination_table)}. " "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'." @@ -2523,11 +2548,16 @@ def to_gbq( f"Valid options include None or one of {dispositions.keys()}." 
) + try: + self._session.bqclient.get_dataset(destination_dataset) + except google.api_core.exceptions.NotFound: + self._session.bqclient.create_dataset(destination_dataset, exists_ok=True) + job_config = bigquery.QueryJobConfig( write_disposition=dispositions[if_exists], destination=bigquery.table.TableReference.from_string( destination_table, - default_project=self._block.expr.session.bqclient.project, + default_project=default_project, ), ) @@ -2652,6 +2682,58 @@ def to_string( encoding, ) + def to_html( + self, + buf=None, + columns: Sequence[str] | None = None, + col_space=None, + header: bool = True, + index: bool = True, + na_rep: str = "NaN", + formatters=None, + float_format=None, + sparsify: bool | None = None, + index_names: bool = True, + justify: str | None = None, + max_rows: int | None = None, + max_cols: int | None = None, + show_dimensions: bool = False, + decimal: str = ".", + bold_rows: bool = True, + classes: str | list | tuple | None = None, + escape: bool = True, + notebook: bool = False, + border: int | None = None, + table_id: str | None = None, + render_links: bool = False, + encoding: str | None = None, + ) -> str: + return self.to_pandas().to_html( + buf, + columns, # type: ignore + col_space, + header, + index, + na_rep, + formatters, + float_format, + sparsify, + index_names, + justify, # type: ignore + max_rows, + max_cols, + show_dimensions, + decimal, + bold_rows, + classes, + escape, + notebook, + border, + table_id, + render_links, + encoding, + ) + def to_markdown( self, buf=None, @@ -2677,31 +2759,34 @@ def _apply_unary_op(self, operation: ops.UnaryOp) -> DataFrame: def _create_io_query(self, index: bool, ordering_id: Optional[str]) -> str: """Create query text representing this dataframe for I/O.""" array_value = self._block.expr + + new_col_labels, new_idx_labels = utils.get_standardized_ids( + self._block.column_labels, self.index.names + ) + columns = list(self._block.value_columns) - column_labels = list(self._block.column_labels) + column_labels = new_col_labels # This code drops unnamed indexes to keep consistent with the behavior of # most pandas write APIs. The exception is `pandas.to_csv`, which keeps # unnamed indexes as `Unnamed: 0`. # TODO(chelsealin): check if works for multiple indexes. if index and self.index.name is not None: columns.extend(self._block.index_columns) - column_labels.extend(self.index.names) + column_labels.extend(new_idx_labels) else: array_value = array_value.drop_columns(self._block.index_columns) # Make columns in SQL reflect _labels_ not _ids_. Note: This may use # the arbitrary unicode column labels feature in BigQuery, which is # currently (June 2023) in preview. - # TODO(swast): Handle duplicate and NULL labels. 
id_overrides = { - col_id: col_label - for col_id, col_label in zip(columns, column_labels) - if col_label and isinstance(col_label, str) + col_id: col_label for col_id, col_label in zip(columns, column_labels) } if ordering_id is not None: array_value = array_value.promote_offsets(ordering_id) - return array_value.to_sql( + return self._block.session._to_sql( + array_value=array_value, col_id_overrides=id_overrides, ) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 774eb74d06..b754acea2e 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -14,6 +14,8 @@ """Mappings for Pandas dtypes supported by BigQuery DataFrames package""" +import datetime +import decimal import textwrap import typing from typing import Any, Dict, Iterable, Literal, Tuple, Union @@ -29,6 +31,7 @@ import bigframes.constants as constants import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers +import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops # Type hints for Pandas dtypes supported by BigQuery DataFrame Dtype = Union[ @@ -39,9 +42,6 @@ pd.ArrowDtype, ] -# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation) -NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()] - # On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable UNORDERED_DTYPES = [gpd.array.GeometryDtype()] @@ -56,6 +56,9 @@ "timestamp[us][pyarrow]", "date32[day][pyarrow]", "time64[us][pyarrow]", + "decimal128(38, 9)[pyarrow]", + "decimal256(38, 9)[pyarrow]", + "binary[pyarrow]", ] # Type hints for Ibis data types supported by BigQuery DataFrame @@ -71,8 +74,17 @@ BOOL_BIGFRAMES_TYPES = [pd.BooleanDtype()] -# Several operations are restricted to these types. 
-NUMERIC_BIGFRAMES_TYPES = [pd.BooleanDtype(), pd.Float64Dtype(), pd.Int64Dtype()] +# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation) +# Pandas is inconsistent, so two definitions are provided, each used in different contexts +NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE = [ + pd.Float64Dtype(), + pd.Int64Dtype(), +] +NUMERIC_BIGFRAMES_TYPES_PERMISSIVE = NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + [ + pd.BooleanDtype(), + pd.ArrowDtype(pa.decimal128(38, 9)), + pd.ArrowDtype(pa.decimal256(76, 38)), +] # Type hints for Ibis data types that can be read to Python objects by BigQuery DataFrame ReadOnlyIbisDtype = Union[ @@ -96,6 +108,15 @@ ibis_dtypes.Timestamp(timezone="UTC"), pd.ArrowDtype(pa.timestamp("us", tz="UTC")), ), + (ibis_dtypes.binary, pd.ArrowDtype(pa.binary())), + ( + ibis_dtypes.Decimal(precision=38, scale=9, nullable=True), + pd.ArrowDtype(pa.decimal128(38, 9)), + ), + ( + ibis_dtypes.Decimal(precision=76, scale=38, nullable=True), + pd.ArrowDtype(pa.decimal256(76, 38)), + ), ) BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = { @@ -111,6 +132,9 @@ ibis_dtypes.time: pa.time64("us"), ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"), ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"), + ibis_dtypes.binary: pa.binary(), + ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): pa.decimal128(38, 9), + ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): pa.decimal256(76, 38), } ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()} @@ -124,10 +148,6 @@ ) IBIS_TO_BIGFRAMES.update( { - ibis_dtypes.binary: np.dtype("O"), - ibis_dtypes.json: np.dtype("O"), - ibis_dtypes.Decimal(precision=38, scale=9, nullable=True): np.dtype("O"), - ibis_dtypes.Decimal(precision=76, scale=38, nullable=True): np.dtype("O"), ibis_dtypes.GeoSpatial( geotype="geography", srid=4326, nullable=True ): gpd.array.GeometryDtype(), @@ -177,7 +197,7 @@ def ibis_dtype_to_bigframes_dtype( # our IO returns them as objects. Eventually, we should support them as # ArrowDType (and update the IO accordingly) if isinstance(ibis_dtype, ibis_dtypes.Array): - return np.dtype("O") + return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype)) if isinstance(ibis_dtype, ibis_dtypes.Struct): return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype)) @@ -199,7 +219,9 @@ def ibis_dtype_to_bigframes_dtype( def ibis_dtype_to_arrow_dtype(ibis_dtype: ibis_dtypes.DataType) -> pa.DataType: if isinstance(ibis_dtype, ibis_dtypes.Array): - return pa.list_(ibis_dtype_to_arrow_dtype(ibis_dtype.value_type)) + return pa.list_( + ibis_dtype_to_arrow_dtype(ibis_dtype.value_type.copy(nullable=True)) + ) if isinstance(ibis_dtype, ibis_dtypes.Struct): return pa.struct( @@ -223,21 +245,13 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value: This is useful in cases where multiple types correspond to the same BigFrames dtype. """ ibis_type = value.type() + name = value.get_name() + if ibis_type.is_json(): + value = vendored_ibis_ops.ToJsonString(value).to_expr() + return value.name(name) # Allow REQUIRED fields to be joined with NULLABLE fields. nullable_type = ibis_type.copy(nullable=True) - return value.cast(nullable_type).name(value.get_name()) - - -def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table: - """Converts an Ibis table expression to canonical types. - - This is useful in cases where multiple types correspond to the same BigFrames dtype. 
- """ - casted_columns = [] - for column_name in table.columns: - column = typing.cast(ibis_types.Value, table[column_name]) - casted_columns.append(ibis_value_to_canonical_type(column)) - return table.select(*casted_columns) + return value.cast(nullable_type).name(name) def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType: @@ -287,7 +301,7 @@ def bigframes_dtype_to_ibis_dtype( f""" Unexpected data type {bigframes_dtype}. The following str dtypes are supppted: 'boolean','Float64','Int64', 'string', - 'tring[pyarrow]','timestamp[us, tz=UTC][pyarrow]', + 'string[pyarrow]','timestamp[us, tz=UTC][pyarrow]', 'timestamp[us][pyarrow]','date32[day][pyarrow]', 'time64[us][pyarrow]'. The following pandas.ExtensionDtype are supported: pandas.BooleanDtype(), pandas.Float64Dtype(), @@ -385,15 +399,35 @@ def cast_ibis_value( ibis_dtypes.bool, ibis_dtypes.float64, ibis_dtypes.string, + ibis_dtypes.Decimal(precision=38, scale=9), + ibis_dtypes.Decimal(precision=76, scale=38), + ), + ibis_dtypes.float64: ( + ibis_dtypes.string, + ibis_dtypes.int64, + ibis_dtypes.Decimal(precision=38, scale=9), + ibis_dtypes.Decimal(precision=76, scale=38), + ), + ibis_dtypes.string: ( + ibis_dtypes.int64, + ibis_dtypes.float64, + ibis_dtypes.Decimal(precision=38, scale=9), + ibis_dtypes.Decimal(precision=76, scale=38), + ibis_dtypes.binary, ), - ibis_dtypes.float64: (ibis_dtypes.string, ibis_dtypes.int64), - ibis_dtypes.string: (ibis_dtypes.int64, ibis_dtypes.float64), ibis_dtypes.date: (ibis_dtypes.string,), - ibis_dtypes.Decimal(precision=38, scale=9): (ibis_dtypes.float64,), - ibis_dtypes.Decimal(precision=76, scale=38): (ibis_dtypes.float64,), + ibis_dtypes.Decimal(precision=38, scale=9): ( + ibis_dtypes.float64, + ibis_dtypes.Decimal(precision=76, scale=38), + ), + ibis_dtypes.Decimal(precision=76, scale=38): ( + ibis_dtypes.float64, + ibis_dtypes.Decimal(precision=38, scale=9), + ), ibis_dtypes.time: (), ibis_dtypes.timestamp: (ibis_dtypes.Timestamp(timezone="UTC"),), ibis_dtypes.Timestamp(timezone="UTC"): (ibis_dtypes.timestamp,), + ibis_dtypes.binary: (ibis_dtypes.string,), } value = ibis_value_to_canonical_type(value) @@ -437,3 +471,82 @@ def to_pandas_dtypes_overrides(schema: Iterable[bigquery.SchemaField]) -> Dict: gcb3p_pandas_helpers.bq_to_arrow_data_type(field) ) return dtypes + + +def is_dtype(scalar: typing.Any, dtype: Dtype) -> bool: + """Captures whether a scalar can be losslessly represented by a dtype.""" + if scalar is None: + return True + if pd.api.types.is_bool_dtype(dtype): + return pd.api.types.is_bool(scalar) + if pd.api.types.is_float_dtype(dtype): + return pd.api.types.is_float(scalar) + if pd.api.types.is_integer_dtype(dtype): + return pd.api.types.is_integer(scalar) + if isinstance(dtype, pd.StringDtype): + return isinstance(scalar, str) + if isinstance(dtype, pd.ArrowDtype): + pa_type = dtype.pyarrow_dtype + return is_patype(scalar, pa_type) + return False + + +# string is binary +def is_patype(scalar: typing.Any, pa_type: pa.DataType) -> bool: + """Determine whether a scalar's type matches a given pyarrow type.""" + if pa_type == pa.time64("us"): + return isinstance(scalar, datetime.time) + elif pa_type == pa.timestamp("us"): + if isinstance(scalar, datetime.datetime): + return not scalar.tzinfo + if isinstance(scalar, pd.Timestamp): + return not scalar.tzinfo + elif pa_type == pa.timestamp("us", tz="UTC"): + if isinstance(scalar, datetime.datetime): + return scalar.tzinfo == datetime.timezone.utc + if isinstance(scalar, pd.Timestamp): + return scalar.tzinfo == 
datetime.timezone.utc + elif pa_type == pa.date32(): + return isinstance(scalar, datetime.date) + elif pa_type == pa.binary(): + return isinstance(scalar, bytes) + elif pa_type == pa.decimal128(38, 9): + # decimal.Decimal is a superset, but ibis performs out-of-bounds and loss-of-precision checks + return isinstance(scalar, decimal.Decimal) + elif pa_type == pa.decimal256(76, 38): + # decimal.Decimal is a superset, but ibis performs out-of-bounds and loss-of-precision checks + return isinstance(scalar, decimal.Decimal) + return False + + +def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]: + """Whether scalar can be compare to items of dtype (though maybe requiring coercion). Returns the datatype that must be used for the comparison""" + if is_dtype(scalar, dtype): + return dtype + elif pd.api.types.is_numeric_dtype(dtype): + # Implicit conversion currently only supported for numeric types + if pd.api.types.is_bool(scalar): + return lcd_type(pd.BooleanDtype(), dtype) + if pd.api.types.is_float(scalar): + return lcd_type(pd.Float64Dtype(), dtype) + if pd.api.types.is_integer(scalar): + return lcd_type(pd.Int64Dtype(), dtype) + if isinstance(scalar, decimal.Decimal): + # TODO: Check context to see if can use NUMERIC instead of BIGNUMERIC + return lcd_type(pd.ArrowDtype(pa.decimal128(76, 38)), dtype) + return None + + +def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]: + # Implicit conversion currently only supported for numeric types + hierarchy: list[Dtype] = [ + pd.BooleanDtype(), + pd.Int64Dtype(), + pd.Float64Dtype(), + pd.ArrowDtype(pa.decimal128(38, 9)), + pd.ArrowDtype(pa.decimal256(76, 38)), + ] + if (dtype1 not in hierarchy) or (dtype2 not in hierarchy): + return None + lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2)) + return hierarchy[lcd_index] diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index a29dd36c72..678774978a 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -18,6 +18,7 @@ import typing import ibis +import ibis.common.annotations import ibis.common.exceptions import ibis.expr.datatypes as ibis_dtypes import ibis.expr.operations.generic @@ -352,14 +353,23 @@ def _as_ibis(self, x: ibis_types.Value): str_val = typing.cast(ibis_types.StringValue, x) # SQL pad operations will truncate, we do not want to truncate though. 
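# A minimal illustration of the mismatch worked around here (editor's sketch,
# assuming standard pandas string semantics, not taken from this change):
# pandas-style padding never shortens a value, whereas BigQuery's LPAD/RPAD
# truncate to the requested width, hence the greatest(current length, target
# length) guard in the lines that follow.
import pandas as pd
s = pd.Series(["abcdef"], dtype="string")
assert s.str.pad(3, side="both", fillchar="*")[0] == "abcdef"  # unchanged, not truncated to "abc"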
- pad_length = ibis.greatest(str_val.length(), self._length) + pad_length = typing.cast( + ibis_types.IntegerValue, ibis.greatest(str_val.length(), self._length) + ) if self._side == "left": return str_val.lpad(pad_length, self._fillchar) elif self._side == "right": return str_val.rpad(pad_length, self._fillchar) else: # side == both # Pad more on right side if can't pad both sides equally - lpad_amount = ((pad_length - str_val.length()) // 2) + str_val.length() + lpad_amount = typing.cast( + ibis_types.IntegerValue, + ( + (pad_length - str_val.length()) + // typing.cast(ibis_types.NumericValue, ibis.literal(2)) + ) + + str_val.length(), + ) return str_val.lpad(lpad_amount, self._fillchar).rpad( pad_length, self._fillchar ) @@ -375,7 +385,7 @@ def _as_ibis(self, x: ibis_types.Value): ibis_types.StringValue, ibis_types.literal(self._pat) ) repl_str_value = typing.cast( - ibis_types.StringValue, ibis_types.literal(self._pat) + ibis_types.StringValue, ibis_types.literal(self._repl) ) return typing.cast(ibis_types.StringValue, x).replace( @@ -513,6 +523,20 @@ def _as_ibis(self, x: ibis_types.Value): return bigframes.dtypes.cast_ibis_value(x, self.to_type) +class MapOp(UnaryOp): + def __init__( + self, + mappings: typing.Tuple[typing.Tuple[typing.Hashable, typing.Hashable], ...], + ): + self._mappings = mappings + + def _as_ibis(self, x: ibis_types.Value): + case = ibis.case() + for mapping in self._mappings: + case = case.when(x == mapping[0], mapping[1]) + return case.else_(x).end() + + class FindOp(UnaryOp): def __init__(self, sub, start, end): self._sub = sub @@ -722,10 +746,29 @@ def ne_op( return x != y +def _null_or_value(value: ibis_types.Value, where_value: ibis_types.BooleanValue): + return ibis.where( + where_value, + value, + ibis.null(), + ) + + def and_op( x: ibis_types.Value, y: ibis_types.Value, ): + # Workaround issue https://github.com/ibis-project/ibis/issues/7775 by + # implementing three-valued logic ourselves. For AND, when we encounter a + # NULL value, we only know when the result is FALSE, otherwise the result + # is unknown (NULL). See: truth table at + # https://en.wikibooks.org/wiki/Structured_Query_Language/NULLs_and_the_Three_Valued_Logic#AND,_OR + if isinstance(x, ibis_types.NullScalar): + return _null_or_value(y, y == ibis.literal(False)) + + if isinstance(y, ibis_types.NullScalar): + return _null_or_value(x, x == ibis.literal(False)) + return typing.cast(ibis_types.BooleanValue, x) & typing.cast( ibis_types.BooleanValue, y ) @@ -735,6 +778,17 @@ def or_op( x: ibis_types.Value, y: ibis_types.Value, ): + # Workaround issue https://github.com/ibis-project/ibis/issues/7775 by + # implementing three-valued logic ourselves. For OR, when we encounter a + # NULL value, we only know when the result is TRUE, otherwise the result + # is unknown (NULL). See: truth table at + # https://en.wikibooks.org/wiki/Structured_Query_Language/NULLs_and_the_Three_Valued_Logic#AND,_OR + if isinstance(x, ibis_types.NullScalar): + return _null_or_value(y, y == ibis.literal(True)) + + if isinstance(y, ibis_types.NullScalar): + return _null_or_value(x, x == ibis.literal(True)) + return typing.cast(ibis_types.BooleanValue, x) | typing.cast( ibis_types.BooleanValue, y ) @@ -746,10 +800,16 @@ def add_op( y: ibis_types.Value, ): if isinstance(x, ibis_types.NullScalar) or isinstance(x, ibis_types.NullScalar): - return - return typing.cast(ibis_types.NumericValue, x) + typing.cast( - ibis_types.NumericValue, y - ) + return ibis.null() + try: + # Could be string concatenation or numeric addition. 
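# An editorial sketch of the two cases, in pandas terms (the except branch
# below covers operand pairs that support neither, e.g. integer plus string):
import pandas as pd
assert (pd.Series([1], dtype="Int64") + pd.Series([2], dtype="Int64"))[0] == 3           # numeric addition
assert (pd.Series(["a"], dtype="string") + pd.Series(["b"], dtype="string"))[0] == "ab"   # string concatenation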
+ return x + y # type: ignore + except ibis.common.annotations.SignatureValidationError as exc: + left_type = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(x.type()) + right_type = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(y.type()) + raise TypeError( + f"Cannot add {repr(left_type)} and {repr(right_type)}. {constants.FEEDBACK_LINK}" + ) from exc @short_circuit_nulls() @@ -1047,7 +1107,7 @@ def where_op( replacement: ibis_types.Value, ) -> ibis_types.Value: """Returns x if y is true, otherwise returns z.""" - return ibis.case().when(condition, original).else_(replacement).end() + return ibis.case().when(condition, original).else_(replacement).end() # type: ignore def clip_op( @@ -1060,7 +1120,7 @@ def clip_op( not isinstance(upper, ibis_types.NullScalar) ): return ( - ibis.case() + ibis.case() # type: ignore .when(upper.isnull() | (original > upper), upper) .else_(original) .end() @@ -1069,7 +1129,7 @@ def clip_op( upper, ibis_types.NullScalar ): return ( - ibis.case() + ibis.case() # type: ignore .when(lower.isnull() | (original < lower), lower) .else_(original) .end() @@ -1079,9 +1139,11 @@ def clip_op( ): return original else: - # Note: Pandas has unchanged behavior when upper bound and lower bound are flipped. This implementation requires that lower_bound < upper_bound + # Note: Pandas has unchanged behavior when upper bound and lower bound + # are flipped. + # This implementation requires that lower_bound < upper_bound. return ( - ibis.case() + ibis.case() # type: ignore .when(lower.isnull() | (original < lower), lower) .when(upper.isnull() | (original > upper), upper) .else_(original) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 465d188724..8178ebfaea 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -20,6 +20,7 @@ import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types from pandas import Int64Dtype +import pandas as pd import bigframes.constants as constants import bigframes.dtypes as dtypes @@ -74,7 +75,7 @@ def _as_ibis( # Will be null if all inputs are null. Pandas defaults to zero sum though. 
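# A small, self-contained illustration of the pandas behavior being matched
# (assuming nullable Float64 semantics; not specific to this patch): an
# all-NA sum is 0 in pandas, whereas SQL SUM over only NULLs returns NULL,
# so the CASE expression below maps that NULL back to 0.
import pandas as pd
assert pd.Series([None, None], dtype="Float64").sum() == 0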
bq_sum = _apply_window_if_present(column.sum(), window) return ( - ibis.case().when(bq_sum.isnull(), ibis_types.literal(0)).else_(bq_sum).end() + ibis.case().when(bq_sum.isnull(), ibis_types.literal(0)).else_(bq_sum).end() # type: ignore ) @@ -167,7 +168,7 @@ def _as_ibis( .else_(magnitude * pow(-1, negative_count_parity)) .end() ) - return float_result.cast(column.type()) + return float_result.cast(column.type()) # type: ignore class MaxOp(AggregateOp): @@ -228,21 +229,37 @@ def skips_nulls(self): class CutOp(WindowOp): - def __init__(self, bins: int): - self._bins_ibis = dtypes.literal_to_ibis_scalar(bins, force_dtype=Int64Dtype()) - self._bins_int = bins + def __init__(self, bins: typing.Union[int, pd.IntervalIndex]): + if isinstance(bins, int): + if not bins > 0: + raise ValueError("`bins` should be a positive integer.") + self._bins_int = bins + self._bins = dtypes.literal_to_ibis_scalar(bins, force_dtype=Int64Dtype()) + else: + self._bins_int = 0 + self._bins = bins def _as_ibis(self, x: ibis_types.Column, window=None): - col_min = _apply_window_if_present(x.min(), window) - col_max = _apply_window_if_present(x.max(), window) - bin_width = (col_max - col_min) / self._bins_ibis out = ibis.case() - for this_bin in range(self._bins_int - 1): - out = out.when( - x <= (col_min + (this_bin + 1) * bin_width), - dtypes.literal_to_ibis_scalar(this_bin, force_dtype=Int64Dtype()), - ) - out = out.when(x.notnull(), self._bins_ibis - 1) + + if self._bins_int > 0: + col_min = _apply_window_if_present(x.min(), window) + col_max = _apply_window_if_present(x.max(), window) + bin_width = (col_max - col_min) / self._bins + + for this_bin in range(self._bins_int - 1): + out = out.when( + x <= (col_min + (this_bin + 1) * bin_width), + dtypes.literal_to_ibis_scalar(this_bin, force_dtype=Int64Dtype()), + ) + out = out.when(x.notnull(), self._bins - 1) + else: + for interval in self._bins: + condition = (x > interval.left) & (x <= interval.right) + interval_struct = ibis.struct( + {"left_exclusive": interval.left, "right_inclusive": interval.right} + ) + out = out.when(condition, interval_struct) return out.end() @property @@ -290,7 +307,7 @@ def _as_ibis( dtypes.literal_to_ibis_scalar(bucket_n, force_dtype=Int64Dtype()), ) out = out.else_(None) - return out.end() + return out.end() # type: ignore @property def skips_nulls(self): @@ -482,7 +499,7 @@ def _map_to_literal( original: ibis_types.Value, literal: ibis_types.Scalar ) -> ibis_types.Column: # Hack required to perform aggregations on literals in ibis, even though bigquery will let you directly aggregate literals (eg. 
'SELECT COUNT(1) from table1') - return ibis.ifelse(original.isnull(), literal, literal) + return ibis.ifelse(original.isnull(), literal, literal) # type: ignore sum_op = SumOp() diff --git a/bigframes/remote_function.py b/bigframes/remote_function.py index a899ebd371..f54c26fa56 100644 --- a/bigframes/remote_function.py +++ b/bigframes/remote_function.py @@ -535,17 +535,14 @@ def remote_function_node( """Creates an Ibis node representing a remote function call.""" fields = { - name: rlz.value(type_) if type_ else rlz.any + name: rlz.ValueOf(None if type_ == "ANY TYPE" else type_) for name, type_ in zip( ibis_signature.parameter_names, ibis_signature.input_types ) } - try: - fields["output_type"] = rlz.shape_like("args", dtype=ibis_signature.output_type) # type: ignore - except TypeError: - fields["output_dtype"] = property(lambda _: ibis_signature.output_type) - fields["output_shape"] = rlz.shape_like("args") + fields["dtype"] = ibis_signature.output_type # type: ignore + fields["shape"] = rlz.shape_like("args") node = type(routine_ref_to_string_for_query(routine_ref), (ops.ValueOp,), fields) # type: ignore diff --git a/bigframes/series.py b/bigframes/series.py index c929775a00..eefd2b755d 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -16,6 +16,7 @@ from __future__ import annotations +import functools import itertools import numbers import textwrap @@ -442,42 +443,82 @@ def replace( self, to_replace: typing.Any, value: typing.Any = None, *, regex: bool = False ): if regex: - if not (isinstance(to_replace, str) and isinstance(value, str)): - raise NotImplementedError( - f"replace regex mode only supports strings for 'to_replace' and 'value'. {constants.FEEDBACK_LINK}" - ) - block, result_col = self._block.apply_unary_op( - self._value_column, - ops.ReplaceRegexOp(to_replace, value), - result_label=self.name, - ) - return Series(block.select_column(result_col)) + # No-op unless to_replace and series dtype are both string type + if not isinstance(to_replace, str) or not isinstance( + self.dtype, pandas.StringDtype + ): + return self + return self._regex_replace(to_replace, value) elif utils.is_dict_like(to_replace): - raise NotImplementedError( - f"Dict 'to_replace' not supported. {constants.FEEDBACK_LINK}" - ) + return self._mapping_replace(to_replace) # type: ignore elif utils.is_list_like(to_replace): - block, cond = self._block.apply_unary_op( - self._value_column, ops.IsInOp(to_replace) - ) - block, result_col = block.apply_binary_op( - cond, - self._value_column, - ops.partial_arg1(ops.where_op, value), - result_label=self.name, - ) - return Series(block.select_column(result_col)) + replace_list = to_replace else: # Scalar - block, cond = self._block.apply_unary_op( - self._value_column, ops.BinopPartialLeft(ops.eq_op, to_replace) + replace_list = [to_replace] + replace_list = [ + i for i in replace_list if bigframes.dtypes.is_compatible(i, self.dtype) + ] + return self._simple_replace(replace_list, value) if replace_list else self + + def _regex_replace(self, to_replace: str, value: str): + if not bigframes.dtypes.is_dtype(value, self.dtype): + raise NotImplementedError( + f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. 
{constants.FEEDBACK_LINK}" ) - block, result_col = block.apply_binary_op( - cond, - self._value_column, - ops.partial_arg1(ops.where_op, value), - result_label=self.name, + block, result_col = self._block.apply_unary_op( + self._value_column, + ops.ReplaceRegexOp(to_replace, value), + result_label=self.name, + ) + return Series(block.select_column(result_col)) + + def _simple_replace(self, to_replace_list: typing.Sequence, value): + result_type = bigframes.dtypes.is_compatible(value, self.dtype) + if not result_type: + raise NotImplementedError( + f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" ) - return Series(block.select_column(result_col)) + + if result_type != self.dtype: + return self.astype(result_type)._simple_replace(to_replace_list, value) + + block, cond = self._block.apply_unary_op( + self._value_column, ops.IsInOp(to_replace_list) + ) + block, result_col = block.apply_binary_op( + cond, + self._value_column, + ops.partial_arg1(ops.where_op, value), + result_label=self.name, + ) + return Series(block.select_column(result_col)) + + def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): + tuples = [] + lcd_types: list[typing.Optional[bigframes.dtypes.Dtype]] = [] + for key, value in mapping.items(): + lcd_type = bigframes.dtypes.is_compatible(key, self.dtype) + if not lcd_type: + continue + if not bigframes.dtypes.is_dtype(value, self.dtype): + raise NotImplementedError( + f"Cannot replace {self.dtype} elements with incompatible item {value} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" + ) + tuples.append((key, value)) + lcd_types.append(lcd_type) + + result_dtype = functools.reduce( + lambda t1, t2: bigframes.dtypes.lcd_type(t1, t2) if (t1 and t2) else None, + lcd_types, + ) + if not result_dtype: + raise NotImplementedError( + f"Cannot replace {self.dtype} elements with incompatible mapping {mapping} as mixed-type columns not supported. {constants.FEEDBACK_LINK}" + ) + block, result = self._block.apply_unary_op( + self._value_column, ops.MapOp(tuple(tuples)) + ) + return Series(block.select_column(result)) def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -757,7 +798,7 @@ def _central_moment(self, n: int) -> float: def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: if _is_list_like(func): - if self.dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES: + if self.dtype not in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: raise NotImplementedError( f"Multiple aggregations only supported on numeric series. 
{constants.FEEDBACK_LINK}" ) @@ -816,7 +857,6 @@ def mode(self) -> Series: block, agg_ids = block.aggregate( by_column_ids=[self._value_column], aggregations=((self._value_column, agg_ops.count_op),), - as_index=False, ) value_count_col_id = agg_ids[0] block, max_value_count_col_id = block.apply_window_op( @@ -830,14 +870,15 @@ def mode(self) -> Series: ops.eq_op, ) block = block.filter(is_mode_col_id) - mode_values_series = Series( - block.select_column(self._value_column).assign_label( - self._value_column, self.name - ) - ) - return typing.cast( - Series, mode_values_series.sort_values().reset_index(drop=True) + # use temporary name for reset_index to avoid collision, restore after dropping extra columns + block = ( + block.with_index_labels(["mode_temp_internal"]) + .order_by([OrderingColumnReference(self._value_column)]) + .reset_index(drop=False) ) + block = block.select_column(self._value_column).with_column_labels([self.name]) + mode_values_series = Series(block.select_column(self._value_column)) + return typing.cast(Series, mode_values_series) def mean(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.mean_op)) @@ -1324,7 +1365,7 @@ def to_csv(self, path_or_buf=None, **kwargs) -> typing.Optional[str]: return self.to_pandas().to_csv(path_or_buf, **kwargs) def to_dict(self, into: type[dict] = dict) -> typing.Mapping: - return typing.cast(dict, self.to_pandas().to_dict(into)) + return typing.cast(dict, self.to_pandas().to_dict(into)) # type: ignore def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None: return self.to_pandas().to_excel(excel_writer, sheet_name, **kwargs) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 5364060d1c..fbe900106a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -67,6 +67,7 @@ from bigframes.core import log_adapter import bigframes.core as core import bigframes.core.blocks as blocks +import bigframes.core.compile import bigframes.core.guid as guid from bigframes.core.ordering import IntegerEncoding, OrderingColumnReference import bigframes.core.ordering as orderings @@ -79,9 +80,9 @@ import bigframes.session.clients import bigframes.version -# Even though the ibis.backends.bigquery.registry import is unused, it's needed +# Even though the ibis.backends.bigquery import is unused, it's needed # to register new and replacement ops with the Ibis BigQuery backend. 
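# The pattern relied on here, in miniature (a hedged stand-in; the names below
# are hypothetical and are not the vendored module's real contents): code at
# module scope runs once at import time, so an import kept only for its side
# effects is enough to populate a registry of replacement ops.
_OPERATION_REGISTRY: dict = {}

def _register_to_json_string():
    # imagine this call sitting at the top level of the vendored module
    _OPERATION_REGISTRY["ToJsonString"] = lambda arg_sql: f"TO_JSON_STRING({arg_sql})"

_register_to_json_string()  # would execute as a side effect of the `# noqa` import
assert _OPERATION_REGISTRY["ToJsonString"]("col") == "TO_JSON_STRING(col)"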
-import third_party.bigframes_vendored.ibis.backends.bigquery.registry # noqa +import third_party.bigframes_vendored.ibis.backends.bigquery # noqa import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops import third_party.bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import third_party.bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet @@ -873,8 +874,9 @@ def _read_pandas( total_ordering_columns=frozenset([ordering_col]), integer_encoding=IntegerEncoding(True, is_sequential=True), ) - table_expression = self.ibis_client.table( + table_expression = self.ibis_client.table( # type: ignore load_table_destination.table_id, + # TODO: use "dataset_id" as the "schema" database=f"{load_table_destination.project}.{load_table_destination.dataset_id}", ) @@ -1015,13 +1017,13 @@ def read_csv( header=header, names=names, index_col=index_col, - usecols=usecols, + usecols=usecols, # type: ignore dtype=dtype, engine=engine, encoding=encoding, **kwargs, ) - return self.read_pandas(pandas_df) + return self.read_pandas(pandas_df) # type: ignore def read_pickle( self, @@ -1436,6 +1438,81 @@ def _start_query( results_iterator = query_job.result(max_results=max_results) return results_iterator, query_job + def _execute_and_cache( + self, array_value: core.ArrayValue, cluster_cols: typing.Sequence[str] + ) -> core.ArrayValue: + """Executes the query and uses the resulting table to rewrite future executions.""" + # TODO: Use this for all executions? Problem is that caching materializes extra + # ordering columns + compiled_value = self._compile_ordered(array_value) + + ibis_expr = compiled_value._to_ibis_expr( + ordering_mode="unordered", expose_hidden_cols=True + ) + tmp_table = self._ibis_to_temp_table( + ibis_expr, cluster_cols=cluster_cols, api_name="cached" + ) + table_expression = self.ibis_client.table( + f"{tmp_table.project}.{tmp_table.dataset_id}.{tmp_table.table_id}" + ) + new_columns = [table_expression[column] for column in compiled_value.column_ids] + new_hidden_columns = [ + table_expression[column] + for column in compiled_value._hidden_ordering_column_names + ] + # TODO: Instead, keep session-wide map of cached results and automatically reuse + return core.ArrayValue.from_ibis( + self, + table_expression, + columns=new_columns, + hidden_ordering_columns=new_hidden_columns, + ordering=compiled_value._ordering, + ) + + def _execute( + self, + array_value: core.ArrayValue, + job_config: Optional[bigquery.job.QueryJobConfig] = None, + max_results: Optional[int] = None, + *, + sorted: bool = True, + dry_run=False, + ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + sql = self._to_sql(array_value, sorted=sorted) # type:ignore + job_config = bigquery.QueryJobConfig(dry_run=dry_run) + return self._start_query( + sql=sql, + job_config=job_config, + max_results=max_results, + ) + + def _to_sql( + self, + array_value: core.ArrayValue, + offset_column: typing.Optional[str] = None, + col_id_overrides: typing.Mapping[str, str] = {}, + sorted: bool = False, + ) -> str: + if offset_column: + array_value = array_value.promote_offsets(offset_column) + if sorted: + return self._compile_ordered(array_value).to_sql( + col_id_overrides=col_id_overrides, sorted=True + ) + return self._compile_unordered(array_value).to_sql( + col_id_overrides=col_id_overrides + ) + + def _compile_ordered( + self, array_value: core.ArrayValue + ) -> bigframes.core.compile.OrderedIR: + return bigframes.core.compile.compile_ordered(array_value.node) + + def _compile_unordered( 
+ self, array_value: core.ArrayValue + ) -> bigframes.core.compile.UnorderedIR: + return bigframes.core.compile.compile_unordered(array_value.node) + def _get_table_size(self, destination_table): table = self.bqclient.get_table(destination_table) return table.num_bytes diff --git a/bigframes/version.py b/bigframes/version.py index 04eac385f6..494335acd7 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.17.0" +__version__ = "0.18.0" diff --git a/docs/samples b/docs/samples deleted file mode 120000 index e804737ed3..0000000000 --- a/docs/samples +++ /dev/null @@ -1 +0,0 @@ -../samples \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index 901394813a..3809f8e241 100644 --- a/mypy.ini +++ b/mypy.ini @@ -24,5 +24,8 @@ ignore_missing_imports = True [mypy-pyarrow] ignore_missing_imports = True +[mypy-ibis.*] +ignore_missing_imports = True + [mypy-ipywidgets] ignore_missing_imports = True diff --git a/noxfile.py b/noxfile.py index 2174e27529..1d3624005a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -518,29 +518,29 @@ def prerelease(session: nox.sessions.Session, tests_path): "--prefer-binary", "--pre", "--upgrade", - # TODO(shobs): Remove tying to version 2.1.3 after - # https://github.com/pandas-dev/pandas/issues/56463 is resolved - "pandas!=2.1.4", + # TODO(shobs): Remove excluding version 2.1.4 after + # https://github.com/pandas-dev/pandas/issues/56463 is resolved. + # + # TODO(shobs): Remove excluding version 2.2.0rc0 after + # https://github.com/pandas-dev/pandas/issues/56646 and + # https://github.com/pandas-dev/pandas/issues/56651 are resolved. + "pandas!=2.1.4,!=2.2.0rc0", ) already_installed.add("pandas") - # TODO(shobs): - # Commit https://github.com/ibis-project/ibis/commit/c20ba7feab6bdea6c299721310e04dbc10551cc2 - # introduced breaking change that removed the following: - # ibis.expr.rules.column - # ibis.expr.rules.value - # ibis.expr.rules.any - # Let's exclude ibis head from prerelease install list for now. Instead, use - # a working ibis-framework version resolved via setup.by (currently resolves - # to version 6.2.0 due to version requirement "6.2.0,<7.0.0dev"). - # We should enable the head back once bigframes support a version that - # includes the above commit. + # Ibis has introduced breaking changes. Let's exclude ibis head + # from prerelease install list for now. We should enable the head back + # once bigframes supports the version at HEAD. # session.install( - # "--upgrade", - # "-e", # Use -e so that py.typed file is included. - # "git+https://github.com/ibis-project/ibis.git#egg=ibis-framework", + # "--upgrade", + # "-e", # Use -e so that py.typed file is included. + # "git+https://github.com/ibis-project/ibis.git@7.x.x#egg=ibis-framework", # ) - session.install("--no-deps", "ibis-framework==6.2.0") + session.install( + "--upgrade", + # "--pre", + "ibis-framework>=7.1.0,<7.2.0dev", + ) already_installed.add("ibis-framework") # Workaround https://github.com/googleapis/python-db-dtypes-pandas/issues/178 diff --git a/samples/snippets/explore_query_result_test.py b/samples/snippets/explore_query_result_test.py new file mode 100644 index 0000000000..5f0ec7d9b6 --- /dev/null +++ b/samples/snippets/explore_query_result_test.py @@ -0,0 +1,70 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_bigquery_dataframes_explore_query_result(): + import bigframes.pandas as bpd + + # [START bigquery_dataframes_explore_query_result] + # Load data from BigQuery + query_or_table = "bigquery-public-data.ml_datasets.penguins" + bq_df = bpd.read_gbq(query_or_table) + + # Inspect one of the columns (or series) of the DataFrame: + bq_df["body_mass_g"] + + # Compute the mean of this series: + average_body_mass = bq_df["body_mass_g"].mean() + print(f"average_body_mass: {average_body_mass}") + + # Find the heaviest species using the groupby operation to calculate the + # mean body_mass_g: + ( + bq_df["body_mass_g"] + .groupby(by=bq_df["species"]) + .mean() + .sort_values(ascending=False) + .head(10) + ) + + # Create the Linear Regression model + from bigframes.ml.linear_model import LinearRegression + + # Filter down to the data we want to analyze + adelie_data = bq_df[bq_df.species == "Adelie Penguin (Pygoscelis adeliae)"] + + # Drop the columns we don't care about + adelie_data = adelie_data.drop(columns=["species"]) + + # Drop rows with nulls to get our training data + training_data = adelie_data.dropna() + + # Pick feature columns and label column + X = training_data[ + [ + "island", + "culmen_length_mm", + "culmen_depth_mm", + "flipper_length_mm", + "sex", + ] + ] + y = training_data[["body_mass_g"]] + + model = LinearRegression(fit_intercept=False) + model.fit(X, y) + model.score(X, y) + # [END bigquery_dataframes_explore_query_result] + assert average_body_mass is not None + assert model is not None diff --git a/setup.py b/setup.py index 3351542985..345d1ea752 100644 --- a/setup.py +++ b/setup.py @@ -43,8 +43,9 @@ "google-cloud-iam >=2.12.1", "google-cloud-resource-manager >=1.10.3", "google-cloud-storage >=2.0.0", + # TODO: Relax upper bound once we have fixed unit tests with 7.2.0. + "ibis-framework[bigquery] >=7.1.0,<7.2.0dev", # TODO: Relax upper bound once we have fixed `system_prerelease` tests. - "ibis-framework[bigquery] >=6.2.0,<7.0.0dev", "pandas >=1.5.0,<2.1.4", "pydata-google-auth >=1.8.2", "requests >=2.27.1", @@ -81,7 +82,7 @@ # benchmarks, etc. 
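# Roughly what the filter below keeps (editor's example; the candidate names
# are made up but mimic what find_namespace_packages() could return for this
# repository layout):
_candidates = ["bigframes", "bigframes.core", "third_party.bigframes_vendored", "tests", "notebooks"]
_kept = [p for p in _candidates if p.startswith("bigframes") or p.startswith("third_party")]
assert _kept == ["bigframes", "bigframes.core", "third_party.bigframes_vendored"]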
packages = [ package - for package in setuptools.PEP420PackageFinder.find() + for package in setuptools.find_namespace_packages() if package.startswith("bigframes") or package.startswith("third_party") ] diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index f43d3b4ca0..218255c77e 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -45,7 +45,7 @@ greenlet==2.0.2 grpc-google-iam-v1==0.12.6 grpcio==1.53.0 grpcio-status==1.48.2 -ibis-framework==6.2.0 +ibis-framework==7.1.0 humanize==4.6.0 identify==2.5.22 idna==3.4 @@ -107,7 +107,7 @@ scikit-learn==1.2.2 SecretStorage==3.3.3 six==1.16.0 SQLAlchemy==1.4.0 -sqlglot==10.6.4 +sqlglot==18.12.0 tomli==2.0.1 toolz==0.12.0 tqdm==4.65.0 diff --git a/tests/system/large/ml/test_compose.py b/tests/system/large/ml/test_compose.py index 0c280e5d02..6ea4f72489 100644 --- a/tests/system/large/ml/test_compose.py +++ b/tests/system/large/ml/test_compose.py @@ -72,7 +72,7 @@ def test_columntransformer_standalone_fit_and_transform( expected.standard_scaled_flipper_length_mm.astype("Float64") ) - pandas.testing.assert_frame_equal(result, expected, rtol=1e-3) + pandas.testing.assert_frame_equal(result, expected, rtol=1e-3, check_dtype=False) def test_columntransformer_standalone_fit_transform(new_penguins_df): @@ -123,4 +123,4 @@ def test_columntransformer_standalone_fit_transform(new_penguins_df): expected.standard_scaled_flipper_length_mm.astype("Float64") ) - pandas.testing.assert_frame_equal(result, expected, rtol=1e-3) + pandas.testing.assert_frame_equal(result, expected, rtol=1e-3, check_dtype=False) diff --git a/tests/system/large/ml/test_core.py b/tests/system/large/ml/test_core.py index 3b30d7eb1d..df387e6ee1 100644 --- a/tests/system/large/ml/test_core.py +++ b/tests/system/large/ml/test_core.py @@ -184,4 +184,5 @@ def test_bqml_standalone_transform(penguins_df_default_index, new_penguins_df): expected, check_exact=False, rtol=0.1, + check_dtype=False, ) diff --git a/tests/system/small/ml/test_core.py b/tests/system/small/ml/test_core.py index eece5ef21d..f39815aec2 100644 --- a/tests/system/small/ml/test_core.py +++ b/tests/system/small/ml/test_core.py @@ -292,11 +292,12 @@ def test_model_predict_with_unnamed_index( def test_remote_model_predict( bqml_linear_remote_model: core.BqmlModel, new_penguins_df ): - predictions = bqml_linear_remote_model.predict(new_penguins_df).to_pandas() expected = pd.DataFrame( {"predicted_body_mass_g": [[3739.54], [3675.79], [3619.54]]}, index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + dtype=pd.ArrowDtype(pa.list_(pa.float64())), ) + predictions = bqml_linear_remote_model.predict(new_penguins_df).to_pandas() pd.testing.assert_frame_equal( predictions[["predicted_body_mass_g"]].sort_index(), expected, diff --git a/tests/system/small/ml/test_imported.py b/tests/system/small/ml/test_imported.py index 9008e85a0b..8ffd9924e9 100644 --- a/tests/system/small/ml/test_imported.py +++ b/tests/system/small/ml/test_imported.py @@ -51,6 +51,7 @@ def test_tensorflow_model_predict(imported_tensorflow_model, llm_text_df): result, expected, check_exact=False, + check_dtype=False, atol=0.1, ) @@ -90,6 +91,7 @@ def test_onnx_model_predict(imported_onnx_model, onnx_iris_df): result, expected, check_exact=False, + check_dtype=False, atol=0.1, ) diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index 267a2ed9c1..fd1b803eea 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -12,7 +12,6 @@ # See the 
License for the specific language governing permissions and # limitations under the License. -import numpy as np import pytest from bigframes.ml import llm @@ -202,8 +201,7 @@ def test_embedding_generator_predict_success( assert "text_embedding" in df.columns series = df["text_embedding"] value = series[0] - assert isinstance(value, np.ndarray) - assert value.size == 768 + assert len(value) == 768 @pytest.mark.flaky(retries=2, delay=120) @@ -215,8 +213,7 @@ def test_embedding_generator_multilingual_predict_success( assert "text_embedding" in df.columns series = df["text_embedding"] value = series[0] - assert isinstance(value, np.ndarray) - assert value.size == 768 + assert len(value) == 768 @pytest.mark.flaky(retries=2, delay=120) @@ -228,5 +225,4 @@ def test_embedding_generator_predict_series_success( assert "text_embedding" in df.columns series = df["text_embedding"] value = series[0] - assert isinstance(value, np.ndarray) - assert value.size == 768 + assert len(value) == 768 diff --git a/tests/system/small/ml/test_preprocessing.py b/tests/system/small/ml/test_preprocessing.py index 45548acca3..c3bd7f3b87 100644 --- a/tests/system/small/ml/test_preprocessing.py +++ b/tests/system/small/ml/test_preprocessing.py @@ -15,6 +15,7 @@ import math import pandas as pd +import pyarrow as pa import bigframes.ml.preprocessing @@ -453,6 +454,9 @@ def test_one_hot_encoder_default_params(new_penguins_df): [{"index": 2, "value": 1.0}], ], }, + dtype=pd.ArrowDtype( + pa.list_(pa.struct([("index", pa.int64()), ("value", pa.float64())])) + ), index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) @@ -482,6 +486,9 @@ def test_one_hot_encoder_default_params_fit_transform(new_penguins_df): [{"index": 2, "value": 1.0}], ], }, + dtype=pd.ArrowDtype( + pa.list_(pa.struct([("index", pa.int64()), ("value", pa.float64())])) + ), index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) @@ -507,6 +514,9 @@ def test_one_hot_encoder_series_default_params(new_penguins_df): [{"index": 2, "value": 1.0}], ], }, + dtype=pd.ArrowDtype( + pa.list_(pa.struct([("index", pa.int64()), ("value", pa.float64())])) + ), index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) @@ -537,6 +547,9 @@ def test_one_hot_encoder_params(new_penguins_df): [{"index": 0, "value": 1.0}], ], }, + dtype=pd.ArrowDtype( + pa.list_(pa.struct([("index", pa.int64()), ("value", pa.float64())])) + ), index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) @@ -567,6 +580,9 @@ def test_one_hot_encoder_different_data(penguins_df_default_index, new_penguins_ [{"index": 2, "value": 1.0}], ], }, + dtype=pd.ArrowDtype( + pa.list_(pa.struct([("index", pa.int64()), ("value", pa.float64())])) + ), index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) diff --git a/tests/system/small/ml/test_remote.py b/tests/system/small/ml/test_remote.py index e8eb1c85e8..5036cdadfc 100644 --- a/tests/system/small/ml/test_remote.py +++ b/tests/system/small/ml/test_remote.py @@ -29,5 +29,6 @@ def test_remote_linear_vertex_model_predict( predictions[["predicted_body_mass_g"]].sort_index(), expected, check_exact=False, + check_dtype=False, rtol=0.1, ) diff --git a/tests/system/small/operations/test_strings.py b/tests/system/small/operations/test_strings.py index 27a35134d4..79f92c94b4 100644 --- a/tests/system/small/operations/test_strings.py +++ b/tests/system/small/operations/test_strings.py @@ -94,6 +94,8 @@ def test_str_extract(scalars_dfs, pat): (".*", "blah", True, 0, True), ("h.l", "blah", False, 0, 
True), (re.compile("(?i).e.."), "blah", None, 0, True), + ("H", "h", True, 0, False), + (", ", "__", True, 0, False), ], ) def test_str_replace(scalars_dfs, pat, repl, case, flags, regex): diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 663a7ceb49..cb2e4f94fa 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -19,7 +19,6 @@ from typing import Tuple import geopandas as gpd # type: ignore -import numpy as np import pandas as pd import pandas.testing import pyarrow as pa # type: ignore @@ -29,7 +28,11 @@ import bigframes._config.display_options as display_options import bigframes.dataframe as dataframe import bigframes.series as series -from tests.system.utils import assert_pandas_df_equal, assert_series_equal +from tests.system.utils import ( + assert_pandas_df_equal, + assert_series_equal, + skip_legacy_pandas, +) def test_df_construct_copy(scalars_dfs): @@ -273,19 +276,19 @@ def test_df_info(scalars_dfs): " # Column Non-Null Count Dtype\n" "--- ------------- ---------------- ------------------------------\n" " 0 bool_col 8 non-null boolean\n" - " 1 bytes_col 6 non-null object\n" + " 1 bytes_col 6 non-null binary[pyarrow]\n" " 2 date_col 7 non-null date32[day][pyarrow]\n" " 3 datetime_col 6 non-null timestamp[us][pyarrow]\n" " 4 geography_col 4 non-null geometry\n" " 5 int64_col 8 non-null Int64\n" " 6 int64_too 9 non-null Int64\n" - " 7 numeric_col 6 non-null object\n" + " 7 numeric_col 6 non-null decimal128(38, 9)[pyarrow]\n" " 8 float64_col 7 non-null Float64\n" " 9 rowindex_2 9 non-null Int64\n" " 10 string_col 8 non-null string\n" " 11 time_col 6 non-null time64[us][pyarrow]\n" " 12 timestamp_col 6 non-null timestamp[us, tz=UTC][pyarrow]\n" - "dtypes: Float64(1), Int64(3), boolean(1), date32[day][pyarrow](1), geometry(1), object(2), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n" + "dtypes: Float64(1), Int64(3), binary[pyarrow](1), boolean(1), date32[day][pyarrow](1), decimal128(38, 9)[pyarrow](1), geometry(1), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n" "memory usage: 945 bytes\n" ) @@ -362,6 +365,7 @@ def test_drop_bigframes_index_with_na(scalars_dfs): pd.testing.assert_frame_equal(pd_result, bf_result) +@skip_legacy_pandas def test_drop_bigframes_multiindex(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs scalars_df = scalars_df.copy() @@ -839,6 +843,50 @@ def test_df_fillna(scalars_dfs): pandas.testing.assert_frame_equal(bf_result, pd_result) +def test_df_replace_scalar_scalar(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.replace(555.555, 3).to_pandas() + pd_result = scalars_pandas_df.replace(555.555, 3) + + # pandas has narrower result types as they are determined dynamically + pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False) + + +def test_df_replace_regex_scalar(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.replace("^H.l", "Howdy, Planet!", regex=True).to_pandas() + pd_result = scalars_pandas_df.replace("^H.l", "Howdy, Planet!", regex=True) + + pd.testing.assert_frame_equal( + pd_result, + bf_result, + ) + + +def test_df_replace_list_scalar(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.replace([555.555, 3.2], 3).to_pandas() + pd_result = scalars_pandas_df.replace([555.555, 3.2], 3) + + # pandas has narrower result types as they are 
determined dynamically + pd.testing.assert_frame_equal( + pd_result, + bf_result, + check_dtype=False, + ) + + +def test_df_replace_value_dict(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.replace(1, {"int64_col": 100, "int64_too": 200}).to_pandas() + pd_result = scalars_pandas_df.replace(1, {"int64_col": 100, "int64_too": 200}) + + pd.testing.assert_frame_equal( + pd_result, + bf_result, + ) + + def test_df_ffill(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df[["int64_col", "float64_col"]].ffill(limit=1).to_pandas() @@ -1154,13 +1202,13 @@ def test_get_dtypes(scalars_df_default_index): pd.Series( { "bool_col": pd.BooleanDtype(), - "bytes_col": np.dtype("O"), + "bytes_col": pd.ArrowDtype(pa.binary()), "date_col": pd.ArrowDtype(pa.date32()), "datetime_col": pd.ArrowDtype(pa.timestamp("us")), "geography_col": gpd.array.GeometryDtype(), "int64_col": pd.Int64Dtype(), "int64_too": pd.Int64Dtype(), - "numeric_col": np.dtype("O"), + "numeric_col": pd.ArrowDtype(pa.decimal128(38, 9)), "float64_col": pd.Float64Dtype(), "rowindex": pd.Int64Dtype(), "rowindex_2": pd.Int64Dtype(), @@ -1188,7 +1236,7 @@ def test_get_dtypes_array_struct(session): dtypes, pd.Series( { - "array_column": np.dtype("O"), + "array_column": pd.ArrowDtype(pa.list_(pa.int64())), "struct_column": pd.ArrowDtype( pa.struct( [ @@ -2094,6 +2142,7 @@ def test_dataframe_agg_multi_string(scalars_dfs): ).all() +@skip_legacy_pandas def test_df_describe(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs # pyarrows time columns fail in pandas @@ -3414,6 +3463,15 @@ def test_df_to_string(scalars_df_index, scalars_pandas_df_index): assert bf_result == pd_result +def test_df_to_html(scalars_df_index, scalars_pandas_df_index): + unsupported = ["numeric_col"] # formatted differently + + bf_result = scalars_df_index.drop(columns=unsupported).to_html() + pd_result = scalars_pandas_df_index.drop(columns=unsupported).to_html() + + assert bf_result == pd_result + + def test_df_to_markdown(scalars_df_index, scalars_pandas_df_index): # Nulls have bug from tabulate https://github.com/astanin/python-tabulate/issues/231 bf_result = scalars_df_index.dropna().to_markdown() @@ -3683,3 +3741,18 @@ def test_to_pandas_downsampling_option_override(session): total_memory_bytes = df.memory_usage(deep=True).sum() total_memory_mb = total_memory_bytes / (1024 * 1024) assert total_memory_mb == pytest.approx(download_size, rel=0.3) + + +def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_created): + dataset_id = dataset_id_not_created + destination_table = f"{dataset_id}.scalars_df" + + result_table = scalars_df_index.to_gbq(destination_table) + assert ( + result_table == destination_table + if destination_table + else result_table is not None + ) + + loaded_scalars_df_index = session.read_gbq(result_table) + assert not loaded_scalars_df_index.empty diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index fb9fb7bb89..6f1b31b48e 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -56,7 +56,9 @@ def test_to_pandas_array_struct_correct_result(session): result = df.to_pandas() expected = pd.DataFrame( { - "array_column": [[1, 3, 2]], + "array_column": pd.Series( + [[1, 3, 2]], dtype=pd.ArrowDtype(pa.list_(pa.int64())) + ), "struct_column": pd.Series( [{"string_field": "a", "float_field": 1.2}], dtype=pd.ArrowDtype( @@ -91,7 +93,8 @@ def test_load_json(session): expected = 
pd.DataFrame( { "json_column": ['{"bar":true,"foo":10}'], - } + }, + dtype=pd.StringDtype(storage="pyarrow"), ) expected.index = expected.index.astype("Int64") pd.testing.assert_series_equal(result.dtypes, expected.dtypes) @@ -137,6 +140,8 @@ def test_to_csv_index( dtype = scalars_df.reset_index().dtypes.to_dict() dtype.pop("geography_col") dtype.pop("rowindex") + # read_csv will decode into bytes inproperly, convert_pandas_dtypes will encode properly from string + dtype.pop("bytes_col") gcs_df = pd.read_csv( path, dtype=dtype, @@ -148,7 +153,6 @@ def test_to_csv_index( scalars_pandas_df = scalars_pandas_df.copy() scalars_pandas_df.index = scalars_pandas_df.index.astype("int64") - # Ordering should be maintained for tables smaller than 1 GB. pd.testing.assert_frame_equal(gcs_df, scalars_pandas_df) @@ -174,6 +178,8 @@ def test_to_csv_tabs( dtype = scalars_df.reset_index().dtypes.to_dict() dtype.pop("geography_col") dtype.pop("rowindex") + # read_csv will decode into bytes inproperly, convert_pandas_dtypes will encode properly from string + dtype.pop("bytes_col") gcs_df = pd.read_csv( path, sep="\t", @@ -216,6 +222,8 @@ def test_to_gbq_index(scalars_dfs, dataset_id, index): df_out = df_out.sort_values("rowindex_2").reset_index(drop=True) convert_pandas_dtypes(df_out, bytes_col=False) + # pd.read_gbq interpets bytes_col as object, reconvert to pyarrow binary + df_out["bytes_col"] = df_out["bytes_col"].astype(pd.ArrowDtype(pa.binary())) expected = scalars_pandas_df.copy() expected.index.name = index_col pd.testing.assert_frame_equal(df_out, expected, check_index_type=False) @@ -265,6 +273,50 @@ def test_to_gbq_if_exists( ) +def test_to_gbq_w_duplicate_column_names( + scalars_df_index, scalars_pandas_df_index, dataset_id +): + """Test the `to_gbq` API when dealing with duplicate column names.""" + destination_table = f"{dataset_id}.test_to_gbq_w_duplicate_column_names" + + # Renaming 'int64_too' to 'int64_col', which will result in 'int64_too' + # becoming 'int64_col_1' after deduplication. + scalars_df_index = scalars_df_index.rename(columns={"int64_too": "int64_col"}) + scalars_df_index.to_gbq(destination_table, if_exists="replace") + + bf_result = bpd.read_gbq(destination_table, index_col="rowindex").to_pandas() + + pd.testing.assert_series_equal( + scalars_pandas_df_index["int64_col"], bf_result["int64_col"] + ) + pd.testing.assert_series_equal( + scalars_pandas_df_index["int64_too"], + bf_result["int64_col_1"], + check_names=False, + ) + + +def test_to_gbq_w_None_column_names( + scalars_df_index, scalars_pandas_df_index, dataset_id +): + """Test the `to_gbq` API with None as a column name.""" + destination_table = f"{dataset_id}.test_to_gbq_w_none_column_names" + + scalars_df_index = scalars_df_index.rename(columns={"int64_too": None}) + scalars_df_index.to_gbq(destination_table, if_exists="replace") + + bf_result = bpd.read_gbq(destination_table, index_col="rowindex").to_pandas() + + pd.testing.assert_series_equal( + scalars_pandas_df_index["int64_col"], bf_result["int64_col"] + ) + pd.testing.assert_series_equal( + scalars_pandas_df_index["int64_too"], + bf_result["bigframes_unnamed_column"], + check_names=False, + ) + + def test_to_gbq_w_invalid_destination_table(scalars_df_index): with pytest.raises(ValueError): scalars_df_index.to_gbq("table_id") @@ -377,7 +429,9 @@ def test_to_parquet_index(scalars_dfs, gcs_folder, index): scalars_pandas_df.index = scalars_pandas_df.index.astype("Int64") # Ordering should be maintained for tables smaller than 1 GB. 
- pd.testing.assert_frame_equal(gcs_df, scalars_pandas_df) + pd.testing.assert_frame_equal( + gcs_df.drop("bytes_col", axis=1), scalars_pandas_df.drop("bytes_col", axis=1) + ) def test_to_sql_query_unnamed_index_included( diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 5214905186..2919c167ef 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -122,23 +122,32 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) +@pytest.mark.parametrize( + ("as_index"), + [ + (True), + (False), + ], +) def test_dataframe_groupby_agg_dict_with_list( - scalars_df_index, scalars_pandas_df_index + scalars_df_index, scalars_pandas_df_index, as_index ): col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] bf_result = ( scalars_df_index[col_names] - .groupby("string_col") + .groupby("string_col", as_index=as_index) .agg({"int64_too": ["mean", "max"], "string_col": "count"}) ) pd_result = ( scalars_pandas_df_index[col_names] - .groupby("string_col") + .groupby("string_col", as_index=as_index) .agg({"int64_too": ["mean", "max"], "string_col": "count"}) ) bf_result_computed = bf_result.to_pandas() - pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) + pd.testing.assert_frame_equal( + pd_result, bf_result_computed, check_dtype=False, check_index_type=False + ) def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df_index): diff --git a/tests/system/small/test_ibis.py b/tests/system/small/test_ibis.py index 58b78e0048..9fe1176068 100644 --- a/tests/system/small/test_ibis.py +++ b/tests/system/small/test_ibis.py @@ -23,11 +23,16 @@ def test_approximate_quantiles(session: bigframes.Session, scalars_table_id: str): num_bins = 3 ibis_client = session.ibis_client - _, dataset, table_id = scalars_table_id.split(".") - ibis_table: ibis_types.Table = ibis_client.table(table_id, database=dataset) + project, dataset, table_id = scalars_table_id.split(".") + ibis_table: ibis_types.Table = ibis_client.table( # type: ignore + table_id, + schema=dataset, + database=project, + ) ibis_column: ibis_types.NumericColumn = ibis_table["int64_col"] - quantiles: ibis_types.ArrayScalar = vendored_ibis_ops.ApproximateMultiQuantile( # type: ignore - ibis_column, num_bins=num_bins + quantiles: ibis_types.ArrayScalar = vendored_ibis_ops.ApproximateMultiQuantile( + ibis_column, # type: ignore + num_bins=num_bins, # type: ignore ).to_expr() value = quantiles[1] num_edges = quantiles.length() diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index e7e93849c6..2d4e1f0204 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -356,17 +356,24 @@ def test_multi_index_dataframe_groupby(scalars_df_index, scalars_pandas_df_index def test_multi_index_dataframe_groupby_level_aggregate( scalars_df_index, scalars_pandas_df_index, level, as_index ): + index_cols = ["int64_too", "bool_col"] bf_result = ( - scalars_df_index.set_index(["int64_too", "bool_col"]) + scalars_df_index.set_index(index_cols) .groupby(level=level, as_index=as_index) .mean(numeric_only=True) .to_pandas() ) pd_result = ( - scalars_pandas_df_index.set_index(["int64_too", "bool_col"]) + scalars_pandas_df_index.set_index(index_cols) .groupby(level=level, as_index=as_index) .mean(numeric_only=True) ) + # For as_index=False, pandas will drop 
index levels used as groupings + # In the future, it will include this in the result, bigframes already does this behavior + if not as_index: + for col in index_cols: + if col in bf_result.columns: + bf_result = bf_result.drop(col, axis=1) # Pandas will have int64 index, while bigquery will have Int64 when resetting pandas.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) @@ -387,14 +394,17 @@ def test_multi_index_dataframe_groupby_level_aggregate( def test_multi_index_dataframe_groupby_level_analytic( scalars_df_index, scalars_pandas_df_index, level, as_index ): + # Drop "numeric_col" as pandas doesn't support numerics for grouped window function bf_result = ( - scalars_df_index.set_index(["int64_too", "bool_col"]) + scalars_df_index.drop("numeric_col", axis=1) + .set_index(["int64_too", "bool_col"]) .groupby(level=level, as_index=as_index, dropna=False) .cumsum(numeric_only=True) .to_pandas() ) pd_result = ( - scalars_pandas_df_index.set_index(["int64_too", "bool_col"]) + scalars_pandas_df_index.drop("numeric_col", axis=1) + .set_index(["int64_too", "bool_col"]) .groupby(level=level, as_index=as_index, dropna=False) .cumsum(numeric_only=True) ) diff --git a/tests/system/small/test_pandas.py b/tests/system/small/test_pandas.py index a1079288cf..282c0d68eb 100644 --- a/tests/system/small/test_pandas.py +++ b/tests/system/small/test_pandas.py @@ -365,6 +365,40 @@ def test_cut(scalars_dfs): pd.testing.assert_series_equal(bf_result, pd_result) +@pytest.mark.parametrize( + ("bins",), + [ + ([(-5, 2), (2, 3), (-3000, -10)],), + (pd.IntervalIndex.from_tuples([(1, 2), (2, 3), (4, 5)]),), + ], +) +def test_cut_with_interval(scalars_dfs, bins): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = bpd.cut(scalars_df["int64_too"], bins, labels=False).to_pandas() + + if isinstance(bins, list): + bins = pd.IntervalIndex.from_tuples(bins) + pd_result = pd.cut(scalars_pandas_df["int64_too"], bins, labels=False) + + # Convert to match data format + pd_result_converted = pd.Series( + [ + {"left_exclusive": interval.left, "right_inclusive": interval.right} + if pd.notna(val) + else pd.NA + for val, interval in zip( + pd_result, pd_result.cat.categories[pd_result.cat.codes] + ) + ], + name=pd_result.name, + ) + pd_result.index = pd_result.index.astype("Int64") + + pd.testing.assert_series_equal( + bf_result, pd_result_converted, check_index=False, check_dtype=False + ) + + @pytest.mark.parametrize( ("q",), [ diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 623da74aa4..6f919f740f 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -24,7 +24,11 @@ import bigframes.pandas import bigframes.series as series -from tests.system.utils import assert_pandas_df_equal, assert_series_equal +from tests.system.utils import ( + assert_pandas_df_equal, + assert_series_equal, + skip_legacy_pandas, +) def test_series_construct_copy(scalars_dfs): @@ -81,14 +85,14 @@ def test_series_construct_from_list_escaped_strings(): [ ("bool_col", pd.BooleanDtype()), # TODO(swast): Use a more efficient type. - ("bytes_col", numpy.dtype("object")), + ("bytes_col", pd.ArrowDtype(pa.binary())), ("date_col", pd.ArrowDtype(pa.date32())), ("datetime_col", pd.ArrowDtype(pa.timestamp("us"))), ("float64_col", pd.Float64Dtype()), ("geography_col", gpd.array.GeometryDtype()), ("int64_col", pd.Int64Dtype()), # TODO(swast): Use a more efficient type. 
- ("numeric_col", numpy.dtype("object")), + ("numeric_col", pd.ArrowDtype(pa.decimal128(38, 9))), ("int64_too", pd.Int64Dtype()), ("string_col", pd.StringDtype(storage="pyarrow")), ("time_col", pd.ArrowDtype(pa.time64("us"))), @@ -2519,8 +2523,12 @@ def test_mask_custom_value(scalars_dfs): ("int64_col", pd.Float64Dtype()), ("int64_col", "string[pyarrow]"), ("int64_col", "boolean"), + ("int64_col", pd.ArrowDtype(pa.decimal128(38, 9))), + ("int64_col", pd.ArrowDtype(pa.decimal256(76, 38))), ("bool_col", "Int64"), ("bool_col", "string[pyarrow]"), + ("string_col", "binary[pyarrow]"), + ("bytes_col", "string[pyarrow]"), # pandas actually doesn't let folks convert to/from naive timestamp and # raises a deprecation warning to use tz_localize/tz_convert instead, # but BigQuery always stores values as UTC and doesn't have to deal @@ -2538,6 +2546,7 @@ def test_mask_custom_value(scalars_dfs): # https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_functions ], ) +@skip_legacy_pandas def test_astype(scalars_df_index, scalars_pandas_df_index, column, to_type): bf_result = scalars_df_index[column].astype(to_type).to_pandas() pd_result = scalars_pandas_df_index[column].astype(to_type) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 26c5093b35..8ce442376a 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -30,6 +30,7 @@ import bigframes.dataframe import bigframes.dtypes import bigframes.ml.linear_model +from tests.system.utils import skip_legacy_pandas FIRST_FILE = "000000000000" @@ -44,7 +45,7 @@ def test_read_gbq_tokyo( result = df.sort_index().to_pandas() expected = scalars_pandas_df_index - _, query_job = df._block.expr.start_query() + _, query_job = session_tokyo._execute(df._block.expr) assert query_job.location == tokyo_location pd.testing.assert_frame_equal(result, expected) @@ -379,12 +380,13 @@ def test_read_pandas_tokyo( result = df.to_pandas() expected = scalars_pandas_df_index - _, query_job = df._block.expr.start_query() + _, query_job = session_tokyo._execute(df._block.expr) assert query_job.location == tokyo_location pd.testing.assert_frame_equal(result, expected) +@skip_legacy_pandas def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder): scalars_df, _ = scalars_dfs if scalars_df.index.name is not None: @@ -441,6 +443,7 @@ def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder): pytest.param("\t", id="custom_sep"), ], ) +@skip_legacy_pandas def test_read_csv_local_default_engine(session, scalars_dfs, sep): scalars_df, scalars_pandas_df = scalars_dfs with tempfile.TemporaryDirectory() as dir: diff --git a/tests/system/utils.py b/tests/system/utils.py index f49b5ece31..a4647b4f51 100644 --- a/tests/system/utils.py +++ b/tests/system/utils.py @@ -14,11 +14,23 @@ import base64 import decimal +import functools import geopandas as gpd # type: ignore import numpy as np import pandas as pd import pyarrow as pa # type: ignore +import pytest + + +def skip_legacy_pandas(test): + @functools.wraps(test) + def wrapper(*args, **kwds): + if pd.__version__.startswith("1."): + pytest.skip("Skips pandas 1.x as not compatible with 2.x behavior.") + return test(*args, **kwds) + + return wrapper def assert_pandas_df_equal(df0, df1, ignore_order: bool = False, **kwargs): @@ -133,16 +145,28 @@ def convert_pandas_dtypes(df: pd.DataFrame, bytes_col: bool): df["geography_col"].replace({np.nan: None}) ) - # Convert bytes types column. 
- if bytes_col: + if bytes_col and not isinstance(df["bytes_col"].dtype, pd.ArrowDtype): df["bytes_col"] = df["bytes_col"].apply( lambda value: base64.b64decode(value) if not pd.isnull(value) else value ) + arrow_table = pa.Table.from_pandas( + pd.DataFrame(df, columns=["bytes_col"]), + schema=pa.schema([("bytes_col", pa.binary())]), + ) + df["bytes_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)["bytes_col"] - # Convert numeric types column. - df["numeric_col"] = df["numeric_col"].apply( - lambda value: decimal.Decimal(str(value)) if value else None # type: ignore - ) + if not isinstance(df["numeric_col"].dtype, pd.ArrowDtype): + # Convert numeric types column. + df["numeric_col"] = df["numeric_col"].apply( + lambda value: decimal.Decimal(str(value)) if value else None # type: ignore + ) + arrow_table = pa.Table.from_pandas( + pd.DataFrame(df, columns=["numeric_col"]), + schema=pa.schema([("numeric_col", pa.decimal128(38, 9))]), + ) + df["numeric_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[ + "numeric_col" + ] def assert_pandas_df_equal_pca_components(actual, expected, **kwargs): diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 8ba321d122..b239b04671 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -79,7 +79,7 @@ def create_dataframe( # might not actually be used. Mock out the global session, too. monkeypatch.setattr(bigframes.core.global_session, "_global_session", session) bigframes.options.bigquery._session_started = True - return bigframes.dataframe.DataFrame({}, session=session) + return bigframes.dataframe.DataFrame({"col": []}, session=session) def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Session: diff --git a/tests/unit/test_dtypes.py b/tests/unit/test_dtypes.py index 6ceaaf911b..e648fd28cc 100644 --- a/tests/unit/test_dtypes.py +++ b/tests/unit/test_dtypes.py @@ -31,11 +31,11 @@ # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types pytest.param( ibis_dtypes.Decimal(precision=76, scale=38, nullable=True), - np.dtype("O"), + pd.ArrowDtype(pa.decimal256(76, 38)), id="bignumeric", ), pytest.param(ibis_dtypes.boolean, pd.BooleanDtype(), id="bool"), - pytest.param(ibis_dtypes.binary, np.dtype("O"), id="bytes"), + pytest.param(ibis_dtypes.binary, pd.ArrowDtype(pa.binary()), id="bytes"), pytest.param(ibis_dtypes.date, pd.ArrowDtype(pa.date32()), id="date"), pytest.param( ibis_dtypes.Timestamp(), pd.ArrowDtype(pa.timestamp("us")), id="datetime" @@ -49,10 +49,9 @@ pytest.param(ibis_dtypes.int8, pd.Int64Dtype(), id="int8-as-int64"), pytest.param(ibis_dtypes.int64, pd.Int64Dtype(), id="int64"), # TODO(tswast): custom dtype (or at least string dtype) for JSON objects - pytest.param(ibis_dtypes.json, np.dtype("O"), id="json"), pytest.param( ibis_dtypes.Decimal(precision=38, scale=9, nullable=True), - np.dtype("O"), + pd.ArrowDtype(pa.decimal128(38, 9)), id="numeric", ), pytest.param( diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/__init__.py b/third_party/bigframes_vendored/ibis/backends/bigquery/__init__.py index e69de29bb2..43508fab11 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/__init__.py +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/__init__.py @@ -0,0 +1,3 @@ +# Import all sub-modules to monkeypatch everything. 
+import third_party.bigframes_vendored.ibis.backends.bigquery.compiler # noqa +import third_party.bigframes_vendored.ibis.backends.bigquery.registry # noqa diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/compiler.py b/third_party/bigframes_vendored/ibis/backends/bigquery/compiler.py new file mode 100644 index 0000000000..414f0a7c81 --- /dev/null +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/compiler.py @@ -0,0 +1,59 @@ +# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/compiler.py +"""Module to convert from Ibis expression to SQL string.""" + +from __future__ import annotations + +import re + +from ibis.backends.base.sql import compiler as sql_compiler +import ibis.backends.bigquery.compiler +from ibis.backends.bigquery.datatypes import BigQueryType +import ibis.expr.datatypes as dt +import ibis.expr.operations as ops + +_NAME_REGEX = re.compile(r'[^!"$()*,./;?@[\\\]^`{}~\n]+') +_EXACT_NAME_REGEX = re.compile(f"^{_NAME_REGEX.pattern}$") + + +class BigQueryTableSetFormatter(sql_compiler.TableSetFormatter): + def _quote_identifier(self, name): + """Restore 6.x version of identifier quoting. + + 7.x uses sqlglot which as of December 2023 doesn't know about the + extended unicode names for BigQuery yet. + """ + if _EXACT_NAME_REGEX.match(name) is not None: + return name + return f"`{name}`" + + def _format_in_memory_table(self, op): + """Restore 6.x version of InMemoryTable. + + BigQuery DataFrames explicitly uses InMemoryTable only when we know + the data is small enough to embed in SQL. + """ + schema = op.schema + names = schema.names + types = schema.types + + raw_rows = [] + for row in op.data.to_frame().itertuples(index=False): + raw_row = ", ".join( + f"{self._translate(lit)} AS {name}" + for lit, name in zip( + map(ops.Literal, row, types), map(self._quote_identifier, names) + ) + ) + raw_rows.append(f"STRUCT({raw_row})") + array_type = BigQueryType.from_ibis(dt.Array(op.schema.as_struct())) + + return f"UNNEST({array_type}[{', '.join(raw_rows)}])" + + +# Override implementation. 
+ibis.backends.bigquery.compiler.BigQueryTableSetFormatter._quote_identifier = ( + BigQueryTableSetFormatter._quote_identifier +) +ibis.backends.bigquery.compiler.BigQueryTableSetFormatter._format_in_memory_table = ( + BigQueryTableSetFormatter._format_in_memory_table +) diff --git a/third_party/bigframes_vendored/ibis/expr/operations/analytic.py b/third_party/bigframes_vendored/ibis/expr/operations/analytic.py index 038987cac9..3d6a3b37b1 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/analytic.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/analytic.py @@ -2,22 +2,22 @@ from __future__ import annotations -from ibis.expr.operations.analytic import Analytic +import ibis.expr.operations as ops import ibis.expr.rules as rlz -class FirstNonNullValue(Analytic): +class FirstNonNullValue(ops.Analytic): """Retrieve the first element.""" - arg = rlz.column(rlz.any) - output_dtype = rlz.dtype_like("arg") + arg: ops.Column + dtype = rlz.dtype_like("arg") -class LastNonNullValue(Analytic): +class LastNonNullValue(ops.Analytic): """Retrieve the last element.""" - arg = rlz.column(rlz.any) - output_dtype = rlz.dtype_like("arg") + arg: ops.Column + dtype = rlz.dtype_like("arg") __all__ = [ diff --git a/third_party/bigframes_vendored/ibis/expr/operations/json.py b/third_party/bigframes_vendored/ibis/expr/operations/json.py index dbb3fa3066..772c2e8ff4 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/json.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/json.py @@ -6,4 +6,4 @@ class ToJsonString(Unary): - output_dtype = dt.string + dtype = dt.string diff --git a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py index 5e6ad9ecf2..e6644f477a 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py @@ -3,8 +3,8 @@ from __future__ import annotations import ibis.expr.datatypes as dt +import ibis.expr.operations.core as ibis_ops_core from ibis.expr.operations.reductions import Filterable, Reduction -import ibis.expr.rules as rlz class ApproximateMultiQuantile(Filterable, Reduction): @@ -13,9 +13,9 @@ class ApproximateMultiQuantile(Filterable, Reduction): See: https://cloud.google.com/bigquery/docs/reference/standard-sql/approximate_aggregate_functions#approx_quantiles """ - arg = rlz.any - num_bins = rlz.value(dt.int64) - output_dtype = dt.Array(dt.float64) + arg: ibis_ops_core.Value + num_bins: ibis_ops_core.Value[dt.Int64] + dtype = dt.Array(dt.float64) __all__ = [ diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index c082b87336..c3794c550e 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -685,6 +685,130 @@ def to_string( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def to_html( + self, + buf=None, + columns: Sequence[str] | None = None, + col_space=None, + header: bool = True, + index: bool = True, + na_rep: str = "NaN", + formatters=None, + float_format=None, + sparsify: bool | None = None, + index_names: bool = True, + justify: str | None = None, + max_rows: int | None = None, + max_cols: int | None = None, + show_dimensions: bool = False, + decimal: str = ".", + bold_rows: bool = True, + classes: str | list | tuple | None = None, + escape: bool = True, + notebook: bool = False, + border: int | None = 
None, + table_id: str | None = None, + render_links: bool = False, + encoding: str | None = None, + ): + """Render a DataFrame as an HTML table. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) + >>> print(df.to_html()) +
<table border="1" class="dataframe">
+              <thead>
+                <tr style="text-align: right;">
+                  <th></th>
+                  <th>col1</th>
+                  <th>col2</th>
+                </tr>
+              </thead>
+              <tbody>
+                <tr>
+                  <th>0</th>
+                  <td>1</td>
+                  <td>3</td>
+                </tr>
+                <tr>
+                  <th>1</th>
+                  <td>2</td>
+                  <td>4</td>
+                </tr>
+              </tbody>
+            </table>
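The dtype migration in this patch (BYTES to pa.binary(), NUMERIC to pa.decimal128(38, 9)) and the convert_pandas_dtypes helper both rely on round-tripping a pandas column through an Arrow table so it comes back with an Arrow-backed extension dtype. Below is a minimal standalone sketch of that round-trip; the column names and sample values are hypothetical and it illustrates the pattern only, it is not part of the patch.

# Sketch only: demonstrates the Arrow round-trip used in convert_pandas_dtypes;
# the column names and values here are hypothetical.
import decimal

import pandas as pd
import pyarrow as pa

df = pd.DataFrame(
    {
        "bytes_col": [b"abc", None],
        "numeric_col": [decimal.Decimal("1.5"), None],
    }
)

# Build an Arrow table with an explicit schema, then convert back with
# types_mapper=pd.ArrowDtype so the columns come back as binary[pyarrow] and
# decimal128(38, 9)[pyarrow] instead of dtype("object").
arrow_table = pa.Table.from_pandas(
    df,
    schema=pa.schema(
        [("bytes_col", pa.binary()), ("numeric_col", pa.decimal128(38, 9))]
    ),
)
converted = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
print(converted.dtypes)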