Skip to content

Commit 429e83a

Browse files
committed
address configuration changes needed in lamba-datastore
1 parent d24ebf7 commit 429e83a

File tree

4 files changed

+14
-3
lines changed

4 files changed

+14
-3
lines changed

geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStore.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,5 +218,6 @@ object LambdaDataStore {
218218
consumers: Int,
219219
expiry: Option[FiniteDuration],
220220
persistBatchSize: Option[Int] = None,
221+
offsetCommitIntervalMs: Long = 10000
221222
)
222223
}

geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStoreParams.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with SecurityParams
104104
"Offset manager instance to use",
105105
deprecatedKeys = Seq("lamdab.offset-manager", "offsetManager"))
106106

107+
val ConsumerOffsetCommitIntervalMs =
108+
new GeoMesaParam[java.lang.Long](
109+
"lambda.kafka.consumer.offset-commit-interval-ms",
110+
"The frequency of committing offsets for the Kafka consumer",
111+
default = 10000
112+
)
113+
107114
def parse(params: java.util.Map[String, _], namespace: String): LambdaConfig = {
108115
val brokers = BrokersParam.lookup(params)
109116
val expiry = if (!PersistParam.lookup(params).booleanValue) { None } else {
@@ -121,6 +128,8 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with SecurityParams
121128
val zk = ZookeepersParam.lookup(params)
122129
val zkNamespace = s"gm_lambda_$namespace"
123130

124-
LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize)
131+
val offsetCommitIntervalMs : Long = ConsumerOffsetCommitIntervalMs.lookupOpt(params).map(_.toLong).getOrElse(10000)
132+
133+
LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize, offsetCommitIntervalMs)
125134
}
126135
}

geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaCacheLoader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class KafkaCacheLoader(
3333
topic: String,
3434
frequency: Long,
3535
serializer: KryoFeatureSerializer,
36-
cache: WritableFeatureCache) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) {
36+
cache: WritableFeatureCache,
37+
offsetCommitIntervalMs: Long) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) {
3738

3839
startConsumers()
3940

geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class KafkaStore(
7070
private val loader = {
7171
val consumers = KafkaStore.consumers(config.consumerConfig, topic, offsetManager, config.consumers, cache.partitionAssigned)
7272
val frequency = KafkaStore.LoadIntervalProperty.toDuration.get.toMillis
73-
new KafkaCacheLoader(consumers, topic, frequency, serializer, cache)
73+
new KafkaCacheLoader(consumers, topic, frequency, serializer, cache, config.offsetCommitIntervalMs)
7474
}
7575

7676
override def createSchema(): Unit = {

0 commit comments

Comments
 (0)