Skip to content

ES-11372: Add project ID to key for geoip cache #129572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -105,8 +107,9 @@ int current() {

@Override
@Nullable
@FixForMultiProject // do not use ProjectId.DEFAULT
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
return cache.putIfAbsent(ipAddress, cachedDatabasePathToString, ip -> {
return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> {
try {
return responseProvider.apply(get(), ipAddress);
} catch (Exception e) {
Expand Down Expand Up @@ -143,9 +146,10 @@ public void shutdown() throws IOException {
}

// Visible for Testing
@FixForMultiProject // do not use ProjectId.DEFAULT
protected void doShutdown() throws IOException {
IOUtils.close(databaseReader.get());
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath);
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath);
logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
if (deleteDatabaseFileOnShutdown) {
logger.info("deleting [{}]", databasePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.maxmind.db.NodeCache;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -60,9 +61,9 @@ public String toString() {
}

@SuppressWarnings("unchecked")
<RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
<RESPONSE> RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(ip, databasePath);
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
long cacheStart = relativeNanoTimeProvider.getAsLong();
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
Object response = cache.get(cacheKey);
Expand Down Expand Up @@ -92,16 +93,16 @@ <RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String,
}

// only useful for testing
Object get(String ip, String databasePath) {
CacheKey cacheKey = new CacheKey(ip, databasePath);
Object get(ProjectId projectId, String ip, String databasePath) {
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
return cache.get(cacheKey);
}

public int purgeCacheEntriesForDatabase(Path databaseFile) {
public int purgeCacheEntriesForDatabase(ProjectId projectId, Path databaseFile) {
String databasePath = databaseFile.toString();
int counter = 0;
for (CacheKey key : cache.keys()) {
if (key.databasePath.equals(databasePath)) {
if (key.projectId.equals(projectId) && key.databasePath.equals(databasePath)) {
cache.invalidate(key);
counter++;
}
Expand Down Expand Up @@ -135,5 +136,5 @@ public CacheStats getCacheStats() {
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both.
*/
private record CacheKey(String ip, String databasePath) {}
private record CacheKey(ProjectId projectId, String ip, String databasePath) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import com.maxmind.geoip2.model.AbstractResponse;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.geoip.stats.CacheStats;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -26,21 +28,22 @@ public class GeoIpCacheTests extends ESTestCase {

public void testCachesAndEvictsResults() {
GeoIpCache cache = new GeoIpCache(1);
ProjectId projectId = randomProjectIdOrDefault();
AbstractResponse response1 = mock(AbstractResponse.class);
AbstractResponse response2 = mock(AbstractResponse.class);

// add a key
AbstractResponse cachedResponse = cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1);
AbstractResponse cachedResponse = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1);
assertSame(cachedResponse, response1);
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1));
assertSame(cachedResponse, cache.get("127.0.0.1", "path/to/db"));
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1));
assertSame(cachedResponse, cache.get(projectId, "127.0.0.1", "path/to/db"));

// evict old key by adding another value
cachedResponse = cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2);
cachedResponse = cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2);
assertSame(cachedResponse, response2);
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2));
assertSame(cachedResponse, cache.get("127.0.0.2", "path/to/db"));
assertNotSame(response1, cache.get("127.0.0.1", "path/to/db"));
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2));
assertSame(cachedResponse, cache.get(projectId, "127.0.0.2", "path/to/db"));
assertNotSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db"));
}

public void testCachesNoResult() {
Expand All @@ -51,31 +54,47 @@ public void testCachesNoResult() {
return null;
};

AbstractResponse response = cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull);
ProjectId projectId = randomProjectIdOrDefault();
AbstractResponse response = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull);
assertNull(response);
assertNull(cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull));
assertNull(cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull));
assertEquals(1, count.get());

// the cached value is not actually *null*, it's the NO_RESULT sentinel
assertSame(GeoIpCache.NO_RESULT, cache.get("127.0.0.1", "path/to/db"));
assertSame(GeoIpCache.NO_RESULT, cache.get(projectId, "127.0.0.1", "path/to/db"));
}

public void testCacheKey() {
public void testCacheDoesNotCollideForDifferentDatabases() {
GeoIpCache cache = new GeoIpCache(2);
AbstractResponse response1 = mock(AbstractResponse.class);
AbstractResponse response2 = mock(AbstractResponse.class);

assertSame(response1, cache.putIfAbsent("127.0.0.1", "path/to/db1", ip -> response1));
assertSame(response2, cache.putIfAbsent("127.0.0.1", "path/to/db2", ip -> response2));
assertSame(response1, cache.get("127.0.0.1", "path/to/db1"));
assertSame(response2, cache.get("127.0.0.1", "path/to/db2"));
ProjectId projectId = randomProjectIdOrDefault();
assertSame(response1, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db1", ip -> response1));
assertSame(response2, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db2", ip -> response2));
assertSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db1"));
assertSame(response2, cache.get(projectId, "127.0.0.1", "path/to/db2"));
}

public void testCacheDoesNotCollideForDifferentProjects() {
GeoIpCache cache = new GeoIpCache(2);
AbstractResponse response1 = mock(AbstractResponse.class);
AbstractResponse response2 = mock(AbstractResponse.class);

ProjectId projectId1 = randomUniqueProjectId();
ProjectId projectId2 = randomUniqueProjectId();
assertSame(response1, cache.putIfAbsent(projectId1, "127.0.0.1", "path/to/db1", ip -> response1));
assertSame(response2, cache.putIfAbsent(projectId2, "127.0.0.1", "path/to/db1", ip -> response2));
assertSame(response1, cache.get(projectId1, "127.0.0.1", "path/to/db1"));
assertSame(response2, cache.get(projectId2, "127.0.0.1", "path/to/db1"));
}

public void testThrowsFunctionsException() {
GeoIpCache cache = new GeoIpCache(1);
ProjectId projectId = randomProjectIdOrDefault();
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> {
() -> cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> {
throw new IllegalArgumentException("bad");
})
);
Expand All @@ -92,19 +111,20 @@ public void testGetCacheStats() {
final AtomicLong testNanoTime = new AtomicLong(0);
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
GeoIpCache cache = new GeoIpCache(maxCacheSize, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
ProjectId projectId = randomProjectIdOrDefault();
AbstractResponse response = mock(AbstractResponse.class);
String databasePath = "path/to/db1";
String key1 = "127.0.0.1";
String key2 = "127.0.0.2";
String key3 = "127.0.0.3";

cache.putIfAbsent(key1, databasePath, ip -> response); // cache miss
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(key3, databasePath, ip -> response); // cache miss, key2 will be evicted
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss, key1 will be evicted
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache miss
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
cache.putIfAbsent(projectId, key3, databasePath, ip -> response); // cache miss, key2 will be evicted
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss, key1 will be evicted
CacheStats cacheStats = cache.getCacheStats();
assertThat(cacheStats.count(), equalTo(maxCacheSize));
assertThat(cacheStats.hits(), equalTo(3L));
Expand All @@ -115,4 +135,28 @@ public void testGetCacheStats() {
// There are 4 misses. Each is made up of a cache query, and a database query, each being 1ms:
assertThat(cacheStats.missesTimeInMillis(), equalTo(8L));
}

public void testPurgeCacheEntriesForDatabase() {
GeoIpCache cache = new GeoIpCache(100);
ProjectId projectId1 = randomUniqueProjectId();
ProjectId projectId2 = randomUniqueProjectId();
String databasePath1 = "path/to/db1";
String databasePath2 = "path/to/db2";
String ip1 = "127.0.0.1";
String ip2 = "127.0.0.2";

AbstractResponse response = mock(AbstractResponse.class);
cache.putIfAbsent(projectId1, ip1, databasePath1, ip -> response); // cache miss
cache.putIfAbsent(projectId1, ip2, databasePath1, ip -> response); // cache miss
cache.putIfAbsent(projectId2, ip1, databasePath1, ip -> response); // cache miss
cache.putIfAbsent(projectId1, ip1, databasePath2, ip -> response); // cache miss
cache.purgeCacheEntriesForDatabase(projectId1, PathUtils.get(databasePath1));
// should have purged entries for projectId1 and databasePath1...
assertNull(cache.get(projectId1, ip1, databasePath1));
assertNull(cache.get(projectId1, ip2, databasePath1));
// ...but left the one for projectId2...
assertSame(response, cache.get(projectId2, ip1, databasePath1));
// ...and for databasePath2:
assertSame(response, cache.get(projectId1, ip1, databasePath2));
}
}