Skip to content

Commit 76c8dd5

Browse files
committed
Merge pull request cockroachdb#6423 from bdarnell/reject-removed-replicas
storage: Reject all messages from removed replicas
2 parents 72188db + ec282a5 commit 76c8dd5

File tree

2 files changed

+99
-19
lines changed

2 files changed

+99
-19
lines changed

storage/client_raft_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,79 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
13911391
finishWG.Wait()
13921392
}
13931393

1394+
func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
1395+
defer leaktest.AfterTest(t)()
1396+
1397+
mtc := startMultiTestContext(t, 4)
1398+
defer mtc.Stop()
1399+
1400+
// Move the first range from the first node to the other three.
1401+
rangeID := roachpb.RangeID(1)
1402+
mtc.replicateRange(rangeID, 1, 2, 3)
1403+
mtc.unreplicateRange(rangeID, 0)
1404+
mtc.expireLeaderLeases()
1405+
1406+
// Write on the second node, to ensure that the other nodes have
1407+
// established leadership after the first node's removal.
1408+
incArgs := incrementArgs([]byte("a"), 5)
1409+
if _, err := client.SendWrapped(mtc.distSenders[1], nil, &incArgs); err != nil {
1410+
t.Fatal(err)
1411+
}
1412+
1413+
// Save the current term, which is the latest among the live stores.
1414+
findTerm := func() uint64 {
1415+
var term uint64
1416+
for i := 1; i < 4; i++ {
1417+
s := mtc.stores[i].RaftStatus(rangeID)
1418+
if s.Term > term {
1419+
term = s.Term
1420+
}
1421+
}
1422+
return term
1423+
}
1424+
term := findTerm()
1425+
if term == 0 {
1426+
t.Fatalf("expected non-zero term")
1427+
}
1428+
1429+
replica0 := roachpb.ReplicaDescriptor{
1430+
ReplicaID: roachpb.ReplicaID(mtc.stores[0].StoreID()),
1431+
NodeID: roachpb.NodeID(mtc.stores[0].StoreID()),
1432+
StoreID: mtc.stores[0].StoreID(),
1433+
}
1434+
replica1 := roachpb.ReplicaDescriptor{
1435+
ReplicaID: roachpb.ReplicaID(mtc.stores[1].StoreID()),
1436+
NodeID: roachpb.NodeID(mtc.stores[1].StoreID()),
1437+
StoreID: mtc.stores[1].StoreID(),
1438+
}
1439+
// Simulate an election triggered by the removed node.
1440+
if err := mtc.transports[0].Send(&storage.RaftMessageRequest{
1441+
GroupID: rangeID,
1442+
ToReplica: replica1,
1443+
FromReplica: replica0,
1444+
Message: raftpb.Message{
1445+
From: uint64(replica0.ReplicaID),
1446+
To: uint64(replica1.ReplicaID),
1447+
Type: raftpb.MsgVote,
1448+
Term: term + 1,
1449+
},
1450+
}); err != nil {
1451+
t.Fatal(err)
1452+
}
1453+
1454+
// Wait a bit for the message to be processed.
1455+
// TODO(bdarnell): This will be easier to test without waiting
1456+
// when #5789 is done.
1457+
time.Sleep(10 * time.Millisecond)
1458+
1459+
// The message should have been discarded without triggering an
1460+
// election or changing the term.
1461+
newTerm := findTerm()
1462+
if term != newTerm {
1463+
t.Errorf("expected term to be constant, but changed from %v to %v", term, newTerm)
1464+
}
1465+
}
1466+
13941467
func TestReplicateReAddAfterDown(t *testing.T) {
13951468
defer leaktest.AfterTest(t)()
13961469

storage/store.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1855,6 +1855,31 @@ func (s *Store) enqueueRaftMessage(req *RaftMessageRequest) error {
18551855
// Replica. It requires that processRaftMu is held and that s.mu is
18561856
// not held.
18571857
func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
1858+
// Drop messages that come from a node that we believe was once
1859+
// a member of the group but has been removed.
1860+
s.mu.Lock()
1861+
r, ok := s.mu.replicas[req.GroupID]
1862+
s.mu.Unlock()
1863+
if ok {
1864+
found := false
1865+
desc := r.Desc()
1866+
for _, rep := range desc.Replicas {
1867+
if rep.ReplicaID == req.FromReplica.ReplicaID {
1868+
found = true
1869+
break
1870+
}
1871+
}
1872+
// It's not a current member of the group. Is it from the future
1873+
// or the past?
1874+
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
1875+
if log.V(2) {
1876+
log.Infof("range %s: discarding message from replica %v, older than NextReplicaID %v",
1877+
req.GroupID, req.FromReplica, desc.NextReplicaID)
1878+
}
1879+
return nil
1880+
}
1881+
}
1882+
18581883
switch req.Message.Type {
18591884
case raftpb.MsgSnap:
18601885
if !s.canApplySnapshot(req.GroupID, req.Message.Snapshot) {
@@ -1864,26 +1889,8 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
18641889
return nil
18651890
}
18661891

1867-
// TODO(bdarnell): handle coalesced heartbeats
18681892
case raftpb.MsgHeartbeat:
1869-
// A subset of coalesced heartbeats: drop heartbeats (but not
1870-
// other messages!) that come from a node that we don't believe to
1871-
// be a current member of the group.
1872-
s.mu.Lock()
1873-
r, ok := s.mu.replicas[req.GroupID]
1874-
s.mu.Unlock()
1875-
if ok {
1876-
found := false
1877-
for _, rep := range r.Desc().Replicas {
1878-
if rep.ReplicaID == req.FromReplica.ReplicaID {
1879-
found = true
1880-
break
1881-
}
1882-
}
1883-
if !found && req.FromReplica.ReplicaID < r.Desc().NextReplicaID {
1884-
return nil
1885-
}
1886-
}
1893+
// TODO(bdarnell): handle coalesced heartbeats.
18871894
}
18881895

18891896
s.mu.Lock()

0 commit comments

Comments
 (0)