
Commit fc9f1c0

feat: Table API (Java) over aggregations tutorial (#97)
* chore: upgrade table API dependency
* feat: Flink Table API (Java) over window aggregations tutorial
1 parent 98deafd commit fc9f1c0

10 files changed: +257 -4 lines changed

filtering/flink_table_api_java/build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ repositories {
 }
 
 dependencies {
-    implementation 'org.apache.flink:flink-table-api-java:1.20.0'
+    implementation 'org.apache.flink:flink-table-api-java:1.20.1'
     implementation 'io.confluent.flink:confluent-flink-table-api-java-plugin:1.20-50'
     implementation 'org.slf4j:slf4j-api:2.0.17'
     implementation 'org.slf4j:slf4j-simple:2.0.17'
```

hopping-windows/flink_table_api_java/build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ repositories {
 }
 
 dependencies {
-    implementation 'org.apache.flink:flink-table-api-java:1.20.0'
+    implementation 'org.apache.flink:flink-table-api-java:1.20.1'
     implementation 'io.confluent.flink:confluent-flink-table-api-java-plugin:1.20-50'
     implementation 'org.slf4j:slf4j-api:2.0.17'
     implementation 'org.slf4j:slf4j-simple:2.0.17'
```

joining-stream-stream/flink_table_api_java/build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ repositories {
 }
 
 dependencies {
-    implementation 'org.apache.flink:flink-table-api-java:1.20.0'
+    implementation 'org.apache.flink:flink-table-api-java:1.20.1'
    implementation 'io.confluent.flink:confluent-flink-table-api-java-plugin:1.20-50'
    implementation 'org.slf4j:slf4j-api:2.0.17'
    implementation 'org.slf4j:slf4j-simple:2.0.17'
```
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
<!-- title: How to perform over window aggregations in Java using Flink's Table API for Confluent Cloud -->
<!-- description: In this tutorial, learn how to perform over window aggregations in Java using Flink's Table API for Confluent Cloud, with step-by-step instructions and supporting code. -->

# How to perform over window aggregations in Java using Flink's Table API for Confluent Cloud

In this tutorial, you will learn how to perform an aggregation over each row's preceding rows. For example, imagine a table of online orders where you want to calculate the rolling average price of the last 5 orders _every time a new order comes in_. This is called an over window aggregation, in contrast to a group aggregation, which calculates one aggregate per group (or per group / window pair in the case of a group window aggregation).
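To make the contrast concrete, here is a minimal, hypothetical sketch of both styles. It is not part of this tutorial's source code; it assumes an `orders` table with a `price` column and the `$rowtime` system column (the shape of the example table used later in this tutorial), plus the usual Table API imports:

```java
// Hypothetical sketch; assumes:
//   import org.apache.flink.table.api.*;
//   import static org.apache.flink.table.api.Expressions.*;
// and an `orders` Table with a `price` column and the `$rowtime` system column.

// Group window aggregation: one result row per window.
Table perMinute = orders
        .window(Tumble.over(lit(1).minutes()).on($("$rowtime")).as("w"))
        .groupBy($("w"))
        .select($("price").avg().as("avg_price"));

// Over window aggregation: one result row per incoming order.
Table perOrder = orders
        .window(Over.orderBy($("$rowtime")).preceding(rowInterval(5L)).as("w"))
        .select($("price"), $("price").avg().over($("w")).as("rolling_avg_price"));
```

The group window version emits one average per minute; the over window version emits a fresh rolling average for every order.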
## Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) installed on your machine
* Java 21, e.g., follow the OpenJDK installation instructions [here](https://openjdk.org/install/) if you don't have Java. Validate that `java -version` shows version 21.
* Clone the `confluentinc/tutorials` GitHub repository (if you haven't already) and navigate to the `tutorials` directory:

  ```shell
  git clone git@github.com:confluentinc/tutorials.git
  cd tutorials
  ```

## Provision Confluent Cloud infrastructure

If you already have the Confluent Cloud resources required to populate a Table API client configuration file, e.g., from running a different tutorial, you may skip to the [next step](#inspect-the-code) after creating or copying the properties file as documented [here](https://docs.confluent.io/cloud/current/flink/reference/table-api.html#properties-file) to `over-aggregations/flink_table_api_java/src/main/resources/cloud.properties` within the top-level `tutorials` directory.

If you need to create the Confluent Cloud infrastructure for this tutorial, the `confluent-flink-quickstart` CLI plugin creates the resources that you need to get started with Confluent Cloud for Apache Flink. Install it by running:

```shell
confluent plugin install confluent-flink-quickstart
```

Run the plugin as follows to create the Confluent Cloud resources needed for this tutorial and generate a Table API client configuration file. Note that you may specify a different cloud provider (`gcp` or `azure`) or region. You can find the supported regions in a given cloud provider by running `confluent flink region list --cloud <CLOUD>`.

```shell
confluent flink quickstart \
    --name flink_table_api_tutorials \
    --max-cfu 10 \
    --region us-east-1 \
    --cloud aws \
    --table-api-client-config-file ./over-aggregations/flink_table_api_java/src/main/resources/cloud.properties
```

The plugin should complete in under a minute and will generate a properties file as documented [here](https://docs.confluent.io/cloud/current/flink/reference/table-api.html#properties-file).
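For reference, the generated properties file has the shape below. The keys match the empty template committed with this change; the bracketed values are placeholders standing in for your own credentials and resource IDs, and the cloud and region values mirror the quickstart command above:

```properties
client.cloud=aws
client.region=us-east-1
client.flink-api-key=<FLINK_API_KEY>
client.flink-api-secret=<FLINK_API_SECRET>
client.organization-id=<ORGANIZATION_ID>
client.environment-id=<ENVIRONMENT_ID>
client.compute-pool-id=<COMPUTE_POOL_ID>
client.principal-id=<PRINCIPAL_ID>
```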
## Inspect the code

### Dependencies

Before digging into the Java source code, first check out the two dependencies required to use the Flink Table API for Confluent Cloud. These are defined in the `dependencies` section of the `over-aggregations/flink_table_api_java/build.gradle` file. (For Maven, see the analogous `pom.xml` snippet [here](https://docs.confluent.io/cloud/current/flink/reference/table-api.html#add-the-table-api-to-an-existing-java-project).)

* `org.apache.flink:flink-table-api-java`: This is the Apache Flink Table API implementation dependency. It contains, for example, the classes implementing the Table API DSL (domain-specific language).
* `io.confluent.flink:confluent-flink-table-api-java-plugin`: This dependency contains the "glue" for instantiating an Apache Flink Table API table environment against Confluent Cloud (i.e., an implementation of the [`org.apache.flink.table.api.TableEnvironment`](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/table/api/TableEnvironment.html) interface), as well as other helper utilities that we will use in this tutorial.
### Java source

Take a look at the source code in `over-aggregations/flink_table_api_java/src/main/java/io/confluent/developer/FlinkTableApiOverAggregations.java`. These two lines instantiate a table environment for executing Table API programs against Confluent Cloud:

```java
EnvironmentSettings envSettings = ConfluentSettings.fromResource("/cloud.properties");
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
```

Let's aggregate one of Confluent Cloud's example tables. You can find these tables in the read-only `marketplace` database of the `examples` catalog. The source code in this example uses the Table API's [`Table.window`](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/table/api/Table.html#window(org.apache.flink.table.api.OverWindow...)) method and the [`Over`](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/table/api/Over.html) helper class to define the over window. The aggregation is a simple average of the current row's price and the previous 5 prices, with rows ordered by the [`$rowtime`](https://docs.confluent.io/cloud/current/flink/reference/statements/create-table.html#flink-sql-system-columns-rowtime) system column.

```java
TableResult tableResult = tableEnv.from("examples.marketplace.orders")
        .window(
                Over.orderBy($("$rowtime"))
                        .preceding(rowInterval(5L))
                        .following(CURRENT_ROW)
                        .as("window"))
        .select(
                $("price"),
                $("price")
                        .avg()
                        .over($("window"))
                        .round(lit(2))
                        .as("rolling_avg_price")
        )
        .execute();
```
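Note that `rowInterval(5L)` bounds the window by a row count. As a point of comparison, here is a hedged sketch (not part of this tutorial's source) of the same query bounded by time instead, averaging all orders in the minute preceding each row:

```java
TableResult timeBased = tableEnv.from("examples.marketplace.orders")
        .window(
                Over.orderBy($("$rowtime"))
                        .preceding(lit(1).minutes())   // time range rather than row count
                        .as("w"))
        .select(
                $("price"),
                $("price").avg().over($("w")).round(lit(2)).as("avg_price_last_minute"))
        .execute();
```

The remainder of this tutorial uses the row-count version.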
Given the table result, we can then materialize (in memory) the rows in the resulting stream by calling [`ConfluentTools.collectMaterialized`](https://docs.confluent.io/cloud/current/flink/reference/table-api.html#confluenttools-collect-materialized-and-confluenttools-print-materialized) or [`ConfluentTools.printMaterialized`](https://docs.confluent.io/cloud/current/flink/reference/table-api.html#confluenttools-collect-materialized-and-confluenttools-print-materialized). This line materializes and prints 10 rows from the table result:

```java
ConfluentTools.printMaterialized(tableResult, 10);
```

Alternatively, we can use the Table API's [`TableResult`](https://docs.confluent.io/cloud/current/flink/reference/functions/table-api-functions.html#tableresult-interface) interface directly to collect rows. For example, to print the next row's price and over window aggregation:

```java
try (CloseableIterator<Row> it = tableResult.collect()) {
    if (it.hasNext()) {
        Row row = it.next();
        System.out.println(row.getField("price"));
        System.out.println(row.getField("rolling_avg_price"));
    }
}
```
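If you'd rather work with the rows programmatically than print them, `ConfluentTools.collectMaterialized` returns them as a list. A short sketch, assuming the documented `collectMaterialized(TableResult, int)` signature and a `java.util.List` import:

```java
// materialize 10 rows into a List<Row> rather than printing them
List<Row> rows = ConfluentTools.collectMaterialized(tableResult, 10);
for (Row row : rows) {
    System.out.printf("%s -> %s%n", row.getField("price"), row.getField("rolling_avg_price"));
}
```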
## Run the program

You can run the example program directly in your IDE by opening the Gradle project located at `over-aggregations/flink_table_api_java/`, or via the command line from the top-level `tutorials` directory:

```shell
./gradlew over-aggregations:flink_table_api_java:run
```

The program will output 10 rows materialized via `printMaterialized`, and then an additional order price and rolling average that includes the previous 5 rows.

```noformat
+-------+-------------------+
| price | rolling_avg_price |
+-------+-------------------+
| 50.91 |             50.91 |
| 20.94 |             35.93 |
|  21.7 |             31.18 |
| 30.42 |             30.99 |
| 75.44 |             39.88 |
| 78.68 |             46.35 |
| 65.96 |             48.86 |
|  85.6 |             59.63 |
| 47.66 |             63.96 |
| 60.38 |             68.95 |
+-------+-------------------+
10 rows in set
82.97
70.21
```
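As a sanity check on the over window semantics: the sixth row's rolling average, 46.35, is the mean of the first six prices, (50.91 + 20.94 + 21.7 + 30.42 + 75.44 + 78.68) / 6 ≈ 46.35, i.e., the current row plus its five preceding rows.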
## Tear down Confluent Cloud infrastructure

When you are done, be sure to clean up any Confluent Cloud resources created for this tutorial. Since you created all resources in a Confluent Cloud environment, you can simply delete the environment, and most of the resources created for this tutorial (e.g., the Kafka cluster and Flink compute pool) will be deleted with it. Run the following command in your terminal to get the environment ID of the form `env-123456` corresponding to the environment named `flink_table_api_tutorials_environment`:

```shell
confluent environment list
```

Delete the environment:

```shell
confluent environment delete <ENVIRONMENT_ID>
```

Next, delete the Flink API key. This API key isn't associated with the deleted environment, so it needs to be deleted separately. Find the key:

```shell
confluent api-key list --resource flink --current-user
```

Then copy the 16-character alphanumeric key and delete it:

```shell
confluent api-key delete <KEY>
```

Finally, for the sake of housekeeping, delete the Table API client configuration file:

```shell
rm over-aggregations/flink_table_api_java/src/main/resources/cloud.properties
```
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
buildscript {
2+
repositories {
3+
mavenCentral()
4+
}
5+
}
6+
7+
plugins {
8+
id 'java'
9+
id 'application'
10+
}
11+
12+
java {
13+
sourceCompatibility = JavaVersion.VERSION_21
14+
targetCompatibility = JavaVersion.VERSION_21
15+
}
16+
17+
application {
18+
mainClass = "io.confluent.developer.FlinkTableApiOverAggregations"
19+
}
20+
21+
repositories {
22+
mavenCentral()
23+
maven { url 'https://packages.confluent.io/maven/' }
24+
}
25+
26+
dependencies {
27+
implementation 'org.apache.flink:flink-table-api-java:1.20.1'
28+
implementation 'io.confluent.flink:confluent-flink-table-api-java-plugin:1.20-50'
29+
implementation 'org.slf4j:slf4j-api:2.0.17'
30+
implementation 'org.slf4j:slf4j-simple:2.0.17'
31+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
rootProject.name = 'over-aggregations'
over-aggregations/flink_table_api_java/src/main/java/io/confluent/developer/FlinkTableApiOverAggregations.java

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@

```java
package io.confluent.developer;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import io.confluent.flink.plugin.ConfluentSettings;
import io.confluent.flink.plugin.ConfluentTools;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.CURRENT_ROW;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.rowInterval;

public class FlinkTableApiOverAggregations {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings envSettings = ConfluentSettings.fromResource("/cloud.properties");
        TableEnvironment tableEnv = TableEnvironment.create(envSettings);

        tableEnv.useCatalog("examples");
        tableEnv.useDatabase("marketplace");
        TableResult tableResult = tableEnv.from("examples.marketplace.orders")
                .window(
                        Over.orderBy($("$rowtime"))
                                .preceding(rowInterval(5L))
                                .following(CURRENT_ROW)
                                .as("window"))
                .select(
                        $("price"),
                        $("price")
                                .avg()
                                .over($("window"))
                                .round(lit(2))
                                .as("rolling_avg_price")
                )
                .execute();

        // option 1: use ConfluentTools.printMaterialized or ConfluentTools.collectMaterialized
        ConfluentTools.printMaterialized(tableResult, 10);

        // option 2: use TableResult.collect
        try (CloseableIterator<Row> it = tableResult.collect()) {
            if (it.hasNext()) {
                Row row = it.next();
                System.out.println(row.getField("price"));
                System.out.println(row.getField("rolling_avg_price"));
            }
        }
    }
}
```
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
client.cloud=
2+
client.region=
3+
client.flink-api-key=
4+
client.flink-api-secret=
5+
client.organization-id=
6+
client.environment-id=
7+
client.compute-pool-id=
8+
client.principal-id=

settings.gradle

Lines changed: 1 addition & 0 deletions
```diff
@@ -48,6 +48,7 @@ include 'multi-joins:flinksql'
 include 'multiple-event-types-avro:kafka'
 include 'multiple-event-types-protobuf:kafka'
 include 'naming-changelog-repartition-topics:kstreams'
+include 'over-aggregations:flink_table_api_java'
 include 'over-aggregations:flinksql'
 include 'pattern-matching:flinksql'
 include 'reordering-streams:kstreams'
```

tumbling-windows/flink_table_api_java/build.gradle

Lines changed: 1 addition & 1 deletion
```diff
@@ -24,7 +24,7 @@ repositories {
 }
 
 dependencies {
-    implementation 'org.apache.flink:flink-table-api-java:1.20.0'
+    implementation 'org.apache.flink:flink-table-api-java:1.20.1'
     implementation 'io.confluent.flink:confluent-flink-table-api-java-plugin:1.20-50'
     implementation 'org.slf4j:slf4j-api:2.0.17'
     implementation 'org.slf4j:slf4j-simple:2.0.17'
```
