
Support purge for Spark 3.2 #3056

Merged
merged 12 commits on Mar 3, 2022

Conversation

ConeyLiu
Contributor

Spark 3.1.0 split dropTable into two APIs: dropTable and purgeTable. By default, purgeTable throws a not-implemented exception. This patch adds purgeTable support.
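
At a high level, Spark 3.1+'s TableCatalog.purgeTable(ident) can be delegated to Iceberg's purge-aware Catalog.dropTable(ident, purge). A minimal sketch only (the helper name dropTableInternal is illustrative; the actual patch also handles path identifiers and the gc.enabled check discussed below):

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.spark.sql.connector.catalog.Identifier;

// Sketch: route both Spark APIs through one purge-aware drop.
@Override
public boolean dropTable(Identifier ident) {
  return dropTableInternal(ident, false /* keep data and metadata files */);
}

@Override
public boolean purgeTable(Identifier ident) {
  return dropTableInternal(ident, true /* delete data and metadata files */);
}

// hypothetical shared helper
private boolean dropTableInternal(Identifier ident, boolean purge) {
  TableIdentifier icebergIdent = buildIdentifier(ident);  // existing SparkCatalog helper
  return icebergCatalog.dropTable(icebergIdent, purge);
}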

@github-actions github-actions bot added the spark label Aug 31, 2021
@ConeyLiu ConeyLiu closed this Aug 31, 2021
@ConeyLiu ConeyLiu reopened this Aug 31, 2021
@ConeyLiu
Contributor Author

Gentle ping @rdblue @aokolnychyi, could you help review this as well? Thanks a lot.

sql("SELECT * FROM %s", tableName));

sql("DROP TABLE %s PURGE", tableName);
Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));
Contributor

This isn't sufficient to test purge. I think you should create a mock catalog to test that the argument is passed through to dropTable correctly.
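
One way to do that (a sketch only; the class and flag names here are hypothetical, not part of the patch) is a thin test-only catalog that records which code path Spark took:

import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;

// Hypothetical test catalog: records whether DROP TABLE ... PURGE reached purgeTable.
public static class PurgeTrackingSparkCatalog extends SparkCatalog {
  static volatile boolean purgeCalled = false;

  @Override
  public boolean purgeTable(Identifier ident) {
    purgeCalled = true;   // Spark routed the drop through the purge API
    return super.purgeTable(ident);
  }

  @Override
  public boolean dropTable(Identifier ident) {
    purgeCalled = false;  // plain DROP TABLE
    return super.dropTable(ident);
  }
}

A test could then register this class under a spark.sql.catalog.<name> property, run the DDL, and assert that purgeCalled is true after DROP TABLE ... PURGE and false after a plain DROP TABLE.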

Contributor Author

Updated the UT.

@@ -287,4 +302,13 @@ private T getSessionCatalog() {
"Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
return sessionCatalog;
}

private boolean callPurgeTable(Object catalog, Identifier ident) {
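
A reflective shim along these lines (an illustrative sketch, not the actual patch) calls purgeTable(Identifier) without a compile-time dependency on the Spark 3.1 API:

import java.lang.reflect.Method;
import org.apache.spark.sql.connector.catalog.Identifier;

// Illustrative sketch: invoke purgeTable reflectively so the code also compiles
// against Spark versions that do not define the API.
private boolean callPurgeTable(Object catalog, Identifier ident) {
  try {
    Method purgeTable = catalog.getClass().getMethod("purgeTable", Identifier.class);
    return (boolean) purgeTable.invoke(catalog, ident);
  } catch (NoSuchMethodException e) {
    throw new UnsupportedOperationException("purgeTable is not available in this Spark version", e);
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException("Failed to invoke purgeTable", e);
  }
}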
Contributor

I don't think it's a good idea to use reflection here. I'd probably wait to merge this until after the Spark refactor so that we can implement new APIs from 3.1

Contributor

+1

Contributor Author

OK, waiting for the Spark refactor.

@ConeyLiu
Contributor Author

Thanks @rdblue for the review. I am sorry for the late response.

*/


package org.apache.iceberg.spark.extensions;
Contributor

Why is the test added in the extensions package instead of spark3 directly?

Contributor Author

Thanks @jackye1995 for the review, moved to spark3.

@wypoon
Contributor

wypoon commented Oct 25, 2021

Support for Spark 3.2 has been added. I think the purgeTable support can be added to SparkCatalog in v3.2. Two versions for the new test can be added, one in v3.0 and another in v3.2, with different expected behavior.
If we add v3.1, then the purgeTable support can be added to SparkCatalog in v3.1 as well.

@ConeyLiu
Contributor Author

I will resolve the conflicts soon.

@ConeyLiu
Contributor Author

ConeyLiu commented Nov 4, 2021

@rdblue @jackye1995 @wypoon the code has been rebased; please help review when you have time. Thanks a lot.

@ConeyLiu ConeyLiu changed the title from "Support purge for Spark 3.1.x" to "Support purge for Spark 3.1 & 3.2" on Nov 4, 2021
@ajantha-bhat
Member

@ConeyLiu, @rdblue, @jackye1995: Should we support this with Spark 3.2 now? I see some users in Slack asking for this feature.

@rdblue
Contributor

rdblue commented Jan 31, 2022

@ajantha-bhat, thanks for bringing this back up. Now that the refactor is done, this should be much easier to support in 3.1 and 3.2. @ConeyLiu would you like to update this?

@ConeyLiu
Contributor Author

ConeyLiu commented Feb 1, 2022 via email

@ConeyLiu
Contributor Author

ConeyLiu commented Feb 1, 2022

@ajantha-bhat @rdblue Please take a look, thanks a lot.

}
}

private List<Object[]> sql(SparkSession spark, String query, Object... args) {
Contributor

Why is this here? Isn't there already a sql method available?

@ConeyLiu ConeyLiu changed the title from "Support purge for Spark 3.1 & 3.2" to "Support purge for Spark 3.2" on Feb 8, 2022

@Before
public void createTable() {
// Spark's CatalogManager caches the loaded catalog; here we use a new SparkSession to force it to load the catalog again
Contributor Author

@rdblue please see the comments. We have customized the catalog to verify whether the drop or purge method is called, so we need to load a new catalog for each test. That's why we define a new drop method.
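
The setup is roughly as follows (a sketch only; the catalog name testpurge, the warehouse path, and the PurgeTrackingSparkCatalog class from the earlier sketch are placeholders, and spark is the shared test session): a new SparkSession gets its own CatalogManager, so the custom catalog is re-instantiated per test instead of being served from the cache.

// Sketch: register the tracking catalog in a fresh session so that Spark's
// CatalogManager does not hand back a cached catalog from a previous test.
SparkSession freshSession = spark.newSession();  // shares the SparkContext, new session state
freshSession.conf().set("spark.sql.catalog.testpurge", PurgeTrackingSparkCatalog.class.getName());
freshSession.conf().set("spark.sql.catalog.testpurge.type", "hadoop");
freshSession.conf().set("spark.sql.catalog.testpurge.warehouse", "file:/tmp/iceberg-purge-test");

freshSession.sql("CREATE NAMESPACE IF NOT EXISTS testpurge.db");
freshSession.sql("CREATE TABLE testpurge.db.tbl (id BIGINT, data STRING) USING iceberg");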

Contributor

This doesn't sound correct to me. You should not need to alter the catalog to check whether a table is purged. Can't you get the location from the table itself? And this appears to make the tests specific to the Hadoop catalog.

Contributor Author

This follows an earlier review comment, which may be outdated:

This isn't sufficient to test purge. I think you should create a mock catalog to test that the argument is passed through to dropTable correctly.

try {
  return isPathIdentifier(ident) ?
-     tables.dropTable(((PathIdentifier) ident).location()) :
+     tables.dropTable(((PathIdentifier) ident).location(), purge) :
      icebergCatalog.dropTable(buildIdentifier(ident));
Member

If purge is true and GC_ENABLED is false, should we allow a purge? Or should that throw an error?

Member

To elaborate on this, we have a table property GC_ENABLED which, when false, signifies that a table identifier may not have full ownership of all the data files it references. When GC is disabled, we block Iceberg operations that can physically delete files, like EXPIRE_SNAPSHOTS and REMOVE_ORPHAN_FILES. I feel like DROP TABLE PURGE is very similar and should probably be blocked as well.

An example would be a user creating a Snapshot of another table, then dropping it with PURGE. We wouldn't want to delete files owned by the original table that was snapshotted.

Contributor

jackye1995 commented Feb 11, 2022

My understanding is that gc.enabled should only impact the snapshot expiration (including remove orphan files) aspect; at least that is my definition of "garbage collection". The DROP TABLE behavior of Iceberg external vs. managed tables is a different discussion. We should not try to work around the managed table issue using gc.enabled.

The industry has been using this notion of managed vs. external tables for a long time now. Most people simply associate it with drop-table behavior, but it really describes the ownership of a table with respect to an engine with a data catalog. Take Hive as an example: a managed table (Hive ACID) means the table is in the Hive metastore and the data is fully managed by the Hive data warehouse compute system. Changing data using external systems will likely cause trouble, so those systems are supposed to only read the data, which means treating the table as external. This is how most vertically integrated products define their table concepts, including Snowflake and AWS Redshift.

In the Iceberg multi-engine model, where data can be updated by any compute engine or process through the Iceberg library, this definition of managed that bundles catalog and engine is confusing. But we can still make it consistent by changing the definition. For Iceberg, the ownership of the table is bundled with the catalog service plus the Iceberg spec itself. Any compute engine can claim responsibility to manage a table, as long as the Iceberg spec is respected and the catalog service is identified for the table. Under this definition, the issue you describe holds: if two table identifiers point to the same underlying table metadata in storage, the processes operating against those tables are both managing the underlying files. I think this should be made extra clear for users who would like to use the registerTable feature we just introduced: the registered table is fully managed, not just an external table.

I think an extension of registerTable is needed for Iceberg to complete the notion of an external table. It should be the catalog's responsibility to manage these semantics, maybe by setting a flag to distinguish external tables from normal tables to make it clear that an external table is a read-only copy of a managed table. The registerTable interface could be updated to something like:

default Table registerTable(TableIdentifier identifier, String metadataFileLocation, boolean isManaged) {
  throw new UnsupportedOperationException("Registering tables is not supported");
}

If not managed, some sort of flag should be set at the catalog level, so that if a user later tries to do a DROP TABLE PURGE, the catalog rejects the operation. I think this would fully solve the issue you describe about "a user creating a Snapshot of another table, then dropping it with purge".

Contributor

@jackye1995's summary of external is helpful. Thanks for helping set the context!

The confusion here often lies with users who have been taught to expect certain outcomes from external tables, like dropping an external table not removing any data. Those guarantees are inconsistent in the first place: you can still overwrite (remove files) in an external table in Hive. Besides, why should a user decide, at the time a table is created, whether its data can be deleted when it is dropped?

How to clean up when dropping a table depends on choices made by the data platform. At Netflix, we removed references only and deleted data files only after a waiting period (to allow restoring a table). Users had no say in how the data was removed, only that the table was logically dropped. Both EXTERNAL and PURGE make little sense with policies like that.

I think an extension of registerTable is needed for Iceberg to complete the notion of an external table. It should be the catalog's responsibility to manage these semantics, maybe by setting a flag to distinguish external tables from normal tables to make it clear that an external table is a read-only copy of a managed table.

I really don't think that the concept of EXTERNAL has a place in Iceberg and I am very skeptical that we should add it. EXTERNAL essentially means that the table is owned by some other system. But any engine can use an Iceberg catalog and the library to update a table. What is the utility of arbitrarily restricting that?

As Jack said, we can think of the relationship differently, removing the engine requirement. But if we still have EXTERNAL in this world, then it is really just to support secondary references. I don't know why you would want secondary references that look exactly like primary references but are restricted from making changes. Just connect to the catalog. And if there's a need, we can add a catalog flag that makes it read-only.

That still leaves an open question of what to do for gc.enabled. There are times when a table doesn't "own" the resources within it and gc.enabled is a quick way to account for that -- to basically turn off operations that may delete files that aren't owned. I think that includes PURGE, but I do see the distinction and agree that it is an ugly choice to need to make. Short term, I think we should disable PURGE and throw an exception if gc.enabled is true.

Longer term, I think we need to refine the idea of file ownership. The best way that I can think of is to do this by prefix, as we discussed on a thread in the Trino community a while back and in the Iceberg sync. In this model, a table gets its own prefix and can delete anything in that prefix (or more likely, a small set of prefixes). Imported data files are unowned and are not deleted by expire snapshots or purge if they are outside of owned prefixes.

Contributor

Thank you Ryan very much for the insights!

For the discussion related to GC, I am fine with disallowing purge when gc.enabled is set to false as a short term solution if we would like to define garbage collection that way. We should do it within the catalog implementations to ensure this behavior is consistent across engines.

I really don't think that the concept of EXTERNAL has a place in Iceberg and I am very skeptical that we should add it.

100% agree. However, we have now opened the door by adding registerTable in the catalog, which maps to the external table concept perfectly. I have already received a few feature requests from people asking for this to map to CREATE EXTERNAL TABLE. People can now register an Iceberg table with an old metadata file location and write against it, creating essentially two diverged metadata histories of the same table. This is a very dangerous action because two Iceberg tables can then own the same set of files and corrupt each other.

As a first step, we should make it clear in the javadoc of registerTable that it is only meant to recover a table. Creating two catalog references to the same table metadata and writing to both to create diverged metadata histories is not recommended and will have unintended side effects.

When I suggested adding the EXTERNAL concept to registerTable, to be honest I was really trying to make peace with people who want to stick with this definition. At least we could have a read-only solution and only encourage registering a normal table for recovery. But the more I think about this topic, the more I feel we should start to promote the right way to operate against Iceberg tables.

Just from a correctness perspective, this is the wrong thing to promote. Even if people just want to read a historical metadata file, information like table properties is stale. It is always better to time travel against the latest metadata, or run a query against a tagged snapshot recorded in the latest metadata, and have the metadata location automatically updated with new commits.

As you said, EXTERNAL also just arbitrarily limits people's ability to use Iceberg. I would say the only value is at the business level. For compute vendors or data platforms that have the traditional definition of external and managed tables, it is much easier to provide external table support for an alien product compared to the full table operation support that is usually offered only to managed tables. My team debated this for a long time before we decided to go with full support, and we tried really hard to explain the entire picture, but I doubt that others could do the same. We can already see some vendors now offering Iceberg support under the name of "external table".

In the long term, we should start to promote a new table ownership model (maybe call it a SHARED model) and start to bring people up to date with how Iceberg tables are operated. Let me draft a doc for that to have a formal discussion, and also include concepts like table root location ownership in that doc so we can have full clarity in the domain of table ownership.

Contributor

At Pinterest, we also ran into this issue, where Hive table users unexpectedly dropped data when they created an Iceberg table with the EXTERNAL table type on existing data (more details in #4018). I think this would be a common accident among orgs trying out Iceberg. I can see how Netflix did not run into this issue, based on @rdblue's comment, but I don't think all data platforms using Hive tables drop only the references for DROP TABLE statements.

I think changing the behavior of DROP TABLE to not drop any data alleviates our concern about accidental drops on external tables. However, it also means that DROP TABLE on managed tables would leave data around, which is also an issue. Spark's documentation also says that data is dropped as part of the DROP TABLE operation when the table is not external (https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-drop-table.html). So DROP TABLE not dropping data for managed tables is also misleading.

Contributor

100% agree. However, we have now opened the door by adding registerTable in the catalog, which maps to the external table concept perfectly. I have already received a few feature requests from people asking for this to map to CREATE EXTERNAL TABLE. People can now register an Iceberg table with an old metadata file location and write against it, creating essentially two diverged metadata histories of the same table. This is a very dangerous action because two Iceberg tables can then own the same set of files and corrupt each other.

I'm not sure I agree that it maps perfectly. This is a way to register a table with a catalog, after which the catalog owns it like any other table. There should be nothing that suggests registration has anything to do with EXTERNAL and no reason for people to think that tables that are added to a catalog through registerTable should behave any differently.

If this confusion persists, I would support removing registerTable from the API.

Just from a correctness perspective, this is the wrong thing to promote.

Agreed!

In the long term, we should start to promote a new table ownership model (maybe call it a SHARED model) and start to bring people up to date with how Iceberg tables are operated. Let me draft a doc for that to have a formal discussion, and also include concepts like table root location ownership in that doc so we can have full clarity in the domain of table ownership.

I'm not sure that I would want a SHARED keyword -- that just implies there are times when the table is not shared and we would get into similar trouble. But I think your idea to address this in a design doc is good.

Also, I consider the data/file ownership a separate problem, so you may want to keep them separate in design docs or proposals. I wouldn't want to confuse table modification with data file ownership, although modification does have implications for file ownership.

I think changing the behavior of DROP TABLE to not drop any data alleviates our concern about accidental drops on external tables. However, it also means that DROP TABLE on managed tables would leave data around, which is also an issue.

This is why Iceberg ignores EXTERNAL. The platform should ideally be making these decisions. Users interact with logical tables; physical concerns are for the platform. If you don't have a platform-level plan for dropping table data, then I think the PURGE approach is okay because a user presumably makes the choice at the right time (rather than months if not years before the drop).

My general recommendation is to tell users that they're logically dropping tables and data. Maybe you can have a platform-supported way to un-delete, but when you drop a table you generally have no expectation that you didn't do anything destructive!

Contributor

I'm not sure I agree that it maps perfectly. This is a way to register a table with a catalog, after which the catalog owns it like any other table.

Probably too strong to say "perfectly". I am saying this not because I think of it that way, but because multiple data engineers have asked me if they could use CREATE EXTERNAL TABLE to invoke the registerTable feature. So, from anecdotal evidence, I think people will think of it that way. Maybe we should call it something like reviveTable or expose it as a Spark procedure instead of making it part of the catalog interface.

I'm not sure that I would want a SHARED keyword

I am not trying to promote a new keyword; I think we need a new concept to explain the difference from the traditional definition of external/managed tables.

Also, I consider the data/file ownership a separate problem, so you may want to keep them separate in design docs or proposals

I think it's hard not to mention them together, because owning a table has an implicit relationship with owning the table's file paths and files. But we can discuss this further; I will follow up in a new dev list thread.

@rdblue
Contributor

rdblue commented Feb 13, 2022

@ConeyLiu, I think we should simplify how the tests work. They should just validate that the data and metadata files for the table are deleted after a purge, or not deleted when not purged. I don't think there is a need for custom catalogs or anything specific to the catalog that is used.
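
Something along these lines would do (a sketch that assumes the usual Spark test base helpers -- sql, tableName, tableIdent, validationCatalog -- and a local file-based warehouse):

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;

@Test
public void testPurgeTable() throws IOException {
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  String location = validationCatalog.loadTable(tableIdent).location();

  sql("DROP TABLE %s PURGE", tableName);
  Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));

  // no data or metadata files should remain under the old table location
  Path tableDir = Paths.get(URI.create(location));
  if (Files.exists(tableDir)) {  // some catalogs remove the directory itself
    try (Stream<Path> files = Files.walk(tableDir)) {
      Assert.assertEquals("No files should remain after purge",
          0, files.filter(Files::isRegularFile).count());
    }
  }
}

A companion test without PURGE would assert the opposite: the table reference is gone but the files are still present.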

In addition, I think we need to fail PURGE if the table has gc.enabled=true in table properties. The thread discussing this is long and covers a few other topics, but I think there's agreement about handling gc.enabled.

@ConeyLiu
Contributor Author

ConeyLiu commented Feb 14, 2022

Thanks everyone for the detailed and meaningful discussion.

In addition, I think we need to fail PURGE if the table has gc.enabled=true in table properties. The thread discussing this is long and covers a few other topics, but I think there's agreement about handling gc.enabled.

@rdblue, the UTs have been updated. It should be gc.enabled=false, right? The UTs cover that case.
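
For reference, the guard amounts to something like this sketch in SparkCatalog.purgeTable (the exception message is illustrative and the exact merged code may differ; icebergCatalog and buildIdentifier are the existing helpers shown in the diff above):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.connector.catalog.Identifier;

@Override
public boolean purgeTable(Identifier ident) {
  try {
    Table table = icebergCatalog.loadTable(buildIdentifier(ident));
    boolean gcEnabled = PropertyUtil.propertyAsBoolean(
        table.properties(), TableProperties.GC_ENABLED, TableProperties.GC_ENABLED_DEFAULT);
    // refuse to purge when gc.enabled=false: the table may not own all of its files
    ValidationException.check(gcEnabled, "Cannot purge table: GC is disabled");
    return icebergCatalog.dropTable(buildIdentifier(ident), true /* purge */);
  } catch (NoSuchTableException e) {
    return false;  // nothing to purge
  }
}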

@aokolnychyi
Contributor

Let me also take a look.

@aokolnychyi
Contributor

Looks good to me. Thanks for the work, @ConeyLiu! Let's wait for the tests.

@aokolnychyi
Contributor

aokolnychyi commented Mar 1, 2022

@ConeyLiu, looks like we would need to adapt TestRemoveOrphanFilesProcedure since we added support for the purge flag. I guess we can simply modify removeTable (call PURGE) and testRemoveOrphanFilesGCDisabled (unset GC flag at the end).
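
Concretely, the two adjustments would look roughly like this (a sketch showing only the relevant lines):

import org.junit.After;
import org.junit.Test;

@After
public void removeTable() {
  // purge now that the catalog supports it, so no files linger between tests
  sql("DROP TABLE IF EXISTS %s PURGE", tableName);
}

@Test
public void testRemoveOrphanFilesGCDisabled() {
  sql("ALTER TABLE %s SET TBLPROPERTIES ('gc.enabled' = 'false')", tableName);

  // ... existing assertion that remove_orphan_files fails while GC is disabled ...

  // reset the flag at the end so the PURGE in removeTable() is not rejected
  sql("ALTER TABLE %s SET TBLPROPERTIES ('gc.enabled' = 'true')", tableName);
}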

@ConeyLiu
Contributor Author

ConeyLiu commented Mar 2, 2022

Thanks @aokolnychyi, the code has been updated. And it passed local testing.

@aokolnychyi
Contributor

Thanks, @ConeyLiu. Let's wait for test results.

@aokolnychyi aokolnychyi merged commit 3dc245b into apache:master Mar 3, 2022
@aokolnychyi
Contributor

Great work, @ConeyLiu! This has been an open issue for quite some time. We still have some remaining work that we will continue to discuss in #4159.

Thanks everyone for the reviews!

@ConeyLiu
Contributor Author

ConeyLiu commented Mar 6, 2022

Thanks @aokolnychyi for merging this, and also thanks to everyone for the review. I will submit a PR for Spark 3.1.
