[SPARK-11905] [SQL] Support Persist/Cache and Unpersist in Dataset APIs #9889
Conversation
Test build #46485 has finished for PR 9889 at commit
I'm worried the existing caching mechanisms might not work on dataset operations. Do we have a good notion of equality for encoders and lambda functions? Can you add some test coverage for this?
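For context on why this is a real concern: plain Scala function values compare by reference, so two syntactically identical lambdas are not equal. A small illustrative sketch (not from the PR):

    // Two identical-looking lambdas are distinct objects in Scala;
    // function values have no structural equality, so a cache lookup
    // keyed on the closure instance alone would never match across
    // call sites.
    val f1: Int => Int = (i: Int) => i + 1
    val f2: Int => Int = (i: Int) => i + 1
    assert(f1 != f2)        // reference equality only
    assert(f1(1) == f2(1))  // yet they compute the same result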
I see, I will give it a try. Thanks!
@marmbrus Do these newly added test cases resolve your concerns? |
Test build #46510 has finished for PR 9889 at commit
cc @marmbrus I will let you merge this one.
/**
 * @since 1.6.0
 */
The comment style here is off and we should actually have a description. Could we just move the functions/docs from DataFrame to Queryable?
So far, we are unable to move the functions to Queryable because the types of the returned values are different. I just added the descriptions in both DataFrame and Dataset. Hopefully, that resolves your concern. Thanks!
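For illustration, the shapes of the two signatures (a simplified, hypothetical sketch, not the actual source) show why a shared concrete implementation in Queryable is awkward without a common return type:

    // Simplified sketch: persist must return DataFrame on DataFrame and
    // Dataset[T] on Dataset, so a single concrete method on the shared
    // Queryable trait cannot cover both without losing the precise type.
    trait Queryable { /* schema, queryExecution, sqlContext, ... */ }

    class DataFrame extends Queryable {
      def persist(): DataFrame = { /* register with the cache manager */ this }
    }

    class Dataset[T] extends Queryable {
      def persist(): Dataset[T] = { /* register with the cache manager */ this }
    }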
@marmbrus moving functions into Queryable actually breaks both scaladoc and javadoc.
@rxin I think that's only because we explicitly exclude execution from scaladoc. Maybe we should move Queryable? Or don't exclude that class. I don't want to duplicate a ton of docs.
It would be great to also have tests that ensure that things like
@@ -27,6 +28,7 @@ private[sql] trait Queryable {
  def schema: StructType
  def queryExecution: QueryExecution
  def sqlContext: SQLContext
  private[sql] def logicalPlan: LogicalPlan
Can we just get this from the queryExecution in the cache manager? Or at least define it explicitly here. I don't want DataFrames and Datasets to fall out of sync with regards to what the canonical plan phase is.
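One way to pin this down explicitly (a sketch of the suggestion, assuming the analyzed plan is the canonical phase) would be a concrete definition on the trait itself:

    // Hypothetical sketch: derive logicalPlan from queryExecution so
    // that DataFrame and Dataset cannot disagree on which plan phase
    // the cache manager uses as the canonical key.
    private[sql] def logicalPlan: LogicalPlan = queryExecution.analyzed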
@@ -17,6 +17,8 @@
 package org.apache.spark.sql

 import org.apache.spark.storage.StorageLevel
order imports.
I wouldn't block merging an initial version of this feature on this, but it would also be nice if we could support the following (this might be hard though):

    val f = (i: Int) => i + 1
    val ds = Seq(1, 2, 3).toDS()
    val mapped = ds.map(f)
    mapped.cache()
    val mapped2 = ds.map(f)
    assertCached(mapped2)
Now I understand your concern. Thank you for the example! I added your example to the newly created test suite. I am running the test cases on my local computer and will upload the new changes tomorrow morning. Thank you for your help!
@gatorsmile just FYI, if you have time, the Python test work is probably much more important than the more complicated caching cases.
@rxin Sure, I will do the Python testing first. Thanks!
retest this please
Test build #46689 has finished for PR 9889 at commit
@marmbrus Not sure if the latest code changes resolve all your concerns. Please let me know if you have any suggestions. Thank you! Have a good Thanksgiving Day!
ds2.persist()
assertCached(ds2)

val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
assertCached joined here as well.
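The suggested assertion could look roughly like this (a sketch following the test's existing style, with assertCached and the two input Datasets assumed from the surrounding suite):

    // Also verify that the joined Dataset itself reports as cached
    // after persisting it.
    val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
    joined.persist()
    assertCached(joined)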
Thank you! @marmbrus Will make the changes soon.
Test build #46925 has finished for PR 9889 at commit
@marmbrus Please check the latest changes. Feel free to let me know if we need more changes. Thank you!
Thanks, merging to master and 1.6.
Persist and Unpersist exist in both RDD and DataFrame APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable? Please provide your opinions. marmbrus rxin cloud-fan Thank you very much!

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #9889 from gatorsmile/persistDS.

(cherry picked from commit 0a7bca2)
Signed-off-by: Michael Armbrust <michael@databricks.com>
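For reference, a minimal usage sketch of the API this PR adds (assuming a local SQLContext with its implicits in scope; the storage level name is assumed to match the one the RDD API uses):

    import org.apache.spark.storage.StorageLevel

    // Cache a Dataset, materialize it with an action, then drop the
    // cached data again.
    val ds = Seq(1, 2, 3).toDS()
    ds.persist(StorageLevel.MEMORY_AND_DISK)
    ds.count()      // first action materializes the cache
    ds.unpersist()  // releases the cached data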