Commit 427e4c3

Merge pull request #131 from nicusX/iceberg-sql-sink
New Iceberg SQL Sink example
2 parents 05fe94a + 70d26ad commit 427e4c3

File tree: 10 files changed, +706 -3 lines changed

java/Iceberg/IcebergSQLSink/README.md

Lines changed: 109 additions & 0 deletions
## Iceberg Sink (Glue Data Catalog) using SQL

* Flink version: 1.20.0
* Flink API: SQL API
* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) sink

This example demonstrates how to use the
[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Glue Data Catalog.

For simplicity, the application internally generates synthetic data: random stock prices.
Data is generated as POJO objects, simulating a real source, for example a Kafka source that receives records,
which can then be converted to table format for SQL operations.
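
The catalog is registered through a Flink SQL `CREATE CATALOG` statement executed from the Table API. The
following is a minimal sketch of this pattern, not the example's actual source: the catalog name
`glue_catalog`, the warehouse path, and the property set are illustrative assumptions based on the Iceberg
Flink documentation.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CatalogSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register an Iceberg catalog backed by the Glue Data Catalog.
        // "glue_catalog" and the warehouse path are illustrative placeholders.
        tableEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH ("
                        + " 'type' = 'iceberg',"
                        + " 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',"
                        + " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',"
                        + " 'warehouse' = 's3://mybucket/iceberg'"
                        + ")");
        tableEnv.executeSql("USE CATALOG glue_catalog");
    }
}
```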

### Prerequisites

The application expects the following resources:
* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
  The application creates the table, but the database must already exist.
* An S3 bucket to write the Iceberg table to.

#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read and write from the S3 bucket.

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID  | Key               | Default          | Description                                                                                 |
|-----------|-------------------|------------------|---------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec` | `10.0`           | Records per second generated.                                                               |
| `Iceberg` | `bucket.prefix`   | (mandatory)      | S3 bucket and path URL prefix, starting with `s3://`. For example `s3://mybucket/iceberg`.  |
| `Iceberg` | `catalog.db`      | `default`        | Name of the Glue Data Catalog database.                                                     |
| `Iceberg` | `catalog.table`   | `prices_iceberg` | Name of the Glue Data Catalog table.                                                        |
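
As a rough sketch, these grouped parameters can be read with the `aws-kinesisanalytics-runtime` library
declared in the `pom.xml`. The class and variable names below are illustrative, not the example's actual code.

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class RuntimeConfigSketch {
    public static void main(String[] args) throws IOException {
        // On Managed Service for Apache Flink this returns the application Runtime Properties;
        // when running locally, an overload taking a file path can read the JSON file above.
        Map<String, Properties> groups = KinesisAnalyticsRuntime.getApplicationProperties();

        Properties iceberg = groups.getOrDefault("Iceberg", new Properties());
        String bucketPrefix = iceberg.getProperty("bucket.prefix");                   // mandatory
        String catalogDb = iceberg.getProperty("catalog.db", "default");
        String catalogTable = iceberg.getProperty("catalog.table", "prices_iceberg");

        Properties dataGen = groups.getOrDefault("DataGen", new Properties());
        double recordsPerSec = Double.parseDouble(dataGen.getProperty("records.per.sec", "10.0"));

        System.out.printf("Writing to %s.%s under %s at %.1f records/sec%n",
                catalogDb, catalogTable, bucketPrefix, recordsPerSec);
    }
}
```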

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.

### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 30 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
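
Below is a minimal sketch of this local-only checkpoint setup. Detecting the local run via
`LocalStreamEnvironment` is an assumption; the actual example may use a different check.

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing programmatically only when running locally (e.g. in the IDE).
        // On Managed Service for Apache Flink, checkpointing is controlled by the service configuration.
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(30_000L); // every 30 seconds; Iceberg commits on each checkpoint
        }
    }
}
```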

### Sample Data Schema

The application uses a predefined schema for the stock price data, with the following fields:
* `timestamp`: STRING - ISO timestamp of the record
* `symbol`: STRING - Stock symbol (e.g., AAPL, AMZN)
* `price`: FLOAT - Stock price (0-10 range)
* `volumes`: INT - Trade volumes (0-1000000 range)
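
As an illustration, this schema corresponds to an Iceberg table DDL along the following lines. The catalog
and database names are assumptions carried over from the configuration defaults above, not the example's
actual statement.

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableDdlSketch {
    // Creates the Iceberg table matching the sample schema, assuming an Iceberg
    // catalog registered as "glue_catalog" and the default database/table names.
    static void createPricesTable(StreamTableEnvironment tableEnv) {
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS glue_catalog.`default`.prices_iceberg ("
                        + " `timestamp` STRING,"
                        + " symbol STRING,"
                        + " price FLOAT,"
                        + " volumes INT"
                        + ")");
    }
}
```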

### Known limitations of the Flink Iceberg sink

The Flink Iceberg integration currently has the following limitations:
* It does not support Iceberg tables with hidden partitioning.
* It does not support adding, removing, renaming, or changing columns.

---

### Known Flink issue: Hadoop library clash

When integrating Flink with Iceberg, there is a common issue affecting most Flink setups.

When using Flink SQL's `CREATE CATALOG` statements, Hadoop libraries must be available on the system classpath.
However, standard Flink distributions use shaded dependencies that can create class loading conflicts with Hadoop's
expectations.
Flink's default classloading, when running in Application mode, prevents the application from using some Hadoop classes,
even if they are included in the application uber-jar.

#### Solution

This example shows a simple workaround to prevent the Hadoop class clash:
1. Include a modified version of the Flink class `org.apache.flink.runtime.util.HadoopUtils`
2. Use the Maven Shade Plugin to prevent class conflicts

The modified [`org.apache.flink.runtime.util.HadoopUtils`](src/main/java/org/apache/flink/runtime/util/HadoopUtils.java)
class is included in the source code of this project. You can include it as-is in your project, using the same package name.

The shading is configured in the [`pom.xml`](pom.xml). In your project, you can copy the `<relocations>...</relocations>` configuration
into the `maven-shade-plugin` configuration.
```xml
<relocations>
    <relocation>
        <pattern>org.apache.hadoop.conf</pattern>
        <shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
    </relocation>
    <relocation>
        <pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
        <shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
    </relocation>
</relocations>
```

java/Iceberg/IcebergSQLSink/pom.xml

Lines changed: 193 additions & 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>iceberg-sql-sink</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.major.version>1.20</flink.major.version>
        <flink.version>1.20.0</flink.version>
        <scala.version>2.12</scala.version>
        <iceberg.version>1.9.1</iceberg.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                <version>1.12.782</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.28.29</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws-bundle</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- S3 File System Support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Logging Dependencies -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.6.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.IcebergSQLSinkJob</mainClass>
                                </transformer>
                            </transformers>
                            <!-- We relocate Hadoop-conf classes packaged with the application,
                                 along with the modified HadoopUtils class -->
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.hadoop.conf</pattern>
                                    <shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
                                    <shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
