Skip to content

S3 Table SQL Sink #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

FranMorilloAWS
Copy link
Contributor

Purpose of the change

For example: modify the Java Kinesis Sink to provide the stream ARN

Verifying this change

Please test your changes both running locally, in the IDE, and in Managed Service for Apache Flink. All examples must run
in both environment without code changes.

Describe how you tested your application, show the output of the running application with screenshots.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterward, for convenience.)

  • Completely new example
  • Updated an existing example to newer Flink version or dependencies versions
  • Improved an existing example
  • Modified the runtime configuration of an existing example (i.e. added/removed/modified any runtime properties)
  • Modified the expected input or output of an existing example (e.g. modified the source or sink, modified the record schema)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not used in this example


// 2. Load properties and configure environment
Map<String, Properties> applicationProperties = loadApplicationProperties(env);
Properties icebergProperties = applicationProperties.get("Iceberg");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this line in the bloc // 3.... because it's related to that

Properties icebergProperties = applicationProperties.get("Iceberg");

// Configure local development settings if needed
if (isLocal(env)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain CP must be enabled when running locally because Iceberg commits on CP

}

// 3. Setup configuration properties with validation
setupS3TableProperties(icebergProperties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to explain that you need to create the Catalog programmatically (as opposed as in SQL).
Customers are trying to create the catalog in SQL and failing

}

private static void setupS3TableProperties(Properties icebergProperties) {
tableBucketArn = icebergProperties.getProperty("table.bucket.arn");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try avoiding static fields as global variables.
Just make them local variables, exact and validate inline in the main() just before using them

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be provided

<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>0.1.6</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better making this version a property

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>2.31.50</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better making this version a property

<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Remove duplicate iceberg-flink dependency -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean?

@@ -0,0 +1,85 @@
# Flink Iceberg Sink using SQL API with S3 Tables

* Flink version: 1.19.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to explain why 1.19

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants