Skip to content

Commit ed7f745

Browse files
authored
feat: add allow auto update schema cmd (streamnative#567)
Signed-off-by: Zixuan Liu <[email protected]>
1 parent 23a3cf4 commit ed7f745

File tree

8 files changed

+264
-51
lines changed

8 files changed

+264
-51
lines changed

go.mod

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/ghodss/yaml v1.0.0
1212
github.com/go-sql-driver/mysql v1.5.0 // indirect
1313
github.com/gogo/protobuf v1.3.2 // indirect
14-
github.com/golang/protobuf v1.4.3
14+
github.com/golang/protobuf v1.5.2
1515
github.com/gorilla/mux v1.7.4 // indirect
1616
github.com/imdario/mergo v0.3.8
1717
github.com/kr/pretty v0.2.0 // indirect
@@ -21,17 +21,19 @@ require (
2121
github.com/mattn/go-colorable v0.1.2 // indirect
2222
github.com/mattn/go-runewidth v0.0.4 // indirect
2323
github.com/olekukonko/tablewriter v0.0.1
24+
github.com/onsi/gomega v1.18.0
2425
github.com/pkg/errors v0.9.1
2526
github.com/sirupsen/logrus v1.4.2 // indirect
2627
github.com/spf13/cobra v0.0.5
2728
github.com/spf13/pflag v1.0.5
2829
github.com/stretchr/testify v1.5.1
2930
github.com/testcontainers/testcontainers-go v0.0.10
30-
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d // indirect
3131
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93
3232
google.golang.org/appengine v1.6.7 // indirect
3333
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
34-
gopkg.in/yaml.v2 v2.3.0
34+
gopkg.in/yaml.v2 v2.4.0
3535
)
3636

3737
replace github.com/apache/pulsar-client-go/oauth2 => github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211006154457-742f1b107403
38+
39+
replace golang.org/x/sys => golang.org/x/sys v0.0.0-20201119102817-f84b799fce68

go.sum

Lines changed: 27 additions & 48 deletions
Large diffs are not rendered by default.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"strconv"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
25+
)
26+
27+
func GetIsAllowAutoUpdateSchemaCmd(vc *cmdutils.VerbCmd) {
28+
var desc cmdutils.LongDescription
29+
desc.CommandUsedFor = "Get the whether to allow auto update schema on a namespace"
30+
desc.CommandPermission = "This command requires tenant admin permissions"
31+
32+
var examples []cmdutils.Example
33+
examples = append(examples, cmdutils.Example{
34+
Desc: desc.CommandUsedFor,
35+
Command: "pulsarctl namespaces get-is-allow-auto-update-schema (namespace-name)",
36+
})
37+
desc.CommandExamples = examples
38+
39+
vc.SetDescription(
40+
"get-is-allow-auto-update-schema",
41+
desc.CommandUsedFor,
42+
desc.ToString(),
43+
desc.ExampleToString())
44+
45+
vc.SetRunFuncWithNameArg(func() error {
46+
return doGetIsAllowAutoUpdateSchema(vc)
47+
}, "the namespace name is not specified or the namespace name is specified more than one")
48+
}
49+
50+
func doGetIsAllowAutoUpdateSchema(vc *cmdutils.VerbCmd) error {
51+
ns, err := utils.GetNamespaceName(vc.NameArg)
52+
if err != nil {
53+
return err
54+
}
55+
56+
admin := cmdutils.NewPulsarClient()
57+
result, err := admin.Namespaces().GetIsAllowAutoUpdateSchema(*ns)
58+
if err == nil {
59+
vc.Command.Println(strconv.FormatBool(result))
60+
}
61+
return err
62+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"fmt"
22+
"testing"
23+
24+
"github.com/onsi/gomega"
25+
"github.com/streamnative/pulsarctl/pkg/test"
26+
)
27+
28+
func TestIsAllowAutoUpdateSchemaCmd(t *testing.T) {
29+
g := gomega.NewWithT(t)
30+
31+
ns := "public/test-is-allow-auto-update-schema-" + test.RandomSuffix()
32+
createArgs := []string{"create", ns}
33+
g.Eventually(func(g gomega.Gomega) {
34+
_, execErr, _, _ := TestNamespaceCommands(createNs, createArgs)
35+
g.Expect(execErr).Should(gomega.BeNil())
36+
}).Should(gomega.Succeed())
37+
38+
setArgs := []string{"set-is-allow-auto-update-schema", "--disable", ns}
39+
g.Eventually(func(g gomega.Gomega) {
40+
out, execErr, _, _ := TestNamespaceCommands(SetIsAllowAutoUpdateSchemaCmd, setArgs)
41+
g.Expect(execErr).Should(gomega.BeNil())
42+
g.Expect(out.String()).Should(gomega.Equal(
43+
fmt.Sprintf("Successfully disable auto update schema on a namespace %s\n", ns)))
44+
}).Should(gomega.Succeed())
45+
46+
getArgs := []string{"get-is-allow-auto-update-schema", ns}
47+
g.Eventually(func(g gomega.Gomega) {
48+
out, execErr, _, _ := TestNamespaceCommands(GetIsAllowAutoUpdateSchemaCmd, getArgs)
49+
g.Expect(execErr).Should(gomega.BeNil())
50+
g.Expect(out.String()).Should(gomega.Equal("false\n"))
51+
}).Should(gomega.Succeed())
52+
53+
setArgs = []string{"set-is-allow-auto-update-schema", "--enable", ns}
54+
g.Eventually(func(g gomega.Gomega) {
55+
out, execErr, _, _ := TestNamespaceCommands(SetIsAllowAutoUpdateSchemaCmd, setArgs)
56+
g.Expect(execErr).Should(gomega.BeNil())
57+
g.Expect(out.String()).Should(gomega.Equal(
58+
fmt.Sprintf("Successfully enable auto update schema on a namespace %s\n", ns)))
59+
}).Should(gomega.Succeed())
60+
61+
g.Eventually(func(g gomega.Gomega) {
62+
out, execErr, _, _ := TestNamespaceCommands(GetIsAllowAutoUpdateSchemaCmd, getArgs)
63+
g.Expect(execErr).Should(gomega.BeNil())
64+
g.Expect(out.String()).Should(gomega.Equal("true\n"))
65+
}).Should(gomega.Succeed())
66+
}

pkg/ctl/namespace/namespace.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
9292
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetSubscriptionAuthModeCmd)
9393
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetPublishRateCmd)
9494
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetPublishRateCmd)
95+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetIsAllowAutoUpdateSchemaCmd)
96+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetIsAllowAutoUpdateSchemaCmd)
9597
return resourceCmd
9698
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"github.com/pkg/errors"
22+
"github.com/spf13/pflag"
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
25+
)
26+
27+
func SetIsAllowAutoUpdateSchemaCmd(vc *cmdutils.VerbCmd) {
28+
var desc cmdutils.LongDescription
29+
desc.CommandUsedFor = "Set the whether to allow auto update schema on a namespace"
30+
desc.CommandPermission = "This command requires tenant admin permissions"
31+
32+
var examples []cmdutils.Example
33+
examples = append(examples, cmdutils.Example{
34+
Desc: "Enable automatically update schema on a namespace",
35+
Command: "pulsarctl namespaces set-is-allow-auto-update-schema --enable (namespace-name)",
36+
})
37+
examples = append(examples, cmdutils.Example{
38+
Desc: "Disable automatically update update schema on a namespace",
39+
Command: "pulsarctl namespaces set-is-allow-auto-update-schema --disable (namespace-name)",
40+
})
41+
desc.CommandExamples = examples
42+
43+
vc.SetDescription(
44+
"set-is-allow-auto-update-schema",
45+
desc.CommandUsedFor,
46+
desc.ToString(),
47+
desc.ExampleToString())
48+
49+
var (
50+
enable bool
51+
disable bool
52+
)
53+
vc.FlagSetGroup.InFlagSet("IsAllowAutoUpdateSchema", func(set *pflag.FlagSet) {
54+
set.BoolVar(&enable, "enable", false, "enable automatically update schema")
55+
set.BoolVar(&disable, "disable", false, "disable automatically update schema")
56+
})
57+
vc.EnableOutputFlagSet()
58+
59+
vc.SetRunFuncWithNameArg(func() error {
60+
if enable == disable {
61+
return errors.New("specify only one of --enable or --disable")
62+
}
63+
return doSetIsAllowAutoUpdateSchema(vc, enable)
64+
}, "the namespace name is not specified or the namespace name is specified more than one")
65+
}
66+
67+
func doSetIsAllowAutoUpdateSchema(vc *cmdutils.VerbCmd, isAllowUpdateSchema bool) error {
68+
ns, err := utils.GetNamespaceName(vc.NameArg)
69+
if err != nil {
70+
return err
71+
}
72+
73+
admin := cmdutils.NewPulsarClient()
74+
err = admin.Namespaces().SetIsAllowAutoUpdateSchema(*ns, isAllowUpdateSchema)
75+
if err == nil {
76+
action := "enable"
77+
if !isAllowUpdateSchema {
78+
action = "disable"
79+
}
80+
vc.Command.Printf("Successfully %s auto update schema on a namespace %s\n", action, ns.String())
81+
}
82+
return err
83+
}

pkg/pulsar/namespace.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,12 @@ type Namespaces interface {
259259

260260
// GetPublishRate gets the maximum rate or number of messages that producer can publish to topics in the namespace
261261
GetPublishRate(namespace utils.NameSpaceName) (utils.PublishRate, error)
262+
263+
// SetIsAllowAutoUpdateSchema sets whether to allow auto update schema on a namespace
264+
SetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName, isAllowAutoUpdateSchema bool) error
265+
266+
// GetIsAllowAutoUpdateSchema gets whether to allow auto update schema on a namespace
267+
GetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName) (bool, error)
262268
}
263269

264270
type namespaces struct {
@@ -830,3 +836,15 @@ func (n *namespaces) GetPublishRate(namespace utils.NameSpaceName) (utils.Publis
830836
err := n.pulsar.Client.Get(endpoint, &pubRate)
831837
return pubRate, err
832838
}
839+
840+
func (n *namespaces) SetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName, isAllowAutoUpdateSchema bool) error {
841+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "isAllowAutoUpdateSchema")
842+
return n.pulsar.Client.Post(endpoint, &isAllowAutoUpdateSchema)
843+
}
844+
845+
func (n *namespaces) GetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName) (bool, error) {
846+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "isAllowAutoUpdateSchema")
847+
var result bool
848+
err := n.pulsar.Client.Get(endpoint, &result)
849+
return result, err
850+
}

pkg/pulsar/utils/policies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Policies struct {
5454
SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schema_auto_update_compatibility_strategy"`
5555
AuthPolicies common.AuthPolicies `json:"auth_policies"`
5656
SubscriptionAuthMode SubscriptionAuthMode `json:"subscription_auth_mode"`
57+
IsAllowAutoUpdateSchema *bool `json:"is_allow_auto_update_schema"`
5758
}
5859

5960
func NewDefaultPolicies() *Policies {

0 commit comments

Comments
 (0)