Skip to content

Conversation

FranMorilloAWS
Copy link

Purpose of the change

For example: Implements the Table API for the Kinesis Source.

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@FranMorilloAWS FranMorilloAWS marked this pull request as ready for review May 7, 2025 10:18
@FranMorilloAWS FranMorilloAWS changed the title [FLINK-29549]- Updated Flink Glue Catalog integration [FLINK-29549] Updated Flink Glue Catalog integration May 7, 2025
@FranMorilloAWS FranMorilloAWS changed the title [FLINK-29549] Updated Flink Glue Catalog integration [FLINK-29549]- Flink Glue Catalog integration May 7, 2025
Copy link
Contributor

@Samrat002 Samrat002 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution. A few points to note:

It looks like most of the code has already been reviewed across multiple PRs.

Concerns:

  1. The current implementation exposes low-level Glue details (e.g., support for lowercase table names) directly through the Flink Catalog. Is this acceptable? I recommend raising this topic with the community. In my opinion, it would be better to encapsulate such Glue-specific behavior and avoid exposing it directly via the Flink Catalog interface. The catalog's behavior should remain consistent with other catalogs, with differences controlled via configuration only.

  2. Please consider implementing the basic configuration options defined in FLIP-277. If that’s not feasible in this PR, a fast follow-up would be valuable, especially for users relying on different credential modes.

  3. It seems that some .idea folder files have been committed. Please remove them from the PR.

Cheers,
Samrat

@FranMorilloAWS
Copy link
Author

Thanks for the contribution. A few points to note:

It looks like most of the code has already been reviewed across multiple PRs.

Concerns:

1. The current implementation exposes low-level Glue details (e.g., support for lowercase table names) directly through the Flink Catalog. Is this acceptable? I recommend raising this topic with the community. In my opinion, it would be better to encapsulate such Glue-specific behavior and avoid exposing it directly via the Flink Catalog interface. The catalog's behavior should remain consistent with other catalogs, with differences controlled via configuration only.

2. Please consider implementing the basic configuration options defined in [FLIP-277](https://cwiki.apache.org/confluence/display/FLINK/FLIP-277). If that’s not feasible in this PR, a fast follow-up would be valuable, especially for users relying on different credential modes.

3. It seems that some `.idea` folder files have been committed. Please remove them from the PR.

Cheers, Samrat

Hey Samrat. How could we encapsulate this specific issues? From user perspective we are already limiting them creating tables and databases with uppercase. In regards of the schema, we encapsulate storing the actual original columnNames in column parameters in Glue, so even though user sees their schema in glue with lower case (default for glue). we actually leverage the original column name.

  1. Yes right after we get the first release delivered we will add the remainder authentication, and partitions support and alter support

  2. Fixed!

@Samrat002
Copy link
Contributor

Hey Samrat. How could we encapsulate this specific issues? From user perspective we are already limiting them creating tables and databases with uppercase. In regards of the schema, we encapsulate storing the actual original columnNames in column parameters in Glue, so even though user sees their schema in glue with lower case (default for glue). we actually leverage the original column name.

Thanks for the detailed response.

Can you help me understand the technical complexity to support CaseSensitivity from GlueCatalog?

This is a deviation.
I believe understanding how other connectors or catalogs handle mismatches, there can be a couple of ways to handle it

  1. Surfacing the mismatch to the end user with constraints
  2. Handling it in FlinkCatalog to provide consistent behaviour.

We can start a thread in the community to discuss this approach.

I am fine with either of the approaches as long as the community agrees on it.

*/
@Override
public void close() throws CatalogException {
if (glueClient != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a scenario where glueClient would be null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

glueClient is created lazily.

i think in the following scenario client can be null

  1. Create a catalog
  2. Never fired any DDL or DML SQL command
  3. Close the catalog or shell

this guard unexpected failure NullPointerException

int maxRetries = 3;
int retryCount = 0;
long retryDelayMs = 200;
while (retryCount < maxRetries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious on what is the motivation for retries here? Is there a specific RuntimeException that we have in mind?

I had a look at AWS sdk, the close is best-effort and logs in case of any exceptions: https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/IoUtils.java#L76-L85

We can simplify code here by removing retry logic as sdk does not seem to be surfacing exceptions during close().

.tableList();

// Filter tables to only include those that are of type VIEW
List<String> viewNames = allTables.stream()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can return here

@nicusX
Copy link

nicusX commented May 28, 2025

@Samrat002 @FranMorilloAWS about case sensitivity, these are the logical rules I suggest:

  1. the actual case should preserved for all supported objects and used consistently in Flink interface, e.g. inCREATE TABLE... but also SHOW TABLE, SHOW COLUMNS ... etc. From a user's perspective, the way object names are actually stored in Glue should not be a concern.
  2. The additional metadata should be stored in Glue in a consistent way for all object types. Parameters seems to be available for Database, Tables, and Columns in Glue. A problem arises for Functions which don't have any Parameters if I am not mistaken.
  3. The additional metadata should not leak into Flink user interface. The "originalName" should not pop up in a SHOW TABLES or SHOW COLUMNS
  4. The fact Glue UI is limited and only shows lowercase names is not a concern of this component's

I would add that the way the additional metadata (the original name and any additional info required) is stored in Glue should be clearly documented, in case a user want's to build their own external interface to extract or update information in the Glue Catalog.

@FranMorilloAWS
Copy link
Author

Thanks for the contribution. A few points to note:

It looks like most of the code has already been reviewed across multiple PRs.

Concerns:

1. The current implementation exposes low-level Glue details (e.g., support for lowercase table names) directly through the Flink Catalog. Is this acceptable? I recommend raising this topic with the community. In my opinion, it would be better to encapsulate such Glue-specific behavior and avoid exposing it directly via the Flink Catalog interface. The catalog's behavior should remain consistent with other catalogs, with differences controlled via configuration only.

2. Please consider implementing the basic configuration options defined in [FLIP-277](https://cwiki.apache.org/confluence/display/FLINK/FLIP-277). If that’s not feasible in this PR, a fast follow-up would be valuable, especially for users relying on different credential modes.

3. It seems that some `.idea` folder files have been committed. Please remove them from the PR.

Cheers, Samrat

Hello @Samrat002 @nicusX. Using Database Parameters and Table Parameters we are now able to use lower/upper case for defining the Database and Table Name. By using the Show Tables/Show Databases, Describe table commands, we will show the original Flink Definition, even though in Glue UI it will be all in lower case

@Samrat002
Copy link
Contributor

Thanks for incorporating the changes. I will review the pr in next couple of days .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants