Skip to content

Commit 05653cb

Browse files
committed
move cleaning filter cache on closed readers to separate thread
improve cleaning the global weighted cache when a reader closes, move it to a separate thread, so iterating over the cache entries will nto happen on each segment closed, but instead be "bulked"
1 parent 9dd8e1e commit 05653cb

File tree

3 files changed

+75
-18
lines changed

3 files changed

+75
-18
lines changed

src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,7 @@ public void clear(String reason) {
8383
return;
8484
}
8585
seenReadersCount.dec();
86-
for (FilterCacheKey key : indicesFilterCache.cache().asMap().keySet()) {
87-
if (key.readerKey() == readerKey) {
88-
// invalidate will cause a removal and will be notified
89-
indicesFilterCache.cache().invalidate(key);
90-
}
91-
}
86+
indicesFilterCache.addReaderKeyToClean(readerKey);
9287
}
9388
}
9489

@@ -106,13 +101,7 @@ public void clear(IndexReader reader) {
106101
return;
107102
}
108103
seenReadersCount.dec();
109-
Cache<FilterCacheKey, DocSet> cache = indicesFilterCache.cache();
110-
for (FilterCacheKey key : cache.asMap().keySet()) {
111-
if (key.readerKey() == reader.getCoreCacheKey()) {
112-
// invalidate will cause a removal and will be notified
113-
cache.invalidate(key);
114-
}
115-
}
104+
indicesFilterCache.addReaderKeyToClean(reader.getCoreCacheKey());
116105
}
117106

118107
@Override
@@ -247,7 +236,7 @@ public boolean equals(Object o) {
247236
if (this == o) return true;
248237
// if (o == null || getClass() != o.getClass()) return false;
249238
FilterCacheKey that = (FilterCacheKey) o;
250-
return (readerKey == that.readerKey && filterKey.equals(that.filterKey));
239+
return (readerKey.equals(that.readerKey) && filterKey.equals(that.filterKey));
251240
}
252241

253242
@Override

src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ public void onRefreshSettings(Settings settings) {
719719
}
720720
}
721721

722-
private class EngineRefresher implements Runnable {
722+
class EngineRefresher implements Runnable {
723723
@Override
724724
public void run() {
725725
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
@@ -767,7 +767,7 @@ public void run() {
767767
}
768768
}
769769

770-
private class EngineMerger implements Runnable {
770+
class EngineMerger implements Runnable {
771771
@Override
772772
public void run() {
773773
if (!engine().possibleMergeNeeded()) {

src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,43 @@
2525
import com.google.common.cache.RemovalListener;
2626
import com.google.common.cache.RemovalNotification;
2727
import com.google.common.collect.ImmutableMap;
28+
import gnu.trove.set.hash.THashSet;
2829
import org.elasticsearch.cluster.metadata.MetaData;
30+
import org.elasticsearch.common.CacheRecycler;
2931
import org.elasticsearch.common.collect.MapBuilder;
3032
import org.elasticsearch.common.component.AbstractComponent;
3133
import org.elasticsearch.common.inject.Inject;
3234
import org.elasticsearch.common.lucene.docset.DocSet;
3335
import org.elasticsearch.common.settings.Settings;
3436
import org.elasticsearch.common.unit.ByteSizeValue;
3537
import org.elasticsearch.common.unit.TimeValue;
38+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3639
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
3740
import org.elasticsearch.monitor.jvm.JvmInfo;
3841
import org.elasticsearch.node.settings.NodeSettingsService;
42+
import org.elasticsearch.threadpool.ThreadPool;
3943

44+
import java.util.Iterator;
4045
import java.util.Map;
46+
import java.util.Set;
4147
import java.util.concurrent.TimeUnit;
4248

4349
public class IndicesFilterCache extends AbstractComponent implements RemovalListener<WeightedFilterCache.FilterCacheKey, DocSet> {
4450

51+
private final ThreadPool threadPool;
52+
4553
private Cache<WeightedFilterCache.FilterCacheKey, DocSet> cache;
4654

4755
private volatile String size;
4856
private volatile long sizeInBytes;
4957
private volatile TimeValue expire;
5058

59+
private final TimeValue cleanInterval;
60+
61+
private final Set<Object> readersKeysToClean = ConcurrentCollections.newConcurrentSet();
62+
63+
private volatile boolean closed;
64+
5165
private volatile Map<String, RemovalListener<WeightedFilterCache.FilterCacheKey, DocSet>> removalListeners =
5266
ImmutableMap.of();
5367

@@ -85,15 +99,19 @@ public void onRefreshSettings(Settings settings) {
8599
}
86100

87101
@Inject
88-
public IndicesFilterCache(Settings settings, NodeSettingsService nodeSettingsService) {
102+
public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) {
89103
super(settings);
104+
this.threadPool = threadPool;
90105
this.size = componentSettings.get("size", "20%");
91106
this.expire = componentSettings.getAsTime("expire", null);
107+
this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(1));
92108
computeSizeInBytes();
93109
buildCache();
94110
logger.debug("using [node] filter cache with size [{}], actual_size [{}]", size, new ByteSizeValue(sizeInBytes));
95111

96112
nodeSettingsService.addListener(new ApplySettings());
113+
114+
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, new ReaderCleaner());
97115
}
98116

99117
private void buildCache() {
@@ -102,7 +120,7 @@ private void buildCache() {
102120
.maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher());
103121

104122
// defaults to 4, but this is a busy map for all indices, increase it a bit
105-
cacheBuilder.concurrencyLevel(8);
123+
cacheBuilder.concurrencyLevel(16);
106124

107125
if (expire != null) {
108126
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
@@ -128,7 +146,12 @@ public synchronized void removeRemovalListener(String index) {
128146
removalListeners = MapBuilder.newMapBuilder(removalListeners).remove(index).immutableMap();
129147
}
130148

149+
public void addReaderKeyToClean(Object readerKey) {
150+
readersKeysToClean.add(readerKey);
151+
}
152+
131153
public void close() {
154+
closed = true;
132155
cache.invalidateAll();
133156
}
134157

@@ -147,4 +170,49 @@ public void onRemoval(RemovalNotification<WeightedFilterCache.FilterCacheKey, Do
147170
listener.onRemoval(removalNotification);
148171
}
149172
}
173+
174+
/**
175+
* The reason we need this class ie because we need to clean all the filters that are associated
176+
* with a reader. We don't want to do it every time a reader closes, since iterating over all the map
177+
* is expensive. There doesn't seem to be a nicer way to do it (and maintaining a list per reader
178+
* of the filters will cost more).
179+
*/
180+
class ReaderCleaner implements Runnable {
181+
182+
@Override
183+
public void run() {
184+
if (closed) {
185+
return;
186+
}
187+
if (readersKeysToClean.isEmpty()) {
188+
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
189+
return;
190+
}
191+
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
192+
@Override
193+
public void run() {
194+
THashSet<Object> keys = CacheRecycler.popHashSet();
195+
try {
196+
for (Iterator<Object> it = readersKeysToClean.iterator(); it.hasNext(); ) {
197+
keys.add(it.next());
198+
it.remove();
199+
}
200+
cache.cleanUp();
201+
if (!keys.isEmpty()) {
202+
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
203+
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
204+
if (keys.contains(filterCacheKey.readerKey())) {
205+
// same as invalidate
206+
it.remove();
207+
}
208+
}
209+
}
210+
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, ReaderCleaner.this);
211+
} finally {
212+
CacheRecycler.pushHashSet(keys);
213+
}
214+
}
215+
});
216+
}
217+
}
150218
}

0 commit comments

Comments
 (0)