-
Notifications
You must be signed in to change notification settings - Fork 36
S3 Table SQL Sink #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
S3 Table SQL Sink #112
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not used in this example
|
||
// 2. Load properties and configure environment | ||
Map<String, Properties> applicationProperties = loadApplicationProperties(env); | ||
Properties icebergProperties = applicationProperties.get("Iceberg"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this line in the bloc // 3.... because it's related to that
Properties icebergProperties = applicationProperties.get("Iceberg"); | ||
|
||
// Configure local development settings if needed | ||
if (isLocal(env)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explain CP must be enabled when running locally because Iceberg commits on CP
} | ||
|
||
// 3. Setup configuration properties with validation | ||
setupS3TableProperties(icebergProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to explain that you need to create the Catalog programmatically (as opposed as in SQL).
Customers are trying to create the catalog in SQL and failing
} | ||
|
||
private static void setupS3TableProperties(Properties icebergProperties) { | ||
tableBucketArn = icebergProperties.getProperty("table.bucket.arn"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try avoiding static fields as global variables.
Just make them local variables, exact and validate inline in the main() just before using them
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-common</artifactId> | ||
<version>${flink.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Must be provided
<dependency> | ||
<groupId>software.amazon.s3tables</groupId> | ||
<artifactId>s3-tables-catalog-for-iceberg</artifactId> | ||
<version>0.1.6</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better making this version a property
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>s3tables</artifactId> | ||
<version>2.31.50</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better making this version a property
<artifactId>iceberg-core</artifactId> | ||
<version>${iceberg.version}</version> | ||
</dependency> | ||
<!-- Remove duplicate iceberg-flink dependency --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this comment mean?
@@ -0,0 +1,85 @@ | |||
# Flink Iceberg Sink using SQL API with S3 Tables | |||
|
|||
* Flink version: 1.19.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to explain why 1.19
Purpose of the change
For example: modify the Java Kinesis Sink to provide the stream ARN
Verifying this change
Please test your changes both running locally, in the IDE, and in Managed Service for Apache Flink. All examples must run
in both environment without code changes.
Describe how you tested your application, show the output of the running application with screenshots.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterward, for convenience.)