Elevating Analytics with Real-Time Data Streaming: A Deep Dive into Kafka

Azure · Terraform · Docker · Power BI · Python · Apache Kafka · Apache Flink · PySpark · Databricks

Welcome to our repository, crafted to highlight the seamless integration and ease of use of Kafka. Dive into:

  1. Setting up Confluent Cloud, Connectors & Flink.

Created by

This repo has been created by:

  • Sami Alashabi
  • Maria Berinde-Tampanariu
  • Ramzi Alashabi

Design

The demo uses Flink Streaming; see the infrastructure diagram in the repo.

Alternatively, the same pipeline can run on Spark Streaming with Databricks; that code lives in the kafka_spark_streaming folder, with its own infrastructure diagram.

Installation

Installation only needs to be done once.

Prerequisites

Install Terraform

brew tap hashicorp/tap
brew install hashicorp/tap/terraform
brew update
brew upgrade hashicorp/tap/terraform
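To confirm the installation:

terraform -version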

Install jq

brew install jq
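Verify with:

jq --version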

Set up services for the demo

Set environment variables

  • Create a file named .env inside the confluent_cloud_demo folder:
#!/bin/bash

# Confluent Cloud
export CONFLUENT_CLOUD_API_KEY="Enter credentials here"
export CONFLUENT_CLOUD_API_SECRET="Enter credentials here"

export TF_VAR_PBI_API_URL=''
export TF_VAR_TOPIC_2_PBI=''
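The demo script is expected to pick these up, but if you need the variables in your current shell (for example, to run Terraform directly), source the file first:

cd confluent_cloud_demo
source .env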

Start Demo

  • Run command: ./demo_start.sh inside the confluent_cloud_demo folder
  • Access Confluent Cloud: https://confluent.cloud/login
  • Select your Environment
  • Select tab Flink (preview)
  • Access your Flink Compute Pool
  • Click Open SQL Workspace

Flink Compute Pool

  • Select Catalog: kafka_flink_demo_xx
  • Select Database: cc-demo-cluster
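Equivalently, the catalog and database can be selected from within the SQL workspace; a minimal sketch using the names above (the xx suffix is generated per deployment):

USE CATALOG `kafka_flink_demo_xx`;
USE `cc-demo-cluster`;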

  • Proceed to submit the SQL queries below, one per workspace tab:
---------------------------------------------------------------
-- Create table users (a topic with the same name will be created)
---------------------------------------------------------------
CREATE TABLE `users` (
  `userid` INT,
  `fullname` STRING,
  `credit_card_last_four_digits` STRING,
  `gender` STRING,
  `email` STRING,  
  `ipaddress` STRING,
  `company` STRING,  
  `avg_credit_spend` DOUBLE
) WITH (
  'changelog.mode' = 'retract'
);

describe extended `users`;

--------------------------------------------------------------------------
-- Populate table users (You will see new messages published in the topic)
--------------------------------------------------------------------------
INSERT INTO `users` (`userid`, `fullname`,`credit_card_last_four_digits`, `gender`, `email`, `ipaddress`, `company`, `avg_credit_spend`) VALUES
(1, 'Ellen Ripley', '1234', 'Male', '[email protected]', '72.197.144.165', 'Dynabox', 2650.0),
(2, 'Tony Stark', '2345', 'Genderfluid', '[email protected]', '13.246.111.16', 'Aivee', 4119.27),
(3, 'Vito Corleone', '3456', 'Male', '[email protected]', '197.231.118.1', 'Fanoodle', 2119.76),
(4, 'Lara Croft', '4567', 'Female', '[email protected]', '138.246.248.76', 'Yodo', 1271.58),
(5, 'Elisabeth Gentry', '5678', 'Female', '[email protected]', '236.176.123.77', 'Skaboo', 2783.47),
(6, 'Richart Bradfield', '6789', 'Male', '[email protected]', '71.180.87.61', 'Meejo', 2154.45),
(7, 'Helene Hargrove', '7890', 'Female', '[email protected]', '240.88.89.167', 'Browsebug', 2333.36),
(8, 'Benji Geck', '8901', 'Male', '[email protected]', '250.2.253.193', 'Yombu', 3999.74),
(9, 'Jack Sparrow', '9012', 'Non-binary', '[email protected]', '185.20.56.89', 'Einti', 3817.99),
(10, 'Elyn Cromarty', '0123', 'Female', '[email protected]', '167.68.56.180', 'Shufflester', 5263.34),
(11, 'Hurley Cochrane', '1111', 'Male', '[email protected]', '241.69.23.160', 'LiveZ', 4935.66),
(12, 'Marty McFly', '2876', 'Female', '[email protected]', '36.208.43.205', 'Blognation', 2796.26),
(13, 'John Wick', '3211', 'Male', '[email protected]', '4.232.220.231', 'Realcube', 4050.23),
(14, 'Maximus Decimus Meridius', '4721', 'Female', '[email protected]', '65.87.4.235', 'Vitz', 1637.76),
(15, 'Severus Snape', '5005', 'Female', '[email protected]', '7.26.91.164', 'Youopia', 5038.34),
(16, 'Jerrold Strugnell', '1616', 'Male', '[email protected]', '15.38.20.244', 'Devpoint', 2947.57);

select * from `users` LIMIT 16;

----------------------------------------------------------------------------
-- Create table credit-card-enriched (a topic with the same name will be created)
----------------------------------------------------------------------------
CREATE TABLE `credit-card-enriched` (
  `userid` INT,
  `credit_card_last_four_digits` STRING,
  `fullname` STRING,
  `gender` STRING,
  `email` STRING,  
  `ipaddress` STRING,
  `company` STRING,  
  `avg_credit_spend` DOUBLE,
  `amount` DOUBLE,
  `transaction_id` BIGINT,
  `timestamp` TIMESTAMP(0),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' MINUTES
) WITH (
  'changelog.mode' = 'retract'
);

describe extended `credit-card-enriched`;

----------------------------------------------------------------------------------
-- Merge tables poc-credit-card-transactions and users (non-transactional) 
----------------------------------------------------------------------------------
INSERT INTO `credit-card-enriched` (`userid`, `credit_card_last_four_digits`, `fullname`, `gender`, `email`, `ipaddress`, `company`, `amount`, `avg_credit_spend`, `transaction_id`, `timestamp`)
SELECT
  u.`userid`,
  c.`credit_card_last_four_digits`,
  u.`fullname`,
  u.`gender`,
  u.`email`,
  u.`ipaddress`,
  u.`company`,
  c.`amount`,
  u.`avg_credit_spend`,
  c.`transaction_id`,
  c.`timestamp`
FROM
  `poc-credit-card-transactions` as c
LEFT JOIN `users` AS u
ON
  c.`credit_card_last_four_digits` = u.`credit_card_last_four_digits`;

select * from `credit-card-enriched`;


------------------------------------------------------------------------
-- Create table possible-fraud (a topic with the same name will be created)
------------------------------------------------------------------------
CREATE TABLE `possible-fraud` (
  `userid` INT,
  `credit_card_last_four_digits` STRING,
  `fullname` STRING,
  `gender` STRING,
  `email` STRING, 
  `timestamp` TIMESTAMP(0),
  `sum_amount` DOUBLE,
  `max_avg_credit_spend` DOUBLE,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' MINUTES
) WITH (
  'changelog.mode' = 'retract'
);

describe extended `possible-fraud`;

-------------------------------------------------------------------------------------------------
-- Populate table possible-fraud (if the sum of amount is greater than the average credit spend)
-------------------------------------------------------------------------------------------------
INSERT INTO `possible-fraud`
SELECT
  `userid`,
  `credit_card_last_four_digits`,
  `fullname`,
  `gender`,
  `email`,
  `window_start`,
   SUM(`amount`),
   MAX(`avg_credit_spend`)
FROM
  TABLE(
    TUMBLE(TABLE `credit-card-enriched`, DESCRIPTOR(`timestamp`), INTERVAL '30' SECONDS)
  )
GROUP BY `credit_card_last_four_digits`, `userid`, `fullname`, `gender`,`email`, `window_start`
HAVING
  SUM(`amount`) > MAX(`avg_credit_spend`);

select * from `possible-fraud`;

Review Running Flink SQL statements

  • Access your Environment: kafka_flink_demo-xx
  • Select tab Flink (preview)
  • Select tab Flink statements
  • Filter by status Running
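If you prefer the terminal, recent versions of the Confluent CLI can also list Flink statements; treat this as a sketch, since the exact flags vary by CLI version:

confluent flink statement list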

Stop Demo

  • Run command: ./demo_stop.sh

Terraform Documentation

Requirements

Name Version
azurerm =3.0.0
confluent 1.55.0
random ~>3.0
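These version pins correspond to a Terraform required_providers block along these lines (a sketch consistent with the table above; the provider sources are the standard registry ones):

terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "=3.0.0"
    }
    confluent = {
      source  = "confluentinc/confluent"
      version = "1.55.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~>3.0"
    }
  }
}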

Providers

Name Version
azurerm 3.0.0
confluent 1.55.0
random 3.5.1

Resources

Name Type
azurerm_dev_test_global_vm_shutdown_schedule.myschedule resource
azurerm_network_interface.my_terraform_nic resource
azurerm_network_interface_security_group_association.example resource
azurerm_network_security_group.my_terraform_nsg resource
azurerm_public_ip.my_terraform_public_ip resource
azurerm_storage_account.mystorage resource
azurerm_storage_container.example resource
azurerm_subnet.my_terraform_subnet resource
azurerm_virtual_network.my_terraform_network resource
azurerm_windows_virtual_machine.main resource
confluent_api_key.app_manager_kafka_cluster_key resource
confluent_api_key.clients_kafka_cluster_key resource
confluent_api_key.sr_cluster_key resource
confluent_custom_connector_plugin.sink resource
confluent_environment.cc_demo_env resource
confluent_flink_compute_pool.cc_flink_compute_pool resource
confluent_kafka_cluster.cc_kafka_cluster resource
confluent_kafka_topic.credit_card resource
confluent_kafka_topic.pageviews resource
confluent_role_binding.app_manager_environment_admin resource
confluent_role_binding.clients_cluster_admin resource
confluent_role_binding.demo-rb resource
confluent_role_binding.sr_environment_admin resource
confluent_schema_registry_cluster.cc_sr_cluster resource
confluent_service_account.app_manager resource
confluent_service_account.clients resource
confluent_service_account.connectors resource
confluent_service_account.demo-sa resource
confluent_service_account.sr resource
confluent_tag.pii resource
random_id.id resource
random_pet.prefix resource
azurerm_resource_group.demo data source
confluent_schema_registry_region.cc_demo_sr data source

Inputs

Name Description Type Default Required
admin_password n/a any n/a yes
admin_username Azure any n/a yes
cc_availability n/a string "SINGLE_ZONE" no
cc_cloud_provider Confluent Cloud Kafka cluster variables string "AWS" no
cc_cloud_region n/a string "eu-central-1" no
cc_cluster_name n/a string "cc_demo_cluster" no
cc_compute_pool_cfu n/a number 5 no
cc_compute_pool_name n/a string "cc_demo_flink" no
cc_dislay_name Confluent Cloud Flink Compute Pool variables string "standard_compute_pool" no
cc_env_name n/a string "kafka_flink_demo" no
sr_cloud_provider Confluent Cloud Schema Registry variables string "AWS" no
sr_cloud_region n/a string "eu-central-1" no
sr_package n/a string "ESSENTIALS" no
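Only admin_password and admin_username are required (they have no defaults). One way to supply them is Terraform's standard TF_VAR_ environment variable convention; the values here are placeholders:

export TF_VAR_admin_username="azureuser"    # placeholder, pick your own
export TF_VAR_admin_password="ChangeMe123!" # placeholder, pick your own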

Outputs

Name Description
cc_demo_env CC Environment
cc_demo_sa CC Service Account
cc_demo_sr CC Schema Registry Region
cc_kafka_cluster CC Kafka Cluster ID
cc_sr_cluster CC SR Cluster ID
id n/a
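Once the demo is provisioned, any of these can be read back with terraform output, for example:

terraform output cc_kafka_cluster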
