Skip to content

Commit 11dc185

Browse files
author
Steven Wu
committed
switch from ConcurrentSkipListSet to CopyOnWriteArraySet
1 parent c6db346 commit 11dc185

File tree

1 file changed

+12
-3
lines changed
  • suro-kafka-producer/src/main/java/com/netflix/suro/sink/kafka

1 file changed

+12
-3
lines changed

suro-kafka-producer/src/main/java/com/netflix/suro/sink/kafka/KafkaSink.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
import com.netflix.suro.TagKey;
1414
import com.netflix.suro.message.MessageContainer;
1515
import com.netflix.suro.sink.Sink;
16-
import org.apache.kafka.clients.producer.*;
16+
import org.apache.kafka.clients.producer.Callback;
17+
import org.apache.kafka.clients.producer.KafkaProducer;
18+
import org.apache.kafka.clients.producer.ProducerConfig;
19+
import org.apache.kafka.clients.producer.ProducerRecord;
20+
import org.apache.kafka.clients.producer.RecordMetadata;
1721
import org.apache.kafka.common.Metric;
1822
import org.apache.kafka.common.config.ConfigDef;
1923
import org.slf4j.Logger;
@@ -28,7 +32,12 @@
2832
import java.lang.reflect.Field;
2933
import java.util.Map;
3034
import java.util.Properties;
31-
import java.util.concurrent.*;
35+
import java.util.Set;
36+
import java.util.concurrent.CopyOnWriteArraySet;
37+
import java.util.concurrent.ExecutorService;
38+
import java.util.concurrent.Executors;
39+
import java.util.concurrent.Semaphore;
40+
import java.util.concurrent.TimeUnit;
3241
import java.util.concurrent.atomic.AtomicLong;
3342

3443
/**
@@ -128,7 +137,7 @@ public void setRecordCounterListener(Action3 action) {
128137
this.recordCounterListener = action;
129138
}
130139

131-
private ConcurrentSkipListSet<String> metadataFetchedTopicSet = new ConcurrentSkipListSet<>();
140+
private Set<String> metadataFetchedTopicSet = new CopyOnWriteArraySet<String>();
132141
private PublishSubject<MessageContainer> stream = PublishSubject.create();
133142
private Subscription subscription;
134143
private ExecutorService executor = Executors.newSingleThreadExecutor(

0 commit comments

Comments
 (0)