Commit 5260dee

exactly once with transactional writes

1 parent 570e1f2
File tree: 1 file changed (+57 -19 lines)

src/main/scala/ExactlyOnce.scala

Lines changed: 57 additions & 19 deletions
@@ -3,52 +3,90 @@ import java.time.LocalDateTime
 import java.time.format.DateTimeParseException
 import java.time.temporal.ChronoUnit
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka010._
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import scalikejdbc._
 
 
 object ExactlyOnce {
   def main(args: Array[String]): Unit = {
+    val brokers = "localhost:9092"
+    val topic = "alog"
+
     val kafkaParams = Map[String, Object](
-      "bootstrap.servers" -> "localhost:9092",
+      "bootstrap.servers" -> brokers,
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> "exactly-once",
-      "auto.offset.reset" -> "latest")
+      "enable.auto.commit" -> (false: java.lang.Boolean),
+      "auto.offset.reset" -> "none")
+
+    ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
 
     val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")
     val ssc = new StreamingContext(conf, Seconds(5))
 
+    val fromOffsets = DB.readOnly { implicit session =>
+      sql"""
+      select `partition`, offset from kafka_offset
+      where topic = ${topic}
+      """.map { rs =>
+        new TopicPartition(topic, rs.int("partition")) -> rs.long("offset")
+      }.list.apply().toMap
+    }
+
     val messages = KafkaUtils.createDirectStream[String, String](ssc,
       LocationStrategies.PreferConsistent,
-      ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))
+      ConsumerStrategies.Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets))
 
-    messages.map(_.value)
-      .flatMap(parseLog)
-      .filter(_.level == "ERROR")
-      .map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
-      .reduceByKey(_ + _)
-      .foreachRDD { (rdd, time) =>
-        rdd.foreachPartition { iter =>
-          ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
-          iter.foreach { case (time, count) =>
-            DB.autoCommit { implicit session =>
-              sql"""
-              insert into error_log (log_time, log_count)
-              value (${time}, ${count})
-              on duplicate key update log_count = log_count + values(log_count)
-              """.update.apply()
-            }
+    messages.foreachRDD { rdd =>
+      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      val result = processLogs(rdd).collect()
+
+      DB.localTx { implicit session =>
+        result.foreach { case (time, count) =>
+          sql"""
+          insert into error_log (log_time, log_count)
+          value (${time}, ${count})
+          on duplicate key update log_count = log_count + values(log_count)
+          """.update.apply()
+        }
+
+        offsetRanges.foreach { offsetRange =>
+          sql"""
+          insert ignore into kafka_offset (topic, `partition`, offset)
+          value (${topic}, ${offsetRange.partition}, ${offsetRange.fromOffset})
+          """.update.apply()
+
+          val affectedRows = sql"""
+          update kafka_offset set offset = ${offsetRange.untilOffset}
+          where topic = ${topic} and `partition` = ${offsetRange.partition}
+          and offset = ${offsetRange.fromOffset}
+          """.update.apply()
+
+          if (affectedRows != 1) {
+            throw new Exception("fail to update offset")
           }
         }
       }
+    }
 
     ssc.start()
     ssc.awaitTermination()
   }
 
+  def processLogs(messages: RDD[ConsumerRecord[String, String]]): RDD[(LocalDateTime, Int)] = {
+    messages.map(_.value)
+      .flatMap(parseLog)
+      .filter(_.level == "ERROR")
+      .map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
+      .reduceByKey(_ + _)
+  }
+
   case class Log(time: LocalDateTime, level: String)
 
   val logPattern = "^(.{19}) ([A-Z]+).*".r
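
Note: the transactional write above relies on two MySQL tables, error_log and kafka_offset, whose DDL is not part of this commit. A minimal sketch consistent with the queries in the diff (the key choices and column types are assumptions, not taken from the repository) could be set up through ScalikeJDBC like this:

import scalikejdbc._

object Schema {
  def createTables(): Unit = {
    ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
    DB.autoCommit { implicit session =>
      // Aggregated error counts, keyed by minute so the
      // "on duplicate key update" upsert stays idempotent on replay.
      sql"""
      create table if not exists error_log (
        log_time datetime primary key,
        log_count int not null default 0
      )
      """.execute.apply()

      // One offset row per (topic, partition); the conditional update in the
      // transaction acts as an optimistic lock on the stored offset.
      sql"""
      create table if not exists kafka_offset (
        topic varchar(255) not null,
        `partition` int not null,
        `offset` bigint not null,
        primary key (topic, `partition`)
      )
      """.execute.apply()
    }
  }
}

Keeping the result upsert and the offset update in one DB.localTx block is what makes the output exactly-once: if the offset row was already advanced by a previous attempt, the guarded update affects zero rows, the exception rolls back the whole transaction, and the batch is neither double-counted nor lost.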

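Note: parseLog, referenced by processLogs, sits below the changed lines and is untouched by this commit, so it does not appear in the diff. A plausible sketch, assumed here only for context as a member of the same ExactlyOnce object (logTimeFormat and the timestamp layout are guesses consistent with logPattern and the DateTimeParseException import), would be:

  // Assumed layout of the first 19 characters of a log line,
  // e.g. "2017-07-30 14:00:00"; the real format may differ.
  val logTimeFormat = java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def parseLog(line: String): Option[Log] = line match {
    case logPattern(timestamp, level) =>
      try {
        Some(Log(LocalDateTime.parse(timestamp, logTimeFormat), level))
      } catch {
        // Lines whose first 19 characters do not parse as a timestamp are dropped.
        case _: DateTimeParseException => None
      }
    case _ => None // not a recognizable log record
  }

Returning an Option lets flatMap(parseLog) silently skip malformed lines instead of failing the batch.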