Support purge for Spark 3.2 #3056
Conversation
Gentle ping @rdblue @aokolnychyi, could you help to review this as well? Thanks a lot.
spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
sql("SELECT * FROM %s", tableName)); | ||
|
||
sql("DROP TABLE %s PURGE", tableName); | ||
Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent)); |
This isn't sufficient to test purge. I think you should create a mock catalog to test that the argument is passed through to dropTable correctly.
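As an illustration of that suggestion, a test-only catalog could simply record the purge flag it receives. The class below is a hypothetical sketch (DropRecordingCatalog is not part of this PR), extending Iceberg's HadoopCatalog:

```java
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hypothetical test-only catalog: records the purge flag that the Spark catalog
// forwards when DROP TABLE ... PURGE is executed.
public class DropRecordingCatalog extends HadoopCatalog {
  private Boolean lastPurgeFlag = null;

  @Override
  public boolean dropTable(TableIdentifier ident, boolean purge) {
    this.lastPurgeFlag = purge;  // remember what was passed through
    return super.dropTable(ident, purge);
  }

  public Boolean lastPurgeFlag() {
    return lastPurgeFlag;
  }
}
```

A test could register this class through spark.sql.catalog.&lt;name&gt;.catalog-impl, run DROP TABLE ... PURGE, and assert that lastPurgeFlag() returns true.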
Updated the UT.
@@ -287,4 +302,13 @@ private T getSessionCatalog() {
    "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
    return sessionCatalog;
  }

  private boolean callPurgeTable(Object catalog, Identifier ident) {
I don't think it's a good idea to use reflection here. I'd probably wait to merge this until after the Spark refactor so that we can implement new APIs from 3.1
+1
OK, waiting for the Spark refactor.
Thanks @rdblue for the review. I am sorry for the late response.
*/

package org.apache.iceberg.spark.extensions;
Why is the test added in the extensions package instead of spark3 directly?
Thanks @jackye1995 for the review, moved to spark3.
Support for Spark 3.2 has been added. I think the
I will resolve the conflicts soon.
@rdblue @jackye1995 @wypoon the code has been rebased, please help to review when you have time. Thanks a lot.
@ConeyLiu, @rdblue, @jackye1995: Should we support this with Spark 3.2 now? I see some users on Slack asking for this feature.
@ajantha-bhat, thanks for bringing this back up. Now that the refactor is done, this should be much easier to support in 3.1 and 3.2. @ConeyLiu would you like to update this?
OK, will do it.
@ajantha-bhat @rdblue Please take a look, thanks a lot.
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
  }
}

private List<Object[]> sql(SparkSession spark, String query, Object... args) {
Why is this here? Isn't there already a sql method available?
@Before
public void createTable() {
  // Spark CatalogManager cached the loaded catalog, here we use new SparkSession to force it load the catalog again
@rdblue please see the comments. We have customized the catalog to verify whether the drop or purge method is called, so we need to load a new catalog for each test. That's why we define a new drop method.
This doesn't sound correct to me. You should not need to alter the catalog to check whether a table is purged. Can't you get the location from the table itself? And this appears to make the tests specific to the Hadoop catalog.
This was adopting the earlier review comment, which may be out of date now:
This isn't sufficient to test purge. I think you should create a mock catalog to test that the argument is passed through to dropTable correctly.
  try {
    return isPathIdentifier(ident) ?
-       tables.dropTable(((PathIdentifier) ident).location()) :
+       tables.dropTable(((PathIdentifier) ident).location(), purge) :
        icebergCatalog.dropTable(buildIdentifier(ident));
If purge is true and GC_ENABLED is false, should we allow a purge? Or should that throw an error?
To elaborate on this, we have a table property GC_ENABLED which we use to signify that a table identifier may not have full ownership of all the data files it references. When this flag is disabled we disable Iceberg operations which can physically delete files, like EXPIRE_SNAPSHOTS and REMOVE_ORPHAN_FILES. I feel like DROP TABLE PURGE is probably very similar and it should be blocked.
An example would be a user creating a Snapshot of another table, then dropping it with purge. We wouldn't want to delete files owned by the original table which has been snapshotted.
My understanding is that gc.enabled should only impact the snapshot expiration (including remove orphan files) aspect, at least that is what my definition of "garbage collection" is. The DROP TABLE behavior of Iceberg external vs. managed tables is a different discussion. We should not try to work around the managed table issue using gc.enabled.
The industry has been using this notion of Managed vs External tables for a long time now. Most people simply associate it with the drop-table behavior, but it really describes the ownership of a table with respect to an engine with a data catalog. Take Hive as an example: a Managed table (Hive ACID) means the table is in the Hive metastore and the data is fully managed by the Hive data warehouse compute system. Changing data using external systems will likely cause trouble, so those systems are supposed to only read the data, which means treating the table as External. This is how most vertical-integration products define their table concepts, including Snowflake and AWS Redshift.
In the Iceberg multi-engine model where data can be updated by any compute engine or process through the Iceberg library, this definition of managed that bundles catalog and engine is confusing. But we can still make it consistent by changing the definition. For Iceberg, the ownership of the table is bundled with the catalog service plus the Iceberg spec itself. Any compute engine can claim the responsibility to manage a table, as long as the Iceberg spec is respected and the catalog service is identified for the table. In this definition, the issue you describe is true: if two table identifiers point to the same underlying table metadata in storage, the processes operating against those tables are both managing the underlying files. I think this should be made extra clear for users who would like to use the registerTable feature we just introduced: the table registered is being fully managed, not just an External table.
I think an extension of registerTable is needed for Iceberg to complete the notion of external table. And it should be the catalog's responsibility to manage these semantics, maybe by setting a flag to distinguish external tables from normal tables to make it clear that it's a read-only copy of a managed table. The registerTable interface should be updated to something like:
default Table registerTable(TableIdentifier identifier, String metadataFileLocation, boolean isManaged) {
  throw new UnsupportedOperationException("Registering tables is not supported");
}
If not managed, some sort of flag should be set at the catalog level, so that if a user later tries to do a DROP TABLE PURGE, the catalog should reject the operation. I think this will fully solve the issue you describe about "a user creating a Snapshot of another table, then dropping it with purge".
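For illustration only, usage of such an overload might look like the sketch below; the table identifier and metadata path are made-up, and the isManaged overload is only a proposal in this thread, not an existing Iceberg API:

```java
// Hypothetical: register a read-only (unmanaged) reference to another table's metadata.
// A catalog honoring the proposed flag would later reject DROP TABLE ... PURGE on it.
Table snapshotCopy = catalog.registerTable(
    TableIdentifier.of("db", "snapshot_copy"),
    "s3://bucket/warehouse/db/source_table/metadata/00010-abc.metadata.json",
    false /* isManaged */);
```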
@jackye1995's summary of external is helpful. Thanks for helping set the context!
The confusion here often lies with users that have been taught to expect certain outcomes from external tables, like that external tables don't remove data when they're dropped. Those guarantees are inconsistent in the first place: you can still overwrite (remove files) in an external table in Hive. In addition, why should a user set whether a table's data can be deleted when it is dropped at the time the table is created?
How to clean up when dropping a table depends on the choices in the data platform. At Netflix, we removed references only and deleted data files only after a waiting period (to allow restoring a table). Users had no say in how the data was removed, only that the table was logically dropped. Both EXTERNAL and PURGE make little sense with policies like that.
I think an extension of registerTable is needed for Iceberg to complete the notion of external table. And it should be the catalog's responsibility to manage this semantics, maybe set a flag to distinguish external table from normal tables to make it clear that it's a read-only copy of a managed table.
I really don't think that the concept of EXTERNAL has a place in Iceberg and I am very skeptical that we should add it. EXTERNAL essentially means that the table is owned by some other system. But any engine can use an Iceberg catalog and the library to update a table. What is the utility of arbitrarily restricting that?
As Jack said, we can think of the relationship differently, removing the engine requirement. But if we still have EXTERNAL in this world, then it is really just to support secondary references. I don't know why you would want secondary references that look exactly like primary references but are restricted from making changes. Just connect to the catalog. And if there's a need, we can add a catalog flag that makes it read-only.
That still leaves an open question of what to do for gc.enabled. There are times when a table doesn't "own" the resources within it, and gc.enabled is a quick way to account for that -- to basically turn off operations that may delete files that aren't owned. I think that includes PURGE, but I do see the distinction and agree that it is an ugly choice to need to make. Short term, I think we should disable PURGE and throw an exception if gc.enabled is false.
Longer term, I think we need to refine the idea of file ownership. The best way that I can think of is to do this by prefix, as we discussed on a thread in the Trino community a while back and in the Iceberg sync. In this model, a table gets its own prefix and can delete anything in that prefix (or more likely, a small set of prefixes). Imported data files are unowned and are not deleted by expire snapshots or purge if they are outside of owned prefixes.
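A minimal sketch of that short-term guard, assuming it is wired into the catalog's purge path (the helper class and method below are illustrative, not the code merged in this PR; ValidationException, TableProperties, and PropertyUtil are existing Iceberg classes):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.util.PropertyUtil;

// Illustrative guard: refuse DROP TABLE ... PURGE when gc.enabled=false,
// since the table may not own all of the files it references.
class PurgeGuard {
  static void checkPurgeAllowed(Table table) {
    boolean gcEnabled = PropertyUtil.propertyAsBoolean(
        table.properties(), TableProperties.GC_ENABLED, TableProperties.GC_ENABLED_DEFAULT);
    ValidationException.check(gcEnabled,
        "Cannot purge table: GC is disabled (deleting files may corrupt the state of other tables)");
  }
}
```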
Thank you Ryan very much for the insights!
For the discussion related to GC, I am fine with disallowing purge when gc.enabled is set to false as a short-term solution, if we would like to define garbage collection that way. We should do it within the catalog implementations to ensure this behavior is consistent across engines.
I really don't think that the concept of EXTERNAL has a place in Iceberg and I am very skeptical that we should add it.
100% agree. However, we now opened the door by adding registerTable in the catalog, which maps to the external table concept perfectly. I already received a few feature requests from people asking for this to map to CREATE EXTERNAL TABLE. People can now register an Iceberg table with an old metadata file location and do writes against it to create basically two diverged metadata histories of the same table. This is a very dangerous action because two Iceberg tables can now own the same set of files and corrupt each other.
As the first step, we should make it clear in the javadoc of registerTable that it's only used to recover a table. Creating two references in a catalog to the same table metadata and doing write operations on both to create diverged metadata histories is not recommended and will have unintended side effects.
When I was suggesting adding the EXTERNAL concept to registerTable, to be honest I was really trying to make peace with people who want to stick with this definition. At least we could have a read-only solution and only encourage registering a normal table for recovery. But the more I think about this topic, the more I feel we should start to promote the right way to operate against Iceberg tables.
Just from a correctness perspective, this is the wrong thing to promote. Even if people just want to read a historical metadata file, information like table properties is stale. It is always better to do time travel against the latest metadata, or run a query against a tagged snapshot recorded in the latest metadata, and have the metadata location automatically updated with new commits.
As you said, EXTERNAL also just arbitrarily limits people's ability to use Iceberg. I would say the only value is at the business level. For compute vendors or data platforms that have this traditional definition of external and managed tables, it is much easier to provide external table support for an alien product compared to the full table operation support that is usually offered to managed tables only. My team went through such a debate for a long time before we decided to go with full support, and we tried really hard to explain the entire picture, but I doubt other people could do the same. We can already see some vendors now offering Iceberg support under the name of "external table".
In the long term, we should start to promote a new table ownership model (maybe call it a SHARED model) and start to bring people up to date with how Iceberg tables are operated. Let me draft a doc for that to have a formal discussion, and also include concepts like table root location ownership in that doc so we can have full clarity in the domain of table ownership.
At Pinterest, we were also running into this issue where Hive table users unexpectedly drop data when they create an Iceberg table with the EXTERNAL table type on existing data (more details on #4018). I think this would be a common accident among orgs trying out Iceberg. I can see how Netflix did not run into this issue, based on @rdblue's comment, but I don't think all data platforms using Hive tables drop only the references for drop table statements.
I think if we change the behavior of drop table to not drop any data, that alleviates our concern about accidental drops on external tables. However, it also means that drop table on managed tables would leave data around, which is also an issue. Spark's documentation also says that data is dropped as part of the drop table operation when it is not an external table: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-drop-table.html. So this behavior of drop table not dropping data for managed tables is also misleading.
100% agree. However, we now opened the door by adding registerTable in the catalog, which maps to the external table concept perfectly. I already received a few feature requests from people asking for this to map to CREATE EXTERNAL TABLE. People can now register an Iceberg table with an old metadata file location and do writes against it to create basically two diverged metadata histories of the same table. This is a very dangerous action because two Iceberg tables can now own the same set of files and corrupt each other.
I'm not sure I agree that it maps perfectly. This is a way to register a table with a catalog, after which the catalog owns it like any other table. There should be nothing that suggests registration has anything to do with EXTERNAL and no reason for people to think that tables that are added to a catalog through registerTable should behave any differently.
If this confusion persists, I would support removing registerTable from the API.
Just from a correctness perspective, this is the wrong thing to promote.
Agreed!
In the long term, we should start to promote a new table ownership model (maybe call it a SHARED model) and start to bring people up to date with how Iceberg tables are operated. Let me draft a doc for that to have a formal discussion, and also include concepts like table root location ownership in that doc so we can have full clarity in the domain of table ownership.
I'm not sure that I would want a SHARED keyword -- that just implies there are times when the table is not shared and we would get into similar trouble. But I think your idea to address this in a design doc is good.
Also, I consider the data/file ownership a separate problem, so you may want to keep them separate in design docs or proposals. I wouldn't want to confuse table modification with data file ownership, although modification does have implications for file ownership.
I think if we change the behavior of drop table to not drop any data that alleviates our concern on accidental drops on external tables. However, it also means that drop table on managed tables would leave data around, which is also an issue.
This is why Iceberg ignores EXTERNAL. The platform should be making these decisions, ideally. Users interact with logical tables; physical concerns are for the platform. If you don't have a platform-level plan for dropping table data, then I think the PURGE approach is okay because a user presumably makes the choice at the right time (rather than months if not years before the drop).
My general recommendation is to tell users that they're logically dropping tables and data. Maybe you can have a platform-supported way to un-delete, but when you drop a table you generally have no expectation that you didn't do anything destructive!
I'm not sure I agree that it maps perfectly. This is a way to register a table with a catalog, after which the catalog owns it like any other table.
Probably too strong to say "perfectly". I am saying this not because I think of it that way, but because multiple data engineers have asked me if they could use CREATE EXTERNAL TABLE to execute that registerTable feature. So, from anecdotal evidence, I think people will think of it that way. Maybe we should call it something like reviveTable, or expose it as a Spark procedure instead of making it a part of the catalog interface.
I'm not sure that I would want a SHARED keyword
I am not trying to promote a new keyword; I think we need a new concept to explain the difference from the traditional definition of external/managed tables.
Also, I consider the data/file ownership a separate problem, so you may want to keep them separate in design docs or proposals
I think it's hard not to mention them together because owning a table has an implicit relationship with owning the file paths and files of the table. But we can discuss this further; I will follow up in a new dev list thread.
@ConeyLiu, I think we should simplify the way the tests work. They should just validate that the data and metadata files for the table are deleted after a purge, or not deleted when not purged. I don't think there is a need for custom catalogs or anything that is specific to the catalog that is used. In addition, I think we need to fail
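A sketch of that simplified check, using Hadoop's FileSystem API against the table location (the helper name and the way the location is obtained are assumptions, not code from this PR):

```java
// Assumed to live in the test class, with java.io.IOException, org.junit.Assert,
// org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem and
// org.apache.hadoop.fs.Path imported. Checks whether any data/metadata files
// remain under the table location after DROP TABLE.
private void assertTableFiles(String tableLocation, boolean expectDeleted) throws IOException {
  Path location = new Path(tableLocation);
  FileSystem fs = location.getFileSystem(new Configuration());
  boolean hasFiles = fs.exists(location) && fs.listStatus(location).length > 0;
  if (expectDeleted) {
    Assert.assertFalse("Data and metadata files should be deleted by DROP TABLE ... PURGE", hasFiles);
  } else {
    Assert.assertTrue("Data and metadata files should remain after a plain DROP TABLE", hasFiles);
  }
}
```

The table location could be read from the Iceberg table before dropping it, as suggested earlier in the thread.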
Thanks everyone for the detailed and meaningful discussion.
@rdblue, the UTs have been updated. It should be
Let me also take a look.
Looks good to me. Thanks for the work, @ConeyLiu! Let's wait for the tests.
@ConeyLiu, looks like we would need to adapt
Thanks @aokolnychyi, the code has been updated, and it passed local testing.
Thanks, @ConeyLiu. Let's wait for test results.
Thanks @aokolnychyi for merging this, and also thanks to everyone for the review. I will submit a PR for Spark 3.1.
(cherry picked from commit 3dc245b)
Spark 3.1.0 split dropTable into two APIs: dropTable and purgeTable. By default, purgeTable throws a not-implemented exception. This patch adds purgeTable support.
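A minimal sketch of what the resulting override could look like in SparkCatalog, assuming the fields and helpers quoted earlier in this thread (tables, icebergCatalog, isPathIdentifier, buildIdentifier); per the discussion above, the final change also needs to reject the purge when gc.enabled is false, which this sketch omits:

```java
// Spark 3.1+ TableCatalog API: purgeTable is called for DROP TABLE ... PURGE.
// Forward the purge flag to the underlying Iceberg catalog instead of using reflection.
@Override
public boolean purgeTable(Identifier ident) {
  return isPathIdentifier(ident)
      ? tables.dropTable(((PathIdentifier) ident).location(), true /* purge */)
      : icebergCatalog.dropTable(buildIdentifier(ident), true /* purge */);
}
```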