Welcome to our repository, created to showcase the integration and ease of use of Kafka. Dive into:
- Setting up Confluent Cloud, Connectors & Flink.
This repo has been created by:
# | Name | Contact |
---|---|---|
1 | Sami Alashabi | |
2 | Maria Berinde-Tampanariu | |
3 | Ramzi Alashabi | |
The demo uses Flink Streaming. Alternatively, Spark Streaming with Databricks can be used; that code can be found in the `kafka_spark_streaming` folder.
Installation needs to be done only once:
- User account on Confluent Cloud
- Local install of Terraform (details below)
- Local install of jq (details below)
```shell
brew tap hashicorp/tap
brew install hashicorp/tap/terraform
brew update
brew upgrade hashicorp/tap/terraform
brew install jq
```
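As a quick sanity check before continuing, you can confirm that both tools ended up on the `PATH`. The `check_tools` helper below is a hypothetical sketch, not a script shipped with this repo:

```shell
#!/bin/bash
# Hypothetical helper (not part of this repo): report whether the
# prerequisite CLIs are installed and reachable on the PATH.
check_tools() {
  for tool in "$@"; do
    if command -v "$tool" >/dev/null 2>&1; then
      echo "$tool: OK"
    else
      echo "$tool: MISSING"
    fi
  done
}

check_tools terraform jq
```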
- Create a file named `.env` inside the `confluent_cloud_demo` folder:

```shell
#!/bin/bash
# Confluent Platform
export CONFLUENT_CLOUD_API_KEY="Enter credentials here"
export CONFLUENT_CLOUD_API_SECRET="Enter credentials here"
export TF_VAR_PBI_API_URL=''
export TF_VAR_TOPIC_2_PBI=''
```
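Before running the start script, it can help to verify that the variables from `.env` were actually exported. The `require_env` function below is a sketch of such a pre-flight check, not something the repo provides:

```shell
#!/bin/bash
# Sketch of a pre-flight check (not part of demo_start.sh): fail fast
# when any required environment variable is unset or empty.
require_env() {
  missing=0
  for var in "$@"; do
    # Indirect lookup of the variable named in $var
    eval "val=\${$var:-}"
    if [ -z "$val" ]; then
      echo "Missing required variable: $var" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Typical usage after `source .env`:
# require_env CONFLUENT_CLOUD_API_KEY CONFLUENT_CLOUD_API_SECRET || exit 1
```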
- Run the following command inside the `confluent_cloud_demo` folder:

```shell
./demo_start.sh
```

- Access Confluent Cloud: https://confluent.cloud/login
- Select your Environment
- Select the tab `Flink (preview)`
- Access your Flink Compute Pool
- Click `Open SQL Workspace`
- Select Catalog: `kafka_flink_demo_xx`
- Select Database: `cc-demo-cluster`
- Proceed to submit the SQL queries below (one per tab):
```sql
---------------------------------------------------------------
-- Create table users (a topic with the same name will be created)
---------------------------------------------------------------
CREATE TABLE `users` (
  `userid` INT,
  `fullname` STRING,
  `credit_card_last_four_digits` STRING,
  `gender` STRING,
  `email` STRING,
  `ipaddress` STRING,
  `company` STRING,
  `avg_credit_spend` DOUBLE
) WITH (
  'changelog.mode' = 'retract'
);

DESCRIBE EXTENDED `users`;
```
```sql
--------------------------------------------------------------------------
-- Populate table users (you will see new messages published in the topic)
--------------------------------------------------------------------------
INSERT INTO `users` (`userid`, `fullname`, `credit_card_last_four_digits`, `gender`, `email`, `ipaddress`, `company`, `avg_credit_spend`) VALUES
(1, 'Ellen Ripley', '1234', 'Male', '[email protected]', '72.197.144.165', 'Dynabox', 2650.0),
(2, 'Tony Stark', '2345', 'Genderfluid', '[email protected]', '13.246.111.16', 'Aivee', 4119.27),
(3, 'Vito Corleone', '3456', 'Male', '[email protected]', '197.231.118.1', 'Fanoodle', 2119.76),
(4, 'Lara Croft', '4567', 'Female', '[email protected]', '138.246.248.76', 'Yodo', 1271.58),
(5, 'Elisabeth Gentry', '5678', 'Female', '[email protected]', '236.176.123.77', 'Skaboo', 2783.47),
(6, 'Richart Bradfield', '6789', 'Male', '[email protected]', '71.180.87.61', 'Meejo', 2154.45),
(7, 'Helene Hargrove', '7890', 'Female', '[email protected]', '240.88.89.167', 'Browsebug', 2333.36),
(8, 'Benji Geck', '8901', 'Male', '[email protected]', '250.2.253.193', 'Yombu', 3999.74),
(9, 'Jack Sparrow', '9012', 'Non-binary', '[email protected]', '185.20.56.89', 'Einti', 3817.99),
(10, 'Elyn Cromarty', '0123', 'Female', '[email protected]', '167.68.56.180', 'Shufflester', 5263.34),
(11, 'Hurley Cochrane', '1111', 'Male', '[email protected]', '241.69.23.160', 'LiveZ', 4935.66),
(12, 'Marty McFly', '2876', 'Female', '[email protected]', '36.208.43.205', 'Blognation', 2796.26),
(13, 'John Wick', '3211', 'Male', '[email protected]', '4.232.220.231', 'Realcube', 4050.23),
(14, 'Maximus Decimus Meridius', '4721', 'Female', '[email protected]', '65.87.4.235', 'Vitz', 1637.76),
(15, 'Severus Snape', '5005', 'Female', '[email protected]', '7.26.91.164', 'Youopia', 5038.34),
(16, 'Jerrold Strugnell', '1616', 'Male', '[email protected]', '15.38.20.244', 'Devpoint', 2947.57);

SELECT * FROM `users` LIMIT 16;
```
```sql
----------------------------------------------------------------------------
-- Create table credit-card-enriched (a topic with the same name will be created)
----------------------------------------------------------------------------
CREATE TABLE `credit-card-enriched` (
  `userid` INT,
  `credit_card_last_four_digits` STRING,
  `fullname` STRING,
  `gender` STRING,
  `email` STRING,
  `ipaddress` STRING,
  `company` STRING,
  `avg_credit_spend` DOUBLE,
  `amount` DOUBLE,
  `transaction_id` BIGINT,
  `timestamp` TIMESTAMP(0),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' MINUTES
) WITH (
  'changelog.mode' = 'retract'
);

DESCRIBE EXTENDED `credit-card-enriched`;
```
```sql
----------------------------------------------------------------------------------
-- Join tables poc-credit-card-transactions and users (non-transactional)
----------------------------------------------------------------------------------
INSERT INTO `credit-card-enriched` (`userid`, `credit_card_last_four_digits`, `fullname`, `gender`, `email`, `ipaddress`, `company`, `amount`, `avg_credit_spend`, `transaction_id`, `timestamp`)
SELECT
  u.`userid`,
  c.`credit_card_last_four_digits`,
  u.`fullname`,
  u.`gender`,
  u.`email`,
  u.`ipaddress`,
  u.`company`,
  c.`amount`,
  u.`avg_credit_spend`,
  c.`transaction_id`,
  c.`timestamp`
FROM
  `poc-credit-card-transactions` AS c
  LEFT JOIN `users` AS u
    ON c.`credit_card_last_four_digits` = u.`credit_card_last_four_digits`;

SELECT * FROM `credit-card-enriched`;
```
```sql
------------------------------------------------------------------------
-- Create table possible-fraud (a topic with the same name will be created)
------------------------------------------------------------------------
CREATE TABLE `possible-fraud` (
  `userid` INT,
  `credit_card_last_four_digits` STRING,
  `fullname` STRING,
  `gender` STRING,
  `email` STRING,
  `timestamp` TIMESTAMP(0),
  `sum_amount` DOUBLE,
  `max_avg_credit_spend` DOUBLE,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' MINUTES
) WITH (
  'changelog.mode' = 'retract'
);

DESCRIBE EXTENDED `possible-fraud`;
```
```sql
-------------------------------------------------------------------------------------------------
-- Populate table possible-fraud (where the windowed sum of amounts exceeds the average credit card spend)
-------------------------------------------------------------------------------------------------
INSERT INTO `possible-fraud`
SELECT
  `userid`,
  `credit_card_last_four_digits`,
  `fullname`,
  `gender`,
  `email`,
  `window_start`,
  SUM(`amount`),
  MAX(`avg_credit_spend`)
FROM TABLE(
  TUMBLE(TABLE `credit-card-enriched`, DESCRIPTOR(`timestamp`), INTERVAL '30' SECONDS)
)
GROUP BY `credit_card_last_four_digits`, `userid`, `fullname`, `gender`, `email`, `window_start`
HAVING
  SUM(`amount`) > MAX(`avg_credit_spend`);

SELECT * FROM `possible-fraud`;
```
- Access your Environment: `kafka_flink_demo-xx`
- Select the tab `Flink (preview)`
- Select the tab `Flink statements`
- Filter by Status `Running` (see example below)
- Run the following command:

```shell
./demo_stop.sh
```
Requirements:

Name | Version |
---|---|
azurerm | =3.0.0 |
confluent | 1.55.0 |
random | ~>3.0 |

Providers:

Name | Version |
---|---|
azurerm | 3.0.0 |
confluent | 1.55.0 |
random | 3.5.1 |
Inputs:

Name | Description | Type | Default | Required |
---|---|---|---|---|
admin_password | n/a | any | n/a | yes |
admin_username | Azure | any | n/a | yes |
cc_availability | n/a | string | "SINGLE_ZONE" | no |
cc_cloud_provider | Confluent Cloud Kafka cluster variables | string | "AWS" | no |
cc_cloud_region | n/a | string | "eu-central-1" | no |
cc_cluster_name | n/a | string | "cc_demo_cluster" | no |
cc_compute_pool_cfu | n/a | number | 5 | no |
cc_compute_pool_name | n/a | string | "cc_demo_flink" | no |
cc_dislay_name | Confluent Cloud Flink Compute Pool variables | string | "standard_compute_pool" | no |
cc_env_name | n/a | string | "kafka_flink_demo" | no |
sr_cloud_provider | Confluent Cloud Schema Registry variables | string | "AWS" | no |
sr_cloud_region | n/a | string | "eu-central-1" | no |
sr_package | n/a | string | "ESSENTIALS" | no |
Outputs:

Name | Description |
---|---|
cc_demo_env | CC Environment |
cc_demo_sa | CC Service Account |
cc_demo_sr | CC Schema Registry Region |
cc_kafka_cluster | CC Kafka Cluster ID |
cc_sr_cluster | CC SR Cluster ID |
id | n/a |
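After the Terraform run, these outputs can be read back with `terraform output -json`. The `get_output` helper below is a hypothetical sketch showing how jq (installed as a prerequisite) can extract a single value from that JSON; the cluster ID `lkc-abc123` is made-up sample data:

```shell
#!/bin/bash
# Hypothetical helper: extract one value from `terraform output -json`.
get_output() {
  # $1 = JSON document, $2 = output name
  echo "$1" | jq -r --arg k "$2" '.[$k].value'
}

# Sample JSON in the shape `terraform output -json` produces:
sample='{"cc_kafka_cluster":{"value":"lkc-abc123"}}'
get_output "$sample" cc_kafka_cluster
# prints lkc-abc123

# In the real workflow:
# get_output "$(terraform output -json)" cc_kafka_cluster
```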