Skip to content

Commit 7baa0cd

Browse files
authored
Dynamically add/remove namespaces when the cluster config is updated (oxia-db#313)
1 parent 54184a4 commit 7baa0cd

File tree

10 files changed

+367
-94
lines changed

10 files changed

+367
-94
lines changed

cmd/coordinator/cmd.go

Lines changed: 15 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@ import (
2222
"oxia/cmd/flag"
2323
"oxia/common"
2424
"oxia/coordinator"
25+
"oxia/coordinator/model"
26+
"time"
2527
)
2628

2729
var (
@@ -45,6 +47,7 @@ func init() {
4547
Cmd.Flags().StringVar(&conf.K8SMetadataConfigMapName, "k8s-configmap-name", conf.K8SMetadataConfigMapName, "ConfigMap name for metadata configmap")
4648
Cmd.Flags().StringVar(&conf.FileMetadataPath, "file-clusters-status-path", "data/cluster-status.json", "The path where the cluster status is stored when using 'file' provider")
4749
Cmd.Flags().StringVarP(&configFile, "conf", "f", "", "Cluster config file")
50+
Cmd.Flags().DurationVar(&conf.ClusterConfigRefreshTime, "conf-file-refresh-time", 1*time.Minute, "How frequently to check for updates for cluster configuration file")
4851
}
4952

5053
func validate(*cobra.Command, []string) error {
@@ -56,28 +59,36 @@ func validate(*cobra.Command, []string) error {
5659
return errors.New("k8s-configmap-name must be set with metadata=configmap")
5760
}
5861
}
59-
if err := loadClusterConfig(); err != nil {
62+
if _, err := loadClusterConfig(); err != nil {
6063
return err
6164
}
6265
return nil
6366
}
6467

65-
func loadClusterConfig() error {
68+
func loadClusterConfig() (model.ClusterConfig, error) {
6669
if configFile == "" {
6770
viper.AddConfigPath("/oxia/conf")
6871
viper.AddConfigPath(".")
6972
} else {
7073
viper.SetConfigFile(configFile)
7174
}
7275

76+
cc := model.ClusterConfig{}
77+
7378
if err := viper.ReadInConfig(); err != nil {
74-
return err
79+
return cc, err
7580
}
7681

77-
return viper.Unmarshal(&conf.ClusterConfig)
82+
if err := viper.Unmarshal(&cc); err != nil {
83+
return cc, err
84+
}
85+
86+
return cc, nil
7887
}
7988

8089
func exec(*cobra.Command, []string) {
90+
conf.ClusterConfigProvider = loadClusterConfig
91+
8192
common.RunProcess(func() (io.Closer, error) {
8293
return coordinator.New(conf)
8394
})

cmd/coordinator/cmd_test.go

Lines changed: 74 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -54,84 +54,96 @@ func TestCmd(t *testing.T) {
5454
}()
5555

5656
for _, test := range []struct {
57-
args []string
58-
expectedConf coordinator.Config
59-
isErr bool
57+
args []string
58+
expectedConf coordinator.Config
59+
expectedClusterConf model.ClusterConfig
60+
isErr bool
6061
}{
6162
{[]string{}, coordinator.Config{
62-
InternalServiceAddr: "localhost:6649",
63-
MetricsServiceAddr: "localhost:8080",
64-
MetadataProviderImpl: coordinator.File,
65-
ClusterConfig: model.ClusterConfig{
66-
Namespaces: []model.NamespaceConfig{{
67-
Name: common.DefaultNamespace,
68-
ReplicationFactor: 1,
69-
InitialShardCount: 2,
70-
}},
71-
Servers: []model.ServerAddress{{
72-
Public: "public:1234",
73-
Internal: "internal:5678",
74-
}}}}, false},
63+
InternalServiceAddr: "localhost:6649",
64+
MetricsServiceAddr: "localhost:8080",
65+
MetadataProviderImpl: coordinator.File,
66+
ClusterConfigRefreshTime: 0,
67+
}, model.ClusterConfig{
68+
Namespaces: []model.NamespaceConfig{{
69+
Name: common.DefaultNamespace,
70+
ReplicationFactor: 1,
71+
InitialShardCount: 2,
72+
}},
73+
Servers: []model.ServerAddress{{
74+
Public: "public:1234",
75+
Internal: "internal:5678",
76+
},
77+
},
78+
}, false},
7579
{[]string{"-i=localhost:1234"}, coordinator.Config{
7680
InternalServiceAddr: "localhost:1234",
7781
MetricsServiceAddr: "localhost:8080",
7882
MetadataProviderImpl: coordinator.File,
79-
ClusterConfig: model.ClusterConfig{
80-
Namespaces: []model.NamespaceConfig{{
81-
Name: common.DefaultNamespace,
82-
ReplicationFactor: 1,
83-
InitialShardCount: 2,
84-
}},
85-
Servers: []model.ServerAddress{{
86-
Public: "public:1234",
87-
Internal: "internal:5678",
88-
}}}}, false},
83+
}, model.ClusterConfig{
84+
Namespaces: []model.NamespaceConfig{{
85+
Name: common.DefaultNamespace,
86+
ReplicationFactor: 1,
87+
InitialShardCount: 2,
88+
}},
89+
Servers: []model.ServerAddress{{
90+
Public: "public:1234",
91+
Internal: "internal:5678",
92+
},
93+
},
94+
}, false},
8995
{[]string{"-i=0.0.0.0:1234"}, coordinator.Config{
9096
InternalServiceAddr: "0.0.0.0:1234",
9197
MetricsServiceAddr: "localhost:8080",
9298
MetadataProviderImpl: coordinator.File,
93-
ClusterConfig: model.ClusterConfig{
94-
Namespaces: []model.NamespaceConfig{{
95-
Name: common.DefaultNamespace,
96-
ReplicationFactor: 1,
97-
InitialShardCount: 2,
98-
}},
99-
Servers: []model.ServerAddress{{
100-
Public: "public:1234",
101-
Internal: "internal:5678",
102-
}}}}, false},
99+
}, model.ClusterConfig{
100+
Namespaces: []model.NamespaceConfig{{
101+
Name: common.DefaultNamespace,
102+
ReplicationFactor: 1,
103+
InitialShardCount: 2,
104+
}},
105+
Servers: []model.ServerAddress{{
106+
Public: "public:1234",
107+
Internal: "internal:5678",
108+
},
109+
},
110+
}, false},
103111
{[]string{"-m=localhost:1234"}, coordinator.Config{
104112
InternalServiceAddr: "localhost:6649",
105113
MetricsServiceAddr: "localhost:1234",
106114
MetadataProviderImpl: coordinator.File,
107-
ClusterConfig: model.ClusterConfig{
108-
Namespaces: []model.NamespaceConfig{{
109-
Name: common.DefaultNamespace,
110-
ReplicationFactor: 1,
111-
InitialShardCount: 2,
112-
}},
113-
Servers: []model.ServerAddress{{
114-
Public: "public:1234",
115-
Internal: "internal:5678",
116-
}}}}, false},
115+
}, model.ClusterConfig{
116+
Namespaces: []model.NamespaceConfig{{
117+
Name: common.DefaultNamespace,
118+
ReplicationFactor: 1,
119+
InitialShardCount: 2,
120+
}},
121+
Servers: []model.ServerAddress{{
122+
Public: "public:1234",
123+
Internal: "internal:5678",
124+
},
125+
},
126+
}, false},
117127
{[]string{"-f=" + name}, coordinator.Config{
118128
InternalServiceAddr: "localhost:6649",
119129
MetricsServiceAddr: "localhost:8080",
120130
MetadataProviderImpl: coordinator.File,
121-
ClusterConfig: model.ClusterConfig{
122-
Namespaces: []model.NamespaceConfig{{
123-
Name: common.DefaultNamespace,
124-
ReplicationFactor: 1,
125-
InitialShardCount: 2,
126-
}},
127-
Servers: []model.ServerAddress{{
128-
Public: "public:1234",
129-
Internal: "internal:5678",
130-
}}}}, false},
131+
}, model.ClusterConfig{
132+
Namespaces: []model.NamespaceConfig{{
133+
Name: common.DefaultNamespace,
134+
ReplicationFactor: 1,
135+
InitialShardCount: 2,
136+
}},
137+
Servers: []model.ServerAddress{{
138+
Public: "public:1234",
139+
Internal: "internal:5678",
140+
},
141+
},
142+
}, false},
131143
{[]string{"-f=invalid.yaml"}, coordinator.Config{
132144
InternalServiceAddr: "localhost:6649",
133145
MetricsServiceAddr: "localhost:8080",
134-
ClusterConfig: model.ClusterConfig{}}, true},
146+
}, model.ClusterConfig{}, true},
135147
} {
136148
t.Run(strings.Join(test.args, "_"), func(t *testing.T) {
137149
conf = coordinator.NewConfig()
@@ -140,6 +152,11 @@ func TestCmd(t *testing.T) {
140152
Cmd.SetArgs(test.args)
141153
Cmd.Run = func(cmd *cobra.Command, args []string) {
142154
assert.Equal(t, test.expectedConf, conf)
155+
156+
conf.ClusterConfigProvider = loadClusterConfig
157+
clusterConf, err := conf.ClusterConfigProvider()
158+
assert.NoError(t, err)
159+
assert.Equal(t, test.expectedClusterConf, clusterConf)
143160
}
144161
err = Cmd.Execute()
145162
assert.Equal(t, test.isErr, err != nil)

coordinator/coordinator.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ import (
2424
"oxia/coordinator/impl"
2525
"oxia/coordinator/model"
2626
"oxia/kubernetes"
27+
"time"
2728
)
2829

2930
type Config struct {
@@ -33,7 +34,8 @@ type Config struct {
3334
K8SMetadataNamespace string
3435
K8SMetadataConfigMapName string
3536
FileMetadataPath string
36-
ClusterConfig model.ClusterConfig
37+
ClusterConfigProvider func() (model.ClusterConfig, error)
38+
ClusterConfigRefreshTime time.Duration
3739
}
3840

3941
type MetadataProviderImpl string
@@ -101,7 +103,7 @@ func New(config Config) (*Coordinator, error) {
101103
rpcClient := impl.NewRpcProvider(s.clientPool)
102104

103105
var err error
104-
if s.coordinator, err = impl.NewCoordinator(metadataProvider, config.ClusterConfig, rpcClient); err != nil {
106+
if s.coordinator, err = impl.NewCoordinator(metadataProvider, config.ClusterConfigProvider, config.ClusterConfigRefreshTime, rpcClient); err != nil {
105107
return nil, err
106108
}
107109

coordinator/impl/cluster_updates.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -30,11 +30,11 @@ func getServers(servers []model.ServerAddress, startIdx uint32, count uint32) []
3030

3131
func applyClusterChanges(config *model.ClusterConfig, currentStatus *model.ClusterStatus) (
3232
newStatus *model.ClusterStatus,
33-
shardsToAdd []int64,
33+
shardsToAdd map[int64]string,
3434
shardsToDelete []int64) {
3535

36-
shardsToAdd = make([]int64, 0)
37-
shardsToDelete = make([]int64, 0)
36+
shardsToAdd = map[int64]string{}
37+
shardsToDelete = []int64{}
3838

3939
newStatus = &model.ClusterStatus{
4040
Namespaces: map[string]model.NamespaceStatus{},
@@ -65,7 +65,7 @@ func applyClusterChanges(config *model.ClusterConfig, currentStatus *model.Clust
6565

6666
nss.Shards[shard.Id] = shardMetadata
6767
newStatus.ServerIdx = (newStatus.ServerIdx + nc.ReplicationFactor) % uint32(len(config.Servers))
68-
shardsToAdd = append(shardsToAdd, shard.Id)
68+
shardsToAdd[shard.Id] = nc.Name
6969
}
7070
newStatus.Namespaces[nc.Name] = nss
7171

coordinator/impl/cluster_updates_test.go

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,10 @@ func TestClientUpdates_ClusterInit(t *testing.T) {
9191
}, newStatus)
9292

9393
assert.Equal(t, []int64{}, shardsToRemove)
94-
assert.Equal(t, []int64{0, 1, 2}, shardsAdded)
94+
assert.Equal(t, map[int64]string{
95+
0: "ns-1",
96+
1: "ns-2",
97+
2: "ns-2"}, shardsAdded)
9598
}
9699

97100
func TestClientUpdates_NamespaceAdded(t *testing.T) {
@@ -173,7 +176,9 @@ func TestClientUpdates_NamespaceAdded(t *testing.T) {
173176
}, newStatus)
174177

175178
assert.Equal(t, []int64{}, shardsToRemove)
176-
assert.Equal(t, []int64{1, 2}, shardsAdded)
179+
assert.Equal(t, map[int64]string{
180+
1: "ns-2",
181+
2: "ns-2"}, shardsAdded)
177182
}
178183

179184
func TestClientUpdates_NamespaceRemoved(t *testing.T) {
@@ -279,5 +284,5 @@ func TestClientUpdates_NamespaceRemoved(t *testing.T) {
279284

280285
sort.Slice(shardsToRemove, func(i, j int) bool { return shardsToRemove[i] < shardsToRemove[j] })
281286
assert.Equal(t, []int64{1, 2}, shardsToRemove)
282-
assert.Equal(t, []int64{}, shardsAdded)
287+
assert.Equal(t, map[int64]string{}, shardsAdded)
283288
}

0 commit comments

Comments
 (0)