Skip to content

Commit 6c998c3

Browse files
author
Manish Khandelwal
committed
CASSANDRA-20291 Filter unreachable private IPs from batch log writing
Add filtering logic to remove reachable private IPs from list of nodes to be selected for batch log writes. Patch By Manish Khandelwal, Reviewed by TBD for CASSANDRA-20291
1 parent 93e7718 commit 6c998c3

File tree

1 file changed

+109
-4
lines changed

1 file changed

+109
-4
lines changed

src/java/org/apache/cassandra/locator/ReplicaPlans.java

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818

1919
package org.apache.cassandra.locator;
2020

21+
import java.io.IOException;
22+
import java.net.InetSocketAddress;
23+
import java.net.Socket;
24+
import java.net.UnknownHostException;
2125
import java.util.ArrayList;
2226
import java.util.Collection;
2327
import java.util.Collections;
2428
import java.util.HashSet;
2529
import java.util.LinkedHashSet;
2630
import java.util.List;
2731
import java.util.Map;
32+
import java.util.Optional;
2833
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
2935
import java.util.concurrent.ThreadLocalRandom;
3036
import java.util.function.Consumer;
3137
import java.util.function.Function;
@@ -55,7 +61,11 @@
5561
import org.apache.cassandra.dht.AbstractBounds;
5662
import org.apache.cassandra.dht.Token;
5763
import org.apache.cassandra.exceptions.UnavailableException;
64+
import org.apache.cassandra.gms.ApplicationState;
65+
import org.apache.cassandra.gms.EndpointState;
5866
import org.apache.cassandra.gms.FailureDetector;
67+
import org.apache.cassandra.gms.Gossiper;
68+
import org.apache.cassandra.gms.VersionedValue;
5969
import org.apache.cassandra.schema.SchemaConstants;
6070
import org.apache.cassandra.service.StorageService;
6171
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
@@ -80,6 +90,9 @@ public class ReplicaPlans
8090
private static final int REQUIRED_BATCHLOG_REPLICA_COUNT
8191
= Math.max(1, Math.min(2, CassandraRelevantProperties.REQUIRED_BATCHLOG_REPLICA_COUNT.getInt()));
8292

93+
private static final Map<InetAddressAndPort, CacheEntry> reachabilityCache = new ConcurrentHashMap<>();
94+
private static final int CACHE_TTL_MS = 30000; // 30 seconds
95+
8396
static
8497
{
8598
int batchlogReplicaCount = CassandraRelevantProperties.REQUIRED_BATCHLOG_REPLICA_COUNT.getInt();
@@ -295,8 +308,27 @@ private static ListMultimap<String, InetAddressAndPort> validate(boolean preferL
295308
for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
296309
{
297310
InetAddressAndPort addr = entry.getValue();
298-
if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
299-
validated.put(entry.getKey(), entry.getValue());
311+
312+
// Skip local address and nodes marked down by FailureDetector
313+
if (addr.equals(FBUtilities.getBroadcastAddressAndPort()))
314+
continue;
315+
316+
if (!isAlive.test(addr))
317+
continue;
318+
319+
// Check for intra dc connectivity on private network. Cache the result for 30 seconds.
320+
// Valid for topology where two newtwork interfaces are used.
321+
Optional<InetAddressAndPort> maybeInternal = getInternalAddressAndPort(addr);
322+
if (maybeInternal.isPresent()) {
323+
InetAddressAndPort internal = maybeInternal.get();
324+
if (isReachableWithCache(internal, CACHE_TTL_MS)) {
325+
validated.put(entry.getKey(), addr);
326+
}
327+
} else {
328+
// No internal address means either it's a single-interface node or gossip isn't set up;
329+
// trust isAlive (failure detector) in this case.
330+
validated.put(entry.getKey(), addr);
331+
}
300332
}
301333

302334
// return early if no more than 2 nodes:
@@ -413,14 +445,87 @@ public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(bool
413445
continue;
414446
if (result.contains(endpoint))
415447
continue;
416-
417448
result.add(endpoint);
418449
}
419450
}
420-
421451
return result;
422452
}
423453

454+
private static Optional<InetAddressAndPort> getInternalAddressAndPort(InetAddressAndPort endpoint)
455+
{
456+
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
457+
if (state == null)
458+
{
459+
logger.debug("No EndpointState found for endpoint: {}", endpoint);
460+
return Optional.empty();
461+
}
462+
463+
VersionedValue internal = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
464+
if (internal == null || internal.value == null || internal.value.trim().isEmpty())
465+
{
466+
logger.debug("No INTERNAL_ADDRESS_AND_PORT state for endpoint: {}", endpoint);
467+
return Optional.empty();
468+
}
469+
470+
try
471+
{
472+
InetAddressAndPort internalIp = InetAddressAndPort.getByName(internal.value.trim());
473+
return Optional.of(InetAddressAndPort.getByAddressOverrideDefaults(internalIp.getAddress(), internalIp.getPort()));
474+
}
475+
catch (UnknownHostException e)
476+
{
477+
logger.warn("Failed to parse INTERNAL_ADDRESS_AND_PORT [{}] for endpoint: {} due to {}", internal.value, endpoint, e.getMessage());
478+
return Optional.empty();
479+
}
480+
}
481+
482+
@VisibleForTesting
483+
public static class CacheEntry
484+
{
485+
boolean reachable;
486+
long timestampMillis;
487+
488+
public CacheEntry(boolean reachable, long timestampMillis)
489+
{
490+
this.reachable = reachable;
491+
this.timestampMillis = timestampMillis;
492+
}
493+
}
494+
495+
// Check reachability with cache
496+
private static boolean isReachableWithCache(InetAddressAndPort address, int timeoutMs)
497+
{
498+
if (address == null)
499+
{
500+
logger.debug("Null address provided to isReachableWithCache, treating as unreachable.");
501+
return false;
502+
}
503+
504+
long now = FBUtilities.now().toEpochMilli();
505+
CacheEntry entry = reachabilityCache.get(address);
506+
507+
if (entry != null && (now - entry.timestampMillis) < CACHE_TTL_MS) {
508+
logger.trace("Using cached reachability for {}: {}", address, entry.reachable);
509+
return entry.reachable;
510+
}
511+
512+
boolean reachable = isReachableOnce(address, timeoutMs);
513+
reachabilityCache.put(address, new CacheEntry(reachable, now));
514+
logger.debug("Refreshed reachability for {}: {}", address, reachable);
515+
return reachable;
516+
}
517+
518+
private static boolean isReachableOnce(InetAddressAndPort address, int timeoutMs)
519+
{
520+
try (Socket socket = new Socket()) {
521+
socket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), timeoutMs);
522+
return true;
523+
} catch (IOException e) {
524+
logger.trace("Unreachable: {} due to {}", address, e.toString());
525+
return false;
526+
}
527+
}
528+
424529
@VisibleForTesting
425530
public static List<InetAddressAndPort> sortByProximity(Collection<InetAddressAndPort> endpoints)
426531
{

0 commit comments

Comments
 (0)