Skip to content

Commit dcdd34b

Browse files
authored
Include head & commit offset in GetStatus (oxia-db#326)
1 parent b7e2f01 commit dcdd34b

File tree

6 files changed

+184
-70
lines changed

6 files changed

+184
-70
lines changed

proto/replication.pb.go

Lines changed: 84 additions & 64 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/replication.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,6 @@ message GetStatusResponse {
145145
int64 term = 1;
146146
ServingStatus status = 2;
147147

148-
// More info to add for leaders
149-
// ensemble, lag for each follower, etc...
148+
int64 head_offset = 3;
149+
int64 commit_offset = 4;
150150
}

server/follower_controller.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,8 +662,10 @@ func (fc *followerController) GetStatus(request *proto.GetStatusRequest) (*proto
662662
defer fc.Unlock()
663663

664664
return &proto.GetStatusResponse{
665-
Term: fc.term,
666-
Status: fc.status,
665+
Term: fc.term,
666+
Status: fc.status,
667+
HeadOffset: fc.wal.LastOffset(),
668+
CommitOffset: fc.CommitOffset(),
667669
}, nil
668670
}
669671

server/follower_controller_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,53 @@ func TestFollowerController_Closed(t *testing.T) {
820820
assert.NoError(t, walFactory.Close())
821821
}
822822

823+
func TestFollower_GetStatus(t *testing.T) {
824+
var shardId int64
825+
kvFactory, _ := kv.NewPebbleKVFactory(testKVOptions)
826+
walFactory := wal.NewInMemoryWalFactory()
827+
828+
fc, _ := NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory)
829+
_, _ = fc.NewTerm(&proto.NewTermRequest{Term: 2})
830+
831+
stream := newMockServerReplicateStream()
832+
go func() {
833+
//cancelled due to fc.Close() below
834+
assert.ErrorIs(t, fc.Replicate(stream), context.Canceled)
835+
}()
836+
837+
stream.AddRequest(createAddRequest(t, 2, 0, map[string]string{"a": "0", "b": "1"}, wal.InvalidOffset))
838+
stream.AddRequest(createAddRequest(t, 2, 1, map[string]string{"a": "0", "b": "1"}, 0))
839+
stream.AddRequest(createAddRequest(t, 2, 2, map[string]string{"a": "0", "b": "1"}, 1))
840+
841+
// Wait for responses
842+
r1 := stream.GetResponse()
843+
assert.EqualValues(t, 0, r1.Offset)
844+
845+
r2 := stream.GetResponse()
846+
assert.EqualValues(t, 1, r2.Offset)
847+
848+
r3 := stream.GetResponse()
849+
assert.EqualValues(t, 2, r3.Offset)
850+
851+
assert.Eventually(t, func() bool {
852+
res, _ := fc.GetStatus(&proto.GetStatusRequest{ShardId: shardId})
853+
return res.CommitOffset == 1
854+
}, 10*time.Second, 100*time.Millisecond)
855+
856+
res, err := fc.GetStatus(&proto.GetStatusRequest{ShardId: shardId})
857+
assert.NoError(t, err)
858+
assert.Equal(t, &proto.GetStatusResponse{
859+
Term: 2,
860+
Status: proto.ServingStatus_FOLLOWER,
861+
HeadOffset: 2,
862+
CommitOffset: 1,
863+
}, res)
864+
865+
assert.NoError(t, fc.Close())
866+
assert.NoError(t, kvFactory.Close())
867+
assert.NoError(t, walFactory.Close())
868+
}
869+
823870
func closeChanIsNotNil(fc FollowerController) func() bool {
824871
return func() bool {
825872
_fc := fc.(*followerController)

server/leader_controller.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -859,8 +859,10 @@ func (lc *leaderController) GetStatus(request *proto.GetStatusRequest) (*proto.G
859859
defer lc.RUnlock()
860860

861861
return &proto.GetStatusResponse{
862-
Term: lc.term,
863-
Status: lc.status,
862+
Term: lc.term,
863+
Status: lc.status,
864+
HeadOffset: lc.quorumAckTracker.HeadOffset(),
865+
CommitOffset: lc.quorumAckTracker.CommitOffset(),
864866
}, nil
865867
}
866868

server/leader_controller_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,3 +996,46 @@ func TestLeaderController_DeleteShard(t *testing.T) {
996996
assert.NoError(t, lc.Close())
997997
assert.NoError(t, walFactory.Close())
998998
}
999+
1000+
func TestLeaderController_GetStatus(t *testing.T) {
1001+
var shard int64 = 1
1002+
1003+
kvFactory, _ := kv.NewPebbleKVFactory(testKVOptions)
1004+
walFactory := wal.NewInMemoryWalFactory()
1005+
1006+
lc, _ := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory)
1007+
_, _ = lc.NewTerm(&proto.NewTermRequest{ShardId: shard, Term: 2})
1008+
_, _ = lc.BecomeLeader(&proto.BecomeLeaderRequest{
1009+
ShardId: shard,
1010+
Term: 2,
1011+
ReplicationFactor: 1,
1012+
FollowerMaps: nil,
1013+
})
1014+
1015+
/// Write entry
1016+
_, _ = lc.Write(&proto.WriteRequest{
1017+
ShardId: &shard,
1018+
Puts: []*proto.PutRequest{{
1019+
Key: "a",
1020+
Value: []byte("value-a")}},
1021+
})
1022+
_, _ = lc.Write(&proto.WriteRequest{
1023+
ShardId: &shard,
1024+
Puts: []*proto.PutRequest{{
1025+
Key: "b",
1026+
Value: []byte("value-b")}},
1027+
})
1028+
1029+
res, err := lc.GetStatus(&proto.GetStatusRequest{ShardId: shard})
1030+
assert.NoError(t, err)
1031+
assert.Equal(t, &proto.GetStatusResponse{
1032+
Term: 2,
1033+
Status: proto.ServingStatus_LEADER,
1034+
HeadOffset: 1,
1035+
CommitOffset: 1,
1036+
}, res)
1037+
1038+
assert.NoError(t, lc.Close())
1039+
assert.NoError(t, kvFactory.Close())
1040+
assert.NoError(t, walFactory.Close())
1041+
}

0 commit comments

Comments
 (0)