Skip to content

Satyajitv/KafkaStreamingDF

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 

Repository files navigation

Structured Streaming(Kafka) JSON to DF (Specifically for Debezium payload.after Data) +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

The same should also work with other JSON formats apart from Debezium, with minute changes.

Approach:

+++++++++

*InferSchema from kafka batch process version.

*Use the schema to create JSON row values to Streaming DF with columns and their values.

Steps to create DF from structured streaming JSON values:

1.Create an object like SparkEntry.

2.Create SparkSession instance.

3.Create a Map with the options that are required to connect to Kafka, example

      val options = new scala.collection.mutable.HashMap[String, String]
      options("kafka.bootstrap.servers") = "**.**.**.**:9092"
      options("subscribe") = "***************"
      options("maxOffsetsPerTrigger") = "10"
      options("startingOffsets") = "earliest"
      options("checkpointLocation") = "*****************************"

4.Pass SparkSession and options to StructStreaming.createRowDF(spark,options) method,which return's DataFrame with all fields in JSON as Columns in DF.

StructStreaming.createRowDF(spark,options).writeStream.format("console").start().awaitTermination()

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages