feat: Add Series.peek to preview data efficiently #727


Merged: 23 commits merged into main from series_cache on Jun 26, 2024

Conversation

TrevorBergeron (Contributor)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

product-auto-label bot added labels size: m (Pull request size is medium) and api: bigquery (Issues related to the googleapis/python-bigquery-dataframes API) May 28, 2024
product-auto-label bot added label size: l (Pull request size is large) and removed size: m (Pull request size is medium) May 29, 2024
TrevorBergeron marked this pull request as ready for review May 30, 2024 17:20
TrevorBergeron requested review from a team as code owners May 30, 2024 17:20
TrevorBergeron requested a review from junyazhang May 30, 2024 17:20
TrevorBergeron requested a review from tswast May 30, 2024 17:20
tswast (Collaborator) left a comment

Could we split out the session-aware caching to a separate PR or is this pretty tightly coupled to Series.peek()?


def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]:
"""Try to determine cluster col candidates that work with given predicates."""
# TODO: Prioritize equality predicates over ranges
Collaborator

Could you rephrase? It took me a while to understand what you meant.

Perhaps add that since equality is a narrower filter, it's more likely to reduce the data read if it's the first clustering filter.

Maybe this TODO should be to sort by how selective the predicates are?

TrevorBergeron (Author)

Yeah, the idea is to cluster on the filter predicted to be most selective. Updated the TODO.
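
For illustration, a minimal sketch of what that selectivity ordering could look like. The `Predicate` type and op spellings here are hypothetical stand-ins for the real expression nodes:

```python
from dataclasses import dataclass
from typing import Sequence

# Hypothetical predicate representation; the real code works on expression nodes.
@dataclass(frozen=True)
class Predicate:
    column: str
    op: str  # e.g. "==", "<", ">="
    value: object

EQUALITY_OPS = {"=="}
RANGE_OPS = {"<", "<=", ">", ">="}

def selectivity_rank(pred: Predicate) -> int:
    """Lower rank = predicted to be more selective."""
    if pred.op in EQUALITY_OPS:
        return 0  # equality is the narrowest filter
    if pred.op in RANGE_OPS:
        return 1
    return 2  # unknown predicates are considered last

def order_predicates(preds: Sequence[Predicate]) -> list:
    # Cluster on the predicate predicted to be most selective first.
    return sorted(preds, key=selectivity_rank)
```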

@@ -60,6 +60,11 @@ def order_preserving(self) -> bool:
"""Whether the row operation preserves total ordering. Can be pruned from ordering expressions."""
return False

@property
def pruning_compatible(self) -> bool:
"""Whether the operation preserves locality o"""
Collaborator

I see a hanging "o". Was that meant to be "or ..."?

Collaborator

I'd also like some more information for help in determining when an operation would be pruning compatible.

TrevorBergeron (Author)

Actually, I didn't end up using this; removed from the new revision. Later on I'll add some concept of an "inverse" operation to help normalize predicates. For now, only range and equality predicates between a column and a constant are considered.
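
To make that scope concrete, here is a hedged sketch of the "column vs. constant" restriction, with stand-in expression node types (the real module's types differ):

```python
from dataclasses import dataclass
from typing import Optional, Union

# Stand-in expression nodes for illustration only.
@dataclass(frozen=True)
class ColumnRef:
    name: str

@dataclass(frozen=True)
class Constant:
    value: object

Expr = Union[ColumnRef, Constant]

def cluster_col_for_comparison(left: Expr, right: Expr) -> Optional[str]:
    """Return a cluster-column candidate for `left OP right`, if any.

    Only `column OP constant` (in either order) qualifies; anything more
    complex would need predicate normalization, e.g. via the "inverse"
    operation idea mentioned above.
    """
    if isinstance(left, ColumnRef) and isinstance(right, Constant):
        return left.name
    if isinstance(right, ColumnRef) and isinstance(left, Constant):
        return right.name
    return None
```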

Comment on lines 622 to 623
Preview n arbitrary elements from the series. No guarantees about row selection or ordering.
``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires
Collaborator

Let's try to keep the first line summary short.

Suggested change
Preview n arbitrary elements from the series. No guarantees about row selection or ordering.
``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires
Preview n arbitrary elements from the series without guarantees about row selection or ordering.
``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires

TrevorBergeron (Author)

done

@@ -332,13 +333,15 @@ def session_id(self):
@property
def objects(
self,
) -> collections.abc.Set[
) -> Tuple[
Collaborator

Technically a breaking change. Maybe OK since we didn't actually document this property, but might be better to change from Set to a broader type like Iterable.

TrevorBergeron (Author)

Hmm, yeah, actually, should we just make this private? Added this very recently and wasn't really intending this for user consumption.

TrevorBergeron (Author)

> Could we split out the session-aware caching to a separate PR or is this pretty tightly coupled to Series.peek()?

I worry that implementing Series.peek() without session-aware caching will result in a lot of redundant re-computation if users peek at multiple columns of the same dataframe. I could start with session-aware caching and apply it to dataframe.peek instead I guess.
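
A hypothetical usage sketch of that concern; the `peek` signature and sample table are assumed for illustration:

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# force=True may run a query, but the resulting subtree gets cached.
print(df["name"].peek(5, force=True))

# With session-aware caching, peeking at a second column of the same
# DataFrame can reuse the cached subtree instead of recomputing it.
print(df["number"].peek(5, force=True))
```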

TrevorBergeron requested a review from tswast May 31, 2024 22:51

``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires
full data scanning. Using ``force=True`` will always succeed, but may perform queries.
Query results will be cached so that future steps will benefit from these queries.
Collaborator

Do we need a caveat here that caching is session-aware and will attempt to cache the optimal subtree? (Not sure exactly how to phrase that in a friendlier way.)

TrevorBergeron (Author)

Yeah, not sure how/if we should communicate this to users. I also don't want to lock in any specific execution strategy other than "we might cache if force=True, but we will make that cache as useful as possible using some unspecified approach".

@@ -1918,6 +1921,18 @@ def _cache_with_offsets(self, array_value: core.ArrayValue):
).node
self._cached_executions[array_value.node] = cached_replacement

def _session_aware_caching(self, array_value: core.ArrayValue) -> None:
Collaborator

Let's verbify this.

Suggested change
def _session_aware_caching(self, array_value: core.ArrayValue) -> None:
def _cache_with_session_awareness(self, array_value: core.ArrayValue) -> None:

TrevorBergeron (Author)

done

op = predicate.op
if isinstance(op, COMPARISON_OP_TYPES):
return cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1])
if isinstance(op, (type(ops.invert_op))):
Collaborator

Let's add a TODO for geo, too. Looks like functions like st_dwithin can take advantage of clustering on geo columns. https://cloud.google.com/blog/products/data-analytics/best-practices-for-spatial-clustering-in-bigquery?e=48754805

TrevorBergeron (Author)

added todo

Returns the node to cache, and optionally a clustering column.
"""
node_counts = traversals.count_nodes(session_forest)
# These node types are cheap to re-compute
Collaborator

Let's complete the thought in this comment for clarity.

Suggested change
# These node types are cheap to re-compute
# These node types are cheap to re-compute, so it makes more sense to cache their children.

TrevorBergeron (Author)

done
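
For illustration, a minimal sketch of the selection loop being discussed, using stand-in node types (the real code traverses BFET nodes and consults `traversals.count_nodes`):

```python
from collections import Counter
from dataclasses import dataclass
from typing import Iterator, List, Optional

# Stand-in unary expression node for illustration.
@dataclass(frozen=True)
class Node:
    name: str
    child: Optional["Node"] = None

def iter_chain(root: Node) -> Iterator[Node]:
    node: Optional[Node] = root
    while node is not None:
        yield node
        node = node.child

def count_nodes(session_forest: List[Node]) -> Counter:
    # How many session trees reference each node.
    counts: Counter = Counter()
    for root in session_forest:
        counts.update(iter_chain(root))
    return counts

def choose_caching_target(root: Node, session_forest: List[Node]) -> Node:
    node_counts = count_nodes(session_forest)
    caching_target, caching_target_refs = root, node_counts.get(root, 0)
    for cur_node in iter_chain(root):
        cur_node_refs = node_counts.get(cur_node, 0)
        if cur_node_refs > caching_target_refs:
            # Prefer the subtree shared by the most session objects.
            caching_target, caching_target_refs = cur_node, cur_node_refs
    return caching_target
```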

if cur_node_refs > caching_target_refs:
caching_target, caching_target_refs = cur_node, cur_node_refs
cluster_col = None
# Just pick the first cluster-compatible predicate
Collaborator

TODO to sort by a selectivity heuristic? Seems like this layer might be a better place than cluster_cols_for_predicate to do that sort.

TrevorBergeron (Author)

added todo

TrevorBergeron requested a review from tswast June 5, 2024 00:26
return (
not is_array_like(type)
and not is_struct_like(type)
and (type not in (GEO_DTYPE, TIME_DTYPE, FLOAT_DTYPE))
Collaborator

Geo is clusterable but not orderable.

Collaborator

Should we make this an allowlist, instead? I suspect as new types are added they aren't likely to be clusterable.

TrevorBergeron (Author)

Added a clusterable property to the dtype metadata struct, defaulting to False.
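
A hedged sketch of what that allowlist-style flag might look like (names are stand-ins, not the actual metadata struct):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DtypeMetadata:  # stand-in for the real dtype metadata struct
    name: str
    orderable: bool = False
    clusterable: bool = False  # defaults to False, so new dtypes must opt in

INT_META = DtypeMetadata("Int64", orderable=True, clusterable=True)
GEO_META = DtypeMetadata("Geography", orderable=False, clusterable=True)  # clusterable but not orderable
FLOAT_META = DtypeMetadata("Float64", orderable=True, clusterable=False)  # floats excluded from clustering
```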

TrevorBergeron requested a review from tswast June 14, 2024 00:22
TrevorBergeron and others added 3 commits June 25, 2024 15:18
Co-authored-by: Tim Sweña (Swast) <swast@google.com>
Co-authored-by: Tim Sweña (Swast) <swast@google.com>
tswast (Collaborator) left a comment

LGTM with a couple of comments that would be good to resolve before merging.

caching_target, caching_target_refs = cur_node, cur_node_refs
schema = cur_node.schema
# Cluster cols only consider the target object and not other session objects
# Note, this
Collaborator

Looks like this comment ended mid-sentence.

TrevorBergeron (Author)

removed

cur_node = cur_node.child
cur_node_refs = node_counts.get(cur_node, 0)
if cur_node_refs > caching_target_refs:
caching_target, caching_target_refs = cur_node, cur_node_refs
Collaborator

Do we need to do anything to make sure we aren't selecting more columns than needed? I have some worries that column selection wouldn't have the desired effect.

Though, I suppose that'll only matter with unordered + unindexed DataFrames due to our hashing of the row. Maybe worth a TODO to be resolved with that project?

That said, I'd be curious to see if unordered/unindexed would benefit from caching at all due to the difficulties of using the cache in row identity joins.

TrevorBergeron (Author)

Row hashing shouldn't matter, as that only happens for the initial table scan, which shouldn't need to be cached. However, yes, we could try to prune columns unused by the session before caching. We would need to be careful not to invalidate existing caching or the join->projection rewriter, but it should be possible. This could be done in a few ways, such as a partial cache (containing only some columns), or by rewriting all the session BFETs with a column-pruning pass before caching.
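
As a rough illustration of the column-pruning idea (types and names hypothetical):

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple

@dataclass(frozen=True)
class CacheCandidate:  # stand-in for the subtree selected for caching
    table: str
    columns: Tuple[str, ...]

def prune_for_session(
    candidate: CacheCandidate, columns_used_by_session: FrozenSet[str]
) -> CacheCandidate:
    """Keep only the columns some session tree actually reads before caching."""
    kept = tuple(c for c in candidate.columns if c in columns_used_by_session)
    return CacheCandidate(table=candidate.table, columns=kept)
```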

TrevorBergeron enabled auto-merge (squash) June 26, 2024 16:40
TrevorBergeron merged commit 580e1b9 into main Jun 26, 2024
22 of 23 checks passed
TrevorBergeron deleted the series_cache branch June 26, 2024 17:38