Skip to content

Commit 2259179

Browse files
authored
Added logic to detect new or deleted namespaces (oxia-db#298)
1 parent b749832 commit 2259179

File tree

5 files changed

+403
-49
lines changed

5 files changed

+403
-49
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2023 StreamNative, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package impl
16+
17+
import (
18+
"oxia/common"
19+
"oxia/coordinator/model"
20+
)
21+
22+
func getServers(servers []model.ServerAddress, startIdx uint32, count uint32) []model.ServerAddress {
23+
n := len(servers)
24+
res := make([]model.ServerAddress, count)
25+
for i := uint32(0); i < count; i++ {
26+
res[i] = servers[int(startIdx+i)%n]
27+
}
28+
return res
29+
}
30+
31+
func applyClusterChanges(config *model.ClusterConfig, currentStatus *model.ClusterStatus) (
32+
newStatus *model.ClusterStatus,
33+
shardsToAdd []int64,
34+
shardsToDelete []int64) {
35+
36+
shardsToAdd = make([]int64, 0)
37+
shardsToDelete = make([]int64, 0)
38+
39+
newStatus = &model.ClusterStatus{
40+
Namespaces: map[string]model.NamespaceStatus{},
41+
ShardIdGenerator: currentStatus.ShardIdGenerator,
42+
ServerIdx: currentStatus.ServerIdx,
43+
}
44+
45+
// Check for new namespaces
46+
for _, nc := range config.Namespaces {
47+
nss, existing := currentStatus.Namespaces[nc.Name]
48+
if !existing {
49+
// This is a new namespace
50+
nss = model.NamespaceStatus{
51+
Shards: map[int64]model.ShardMetadata{},
52+
ReplicationFactor: nc.ReplicationFactor,
53+
}
54+
for _, shard := range common.GenerateShards(newStatus.ShardIdGenerator, nc.InitialShardCount) {
55+
shardMetadata := model.ShardMetadata{
56+
Status: model.ShardStatusUnknown,
57+
Term: -1,
58+
Leader: nil,
59+
Ensemble: getServers(config.Servers, newStatus.ServerIdx, nc.ReplicationFactor),
60+
Int32HashRange: model.Int32HashRange{
61+
Min: shard.Min,
62+
Max: shard.Max,
63+
},
64+
}
65+
66+
nss.Shards[shard.Id] = shardMetadata
67+
newStatus.ServerIdx = (newStatus.ServerIdx + nc.ReplicationFactor) % uint32(len(config.Servers))
68+
shardsToAdd = append(shardsToAdd, shard.Id)
69+
}
70+
newStatus.Namespaces[nc.Name] = nss
71+
72+
newStatus.ShardIdGenerator += int64(nc.InitialShardCount)
73+
} else {
74+
// The namespace was already existing, nothing special to do
75+
newStatus.Namespaces[nc.Name] = nss.Clone()
76+
}
77+
}
78+
79+
// Check for any namespace that was removed
80+
for name, ns := range currentStatus.Namespaces {
81+
namespaceStillExists := false
82+
for _, cns := range config.Namespaces {
83+
if name == cns.Name {
84+
namespaceStillExists = true
85+
break
86+
}
87+
}
88+
89+
if !namespaceStillExists {
90+
// Keep the shards in the status and mark them as being deleted
91+
nss := ns.Clone()
92+
for shardId, shard := range nss.Shards {
93+
shard.Status = model.ShardStatusDeleting
94+
nss.Shards[shardId] = shard
95+
shardsToDelete = append(shardsToDelete, shardId)
96+
}
97+
98+
newStatus.Namespaces[name] = nss
99+
}
100+
}
101+
102+
return newStatus, shardsToAdd, shardsToDelete
103+
}
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Copyright 2023 StreamNative, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package impl
16+
17+
import (
18+
"github.com/stretchr/testify/assert"
19+
"math"
20+
"oxia/coordinator/model"
21+
"sort"
22+
"testing"
23+
)
24+
25+
var (
26+
s1 = model.ServerAddress{Public: "s1:6648", Internal: "s1:6649"}
27+
s2 = model.ServerAddress{Public: "s2:6648", Internal: "s2:6649"}
28+
s3 = model.ServerAddress{Public: "s3:6648", Internal: "s3:6649"}
29+
s4 = model.ServerAddress{Public: "s4:6648", Internal: "s4:6649"}
30+
)
31+
32+
func TestClientUpdates_ClusterInit(t *testing.T) {
33+
newStatus, shardsAdded, shardsToRemove := applyClusterChanges(&model.ClusterConfig{
34+
Namespaces: []model.NamespaceConfig{{
35+
Name: "ns-1",
36+
InitialShardCount: 1,
37+
ReplicationFactor: 3,
38+
}, {
39+
Name: "ns-2",
40+
InitialShardCount: 2,
41+
ReplicationFactor: 3,
42+
}},
43+
Servers: []model.ServerAddress{s1, s2, s3, s4},
44+
}, model.NewClusterStatus())
45+
46+
assert.Equal(t, &model.ClusterStatus{
47+
Namespaces: map[string]model.NamespaceStatus{
48+
"ns-1": {
49+
ReplicationFactor: 3,
50+
Shards: map[int64]model.ShardMetadata{
51+
0: {
52+
Status: model.ShardStatusUnknown,
53+
Term: -1,
54+
Leader: nil,
55+
Ensemble: []model.ServerAddress{s1, s2, s3},
56+
Int32HashRange: model.Int32HashRange{
57+
Min: 0,
58+
Max: math.MaxUint32,
59+
},
60+
},
61+
},
62+
},
63+
"ns-2": {
64+
ReplicationFactor: 3,
65+
Shards: map[int64]model.ShardMetadata{
66+
1: {
67+
Status: model.ShardStatusUnknown,
68+
Term: -1,
69+
Leader: nil,
70+
Ensemble: []model.ServerAddress{s4, s1, s2},
71+
Int32HashRange: model.Int32HashRange{
72+
Min: 0,
73+
Max: math.MaxUint32 / 2,
74+
},
75+
},
76+
2: {
77+
Status: model.ShardStatusUnknown,
78+
Term: -1,
79+
Leader: nil,
80+
Ensemble: []model.ServerAddress{s3, s4, s1},
81+
Int32HashRange: model.Int32HashRange{
82+
Min: math.MaxUint32/2 + 1,
83+
Max: math.MaxUint32,
84+
},
85+
},
86+
},
87+
},
88+
},
89+
ShardIdGenerator: 3,
90+
ServerIdx: 1,
91+
}, newStatus)
92+
93+
assert.Equal(t, []int64{}, shardsToRemove)
94+
assert.Equal(t, []int64{0, 1, 2}, shardsAdded)
95+
}
96+
97+
func TestClientUpdates_NamespaceAdded(t *testing.T) {
98+
newStatus, shardsAdded, shardsToRemove := applyClusterChanges(&model.ClusterConfig{
99+
Namespaces: []model.NamespaceConfig{{
100+
Name: "ns-1",
101+
InitialShardCount: 1,
102+
ReplicationFactor: 3,
103+
}, {
104+
Name: "ns-2",
105+
InitialShardCount: 2,
106+
ReplicationFactor: 3,
107+
}},
108+
Servers: []model.ServerAddress{s1, s2, s3, s4},
109+
}, &model.ClusterStatus{Namespaces: map[string]model.NamespaceStatus{
110+
"ns-1": {
111+
ReplicationFactor: 3,
112+
Shards: map[int64]model.ShardMetadata{
113+
0: {
114+
Status: model.ShardStatusUnknown,
115+
Term: -1,
116+
Leader: nil,
117+
Ensemble: []model.ServerAddress{s1, s2, s3},
118+
Int32HashRange: model.Int32HashRange{
119+
Min: 0,
120+
Max: math.MaxUint32,
121+
},
122+
},
123+
},
124+
},
125+
}, ShardIdGenerator: 1,
126+
ServerIdx: 3})
127+
128+
assert.Equal(t, &model.ClusterStatus{
129+
Namespaces: map[string]model.NamespaceStatus{
130+
"ns-1": {
131+
ReplicationFactor: 3,
132+
Shards: map[int64]model.ShardMetadata{
133+
0: {
134+
Status: model.ShardStatusUnknown,
135+
Term: -1,
136+
Leader: nil,
137+
Ensemble: []model.ServerAddress{s1, s2, s3},
138+
Int32HashRange: model.Int32HashRange{
139+
Min: 0,
140+
Max: math.MaxUint32,
141+
},
142+
},
143+
},
144+
},
145+
"ns-2": {
146+
ReplicationFactor: 3,
147+
Shards: map[int64]model.ShardMetadata{
148+
1: {
149+
Status: model.ShardStatusUnknown,
150+
Term: -1,
151+
Leader: nil,
152+
Ensemble: []model.ServerAddress{s4, s1, s2},
153+
Int32HashRange: model.Int32HashRange{
154+
Min: 0,
155+
Max: math.MaxUint32 / 2,
156+
},
157+
},
158+
2: {
159+
Status: model.ShardStatusUnknown,
160+
Term: -1,
161+
Leader: nil,
162+
Ensemble: []model.ServerAddress{s3, s4, s1},
163+
Int32HashRange: model.Int32HashRange{
164+
Min: math.MaxUint32/2 + 1,
165+
Max: math.MaxUint32,
166+
},
167+
},
168+
},
169+
},
170+
},
171+
ShardIdGenerator: 3,
172+
ServerIdx: 1,
173+
}, newStatus)
174+
175+
assert.Equal(t, []int64{}, shardsToRemove)
176+
assert.Equal(t, []int64{1, 2}, shardsAdded)
177+
}
178+
179+
func TestClientUpdates_NamespaceRemoved(t *testing.T) {
180+
newStatus, shardsAdded, shardsToRemove := applyClusterChanges(&model.ClusterConfig{
181+
Namespaces: []model.NamespaceConfig{{
182+
Name: "ns-1",
183+
InitialShardCount: 1,
184+
ReplicationFactor: 3,
185+
}},
186+
Servers: []model.ServerAddress{s1, s2, s3, s4},
187+
}, &model.ClusterStatus{
188+
Namespaces: map[string]model.NamespaceStatus{
189+
"ns-1": {
190+
ReplicationFactor: 3,
191+
Shards: map[int64]model.ShardMetadata{
192+
0: {
193+
Status: model.ShardStatusUnknown,
194+
Term: -1,
195+
Leader: nil,
196+
Ensemble: []model.ServerAddress{s1, s2, s3},
197+
Int32HashRange: model.Int32HashRange{
198+
Min: 0,
199+
Max: math.MaxUint32,
200+
},
201+
},
202+
},
203+
},
204+
"ns-2": {
205+
ReplicationFactor: 3,
206+
Shards: map[int64]model.ShardMetadata{
207+
1: {
208+
Status: model.ShardStatusUnknown,
209+
Term: -1,
210+
Leader: nil,
211+
Ensemble: []model.ServerAddress{s4, s1, s2},
212+
Int32HashRange: model.Int32HashRange{
213+
Min: 0,
214+
Max: math.MaxUint32 / 2,
215+
},
216+
},
217+
2: {
218+
Status: model.ShardStatusUnknown,
219+
Term: -1,
220+
Leader: nil,
221+
Ensemble: []model.ServerAddress{s3, s4, s1},
222+
Int32HashRange: model.Int32HashRange{
223+
Min: math.MaxUint32/2 + 1,
224+
Max: math.MaxUint32,
225+
},
226+
},
227+
},
228+
},
229+
},
230+
ShardIdGenerator: 3,
231+
ServerIdx: 1})
232+
233+
assert.Equal(t, &model.ClusterStatus{
234+
Namespaces: map[string]model.NamespaceStatus{
235+
"ns-1": {
236+
ReplicationFactor: 3,
237+
Shards: map[int64]model.ShardMetadata{
238+
0: {
239+
Status: model.ShardStatusUnknown,
240+
Term: -1,
241+
Leader: nil,
242+
Ensemble: []model.ServerAddress{s1, s2, s3},
243+
Int32HashRange: model.Int32HashRange{
244+
Min: 0,
245+
Max: math.MaxUint32,
246+
},
247+
},
248+
},
249+
},
250+
"ns-2": {
251+
ReplicationFactor: 3,
252+
Shards: map[int64]model.ShardMetadata{
253+
1: {
254+
Status: model.ShardStatusDeleting,
255+
Term: -1,
256+
Leader: nil,
257+
Ensemble: []model.ServerAddress{s4, s1, s2},
258+
Int32HashRange: model.Int32HashRange{
259+
Min: 0,
260+
Max: math.MaxUint32 / 2,
261+
},
262+
},
263+
2: {
264+
Status: model.ShardStatusDeleting,
265+
Term: -1,
266+
Leader: nil,
267+
Ensemble: []model.ServerAddress{s3, s4, s1},
268+
Int32HashRange: model.Int32HashRange{
269+
Min: math.MaxUint32/2 + 1,
270+
Max: math.MaxUint32,
271+
},
272+
},
273+
},
274+
},
275+
},
276+
ShardIdGenerator: 3,
277+
ServerIdx: 1,
278+
}, newStatus)
279+
280+
sort.Slice(shardsToRemove, func(i, j int) bool { return shardsToRemove[i] < shardsToRemove[j] })
281+
assert.Equal(t, []int64{1, 2}, shardsToRemove)
282+
assert.Equal(t, []int64{}, shardsAdded)
283+
}

0 commit comments

Comments
 (0)