Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/audit/AuditLogEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ String getLogString()
StringBuilder builder = new StringBuilder(100);
builder.append("user:").append(user)
.append("|host:").append(host)
.append("|source:").append(source.address);
if (source.port > 0)
.append("|source:").append(source.getAddress());
if (source.getPort() > 0)
{
builder.append("|port:").append(source.port);
builder.append("|port:").append(source.getPort());
}

builder.append("|timestamp:").append(timestamp)
Expand Down
37 changes: 19 additions & 18 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
Expand All @@ -36,7 +37,7 @@
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -716,9 +717,9 @@ public static synchronized void updateTokens(InetAddressAndPort ep, Collection<T
return;

String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), tokensAsSet(tokens));
req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), tokensAsSet(tokens));
}

public static synchronized boolean updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
Expand All @@ -727,9 +728,9 @@ public static synchronized boolean updatePreferredIP(InetAddressAndPort ep, Inet
return false;

String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), preferred_ip.getAddress());
req = "INSERT INTO system.%s (peer, peer_port, preferred_ip, preferred_port) VALUES (?, ?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, preferred_ip.address, preferred_ip.port);
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), preferred_ip.getAddress(), preferred_ip.getPort());
forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
return true;
}
Expand All @@ -740,14 +741,14 @@ public static synchronized void updatePeerInfo(InetAddressAndPort ep, String col
return;

String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.address, value);
executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.getAddress(), value);
//This column doesn't match across the two tables
if (columnName.equals("rpc_address"))
{
columnName = "native_address";
}
req = "INSERT INTO system.%s (peer, peer_port, %s) VALUES (?, ?, ?)";
executeInternal(String.format(req, PEERS_V2, columnName), ep.address, ep.port, value);
executeInternal(String.format(req, PEERS_V2, columnName), ep.getAddress(), ep.getPort(), value);
}

public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, InetAddressAndPort address)
Expand All @@ -756,19 +757,19 @@ public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, I
return;

String req = "INSERT INTO system.%s (peer, rpc_address) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, address.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), address.getAddress());
req = "INSERT INTO system.%s (peer, peer_port, native_address, native_port) VALUES (?, ?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, address.address, address.port);
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), address.getAddress(), address.getPort());
}


public static synchronized void updateHintsDropped(InetAddressAndPort ep, UUID timePeriod, int value)
{
// with 30 day TTL
String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.address);
executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.getAddress());
req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ? AND peer_port = ?";
executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.address, ep.port);
executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.getAddress(), ep.getPort());
}

public static synchronized void updateSchemaVersion(UUID version)
Expand Down Expand Up @@ -800,12 +801,12 @@ private static Collection<Token> deserializeTokens(Collection<String> tokensStri
/**
* Remove stored tokens being used by another node
*/
public static synchronized void removeEndpoint(InetAddressAndPort ep)
public static synchronized void removeEndpoint(InetSocketAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
executeInternal(String.format(req, LEGACY_PEERS), ep.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress());
req = String.format("DELETE FROM system.%s WHERE peer = ? AND peer_port = ?", PEERS_V2);
executeInternal(req, ep.address, ep.port);
executeInternal(req, ep.getAddress(), ep.getPort());
forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
}

Expand Down Expand Up @@ -887,7 +888,7 @@ public static Map<InetAddressAndPort, UUID> loadHostIds()
public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
{
String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
if (!result.isEmpty() && result.one().has("preferred_ip"))
{
UntypedResultSet.Row row = result.one();
Expand Down Expand Up @@ -934,7 +935,7 @@ public static CassandraVersion getReleaseVersion(InetAddressAndPort ep)
return CURRENT_VERSION;
}
String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?";
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
if (result != null && result.one().has("release_version"))
{
return new CassandraVersion(result.one().getString("release_version"));
Expand Down Expand Up @@ -1428,9 +1429,9 @@ public static synchronized void updateTransferredRanges(StreamOperation streamOp
{
rangesToUpdate.add(rangeToBytes(range));
}
executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.address, keyspace);
executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.getAddress(), keyspace);
cql = "UPDATE system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND peer_port = ? AND keyspace_name = ?";
executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.address, peer.port, keyspace);
executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.getAddress(), peer.getPort(), keyspace);
}

public static synchronized Map<InetAddressAndPort, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void addRow(SimpleDataSet dataSet, InetAddressAndPort addressAndPort, In
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(addressAndPort);
String rack = DatabaseDescriptor.getEndpointSnitch().getRack(addressAndPort);
dataSet.row(addressAndPort.address, addressAndPort.port, dc, rack)
dataSet.row(addressAndPort.getAddress(), addressAndPort.getPort(), dc, rack)
.column(USING_BYTES, handlers.usingCapacity())
.column(USING_RESERVE_BYTES, handlers.usingEndpointReserveCapacity())
.column(CORRUPT_FRAMES_RECOVERED, handlers.corruptFramesRecovered())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void addRow(SimpleDataSet dataSet, InetAddressAndPort addressAndPort, Ou
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(addressAndPort);
String rack = DatabaseDescriptor.getEndpointSnitch().getRack(addressAndPort);
long pendingBytes = sum(connections, OutboundConnection::pendingBytes);
dataSet.row(addressAndPort.address, addressAndPort.port, dc, rack)
dataSet.row(addressAndPort.getAddress(), addressAndPort.getPort(), dc, rack)
.column(USING_BYTES, pendingBytes)
.column(USING_RESERVE_BYTES, connections.usingReserveBytes())
.column(PENDING_COUNT, sum(connections, OutboundConnection::pendingCount))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public FakeNode(InetAddressAndPort address, Integer rackId, Collection<Token> to

public int nodeId()
{
return fakeAddressAndPort.port;
return fakeAddressAndPort.getPort();
}

public int rackId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ private void reset()

public Map<InetAddress, Double> getScores()
{
return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().address, Map.Entry::getValue));
return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().getAddress(), Map.Entry::getValue));
}

public Map<String, Double> getScoresWithPort()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void gossiperStarting()
throw new RuntimeException(e);
}
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(address));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(address.address));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(address.getAddress()));
Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
}
}
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/locator/IEndpointSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.locator;

import java.net.InetSocketAddress;
import java.util.Set;

import org.apache.cassandra.utils.FBUtilities;
Expand Down Expand Up @@ -55,6 +56,11 @@ default public String getLocalDatacenter()
return getDatacenter(FBUtilities.getBroadcastAddressAndPort());
}

default String getDatacenter(InetSocketAddress endpoint)
{
return getDatacenter(InetAddressAndPort.getByAddress(endpoint));
}

default public String getDatacenter(Replica replica)
{
return getDatacenter(replica.endpoint());
Expand Down
Loading