Skip to content

Commit e412942

Browse files
mcandeiayaron2
andauthored
feat: Pluggable Components autoregistration + gRPC Reflection (dapr#5262)
* Components autoregistration Signed-off-by: Marcos Candeia <[email protected]> * Add more discovery tests Signed-off-by: Marcos Candeia <[email protected]> * Update pluggable apps to Support reflection apis Signed-off-by: Marcos Candeia <[email protected]> * Run go mod tidy inside pluggable components Signed-off-by: Marcos Candeia <[email protected]> * Using context when discovering components Signed-off-by: Marcos Candeia <[email protected]> * Remove fmt.sprintf in favor of string concat Signed-off-by: Marcos Candeia <[email protected]> * Resolve go sum conflicts Signed-off-by: Marcos Candeia <[email protected]> * Use /tmp/dapr-components-sockets instead of root folder Signed-off-by: Marcos Candeia <[email protected]> * Rename DAPR_COMPONENTS_SOCKET_FOLDER Signed-off-by: Marcos Candeia <[email protected]> * Use proto ref from generated go files Signed-off-by: Marcos Candeia <[email protected]> * Allow components to multiplexing socket Signed-off-by: Marcos Candeia <[email protected]> Signed-off-by: Marcos Candeia <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent 551b058 commit e412942

File tree

24 files changed

+481
-138
lines changed

24 files changed

+481
-138
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ require (
250250
github.com/jcmturner/gofork v1.0.0 // indirect
251251
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
252252
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
253+
github.com/jhump/protoreflect v1.13.0
253254
github.com/jinzhu/copier v0.3.5 // indirect
254255
github.com/jmespath/go-jmespath v0.4.0 // indirect
255256
github.com/josharian/intern v1.0.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,13 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
12461246
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
12471247
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
12481248
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
1249+
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
1250+
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
1251+
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
12491252
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
1253+
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
1254+
github.com/jhump/protoreflect v1.13.0 h1:zrrZqa7JAc2YGgPSzZZkmUXJ5G6NRPdxOg/9t7ISImA=
1255+
github.com/jhump/protoreflect v1.13.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
12501256
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
12511257
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
12521258
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=

pkg/components/bindings/input_pluggable.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type grpcInputBinding struct {
3737

3838
// Init initializes the grpc inputbinding passing out the metadata to the grpc component.
3939
func (b *grpcInputBinding) Init(metadata bindings.Metadata) error {
40-
if err := b.Dial(); err != nil {
40+
if err := b.Dial(metadata.Name); err != nil {
4141
return err
4242
}
4343

@@ -141,8 +141,15 @@ func NewGRPCInputBinding(l logger.Logger, socket string) *grpcInputBinding {
141141
}
142142

143143
// newGRPCInputBinding creates a new input binding for the given pluggable component.
144-
func newGRPCInputBinding(socket string) func(l logger.Logger) bindings.InputBinding {
144+
func newGRPCInputBinding(dialer pluggable.GRPCConnectionDialer) func(l logger.Logger) bindings.InputBinding {
145145
return func(l logger.Logger) bindings.InputBinding {
146-
return inputFromConnector(l, pluggable.NewGRPCConnector(socket, proto.NewInputBindingClient))
146+
return inputFromConnector(l, pluggable.NewGRPCConnectorWithDialer(dialer, proto.NewInputBindingClient))
147147
}
148148
}
149+
150+
func init() {
151+
//nolint:nosnakecase
152+
pluggable.AddServiceDiscoveryCallback(proto.InputBinding_ServiceDesc.ServiceName, func(name string, dialer pluggable.GRPCConnectionDialer) {
153+
DefaultRegistry.RegisterInputBinding(newGRPCInputBinding(dialer), name)
154+
})
155+
}

pkg/components/bindings/output_pluggable.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type grpcOutputBinding struct {
3131

3232
// Init initializes the grpc outputbinding passing out the metadata to the grpc component.
3333
func (b *grpcOutputBinding) Init(metadata bindings.Metadata) error {
34-
if err := b.Dial(); err != nil {
34+
if err := b.Dial(metadata.Name); err != nil {
3535
return err
3636
}
3737

@@ -103,8 +103,15 @@ func NewGRPCOutputBinding(l logger.Logger, socket string) *grpcOutputBinding {
103103
}
104104

105105
// newGRPCOutputBinding creates a new output binding for the given pluggable component.
106-
func newGRPCOutputBinding(socket string) func(l logger.Logger) bindings.OutputBinding {
106+
func newGRPCOutputBinding(dialer pluggable.GRPCConnectionDialer) func(l logger.Logger) bindings.OutputBinding {
107107
return func(l logger.Logger) bindings.OutputBinding {
108-
return outputFromConnector(l, pluggable.NewGRPCConnector(socket, proto.NewOutputBindingClient))
108+
return outputFromConnector(l, pluggable.NewGRPCConnectorWithDialer(dialer, proto.NewOutputBindingClient))
109109
}
110110
}
111+
112+
func init() {
113+
//nolint:nosnakecase
114+
pluggable.AddServiceDiscoveryCallback(proto.OutputBinding_ServiceDesc.ServiceName, func(name string, dialer pluggable.GRPCConnectionDialer) {
115+
DefaultRegistry.RegisterOutputBinding(newGRPCOutputBinding(dialer), name)
116+
})
117+
}

pkg/components/bindings/registry.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020

2121
"github.com/dapr/components-contrib/bindings"
2222
"github.com/dapr/dapr/pkg/components"
23-
"github.com/dapr/dapr/utils"
2423
"github.com/dapr/kit/logger"
2524
)
2625

@@ -98,9 +97,6 @@ func (b *Registry) getInputBinding(name, version string) (func() bindings.InputB
9897
}
9998
}
10099

101-
if socket := components.SocketPathForPluggableComponent(name, version); utils.SocketExists(socket) {
102-
return b.wrapInputBindingFn(newGRPCInputBinding(socket)), true
103-
}
104100
return nil, false
105101
}
106102

@@ -118,9 +114,6 @@ func (b *Registry) getOutputBinding(name, version string) (func() bindings.Outpu
118114
}
119115
}
120116

121-
if socket := components.SocketPathForPluggableComponent(name, version); utils.SocketExists(socket) {
122-
return b.wrapOutputBindingFn(newGRPCOutputBinding(socket)), true
123-
}
124117
return nil, false
125118
}
126119

pkg/components/pluggable.go

Lines changed: 0 additions & 39 deletions
This file was deleted.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package pluggable
15+
16+
import (
17+
"context"
18+
"os"
19+
"path/filepath"
20+
21+
"github.com/dapr/dapr/utils"
22+
"github.com/dapr/kit/logger"
23+
24+
"github.com/pkg/errors"
25+
26+
"github.com/jhump/protoreflect/grpcreflect"
27+
28+
"google.golang.org/grpc"
29+
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
30+
)
31+
32+
var (
33+
discoveryLog = logger.NewLogger("pluggable-components-discovery")
34+
onServiceDiscovered map[string]func(name string, dialer GRPCConnectionDialer)
35+
)
36+
37+
func init() {
38+
onServiceDiscovered = make(map[string]func(name string, dialer GRPCConnectionDialer))
39+
}
40+
41+
// AddServiceDiscoveryCallback adds a callback function that should be called when the given service was discovered.
42+
func AddServiceDiscoveryCallback(serviceName string, callbackFunc func(name string, dialer GRPCConnectionDialer)) {
43+
onServiceDiscovered[serviceName] = callbackFunc
44+
}
45+
46+
// removeExt removes file extension
47+
func removeExt(fileName string) string {
48+
return fileName[:len(fileName)-len(filepath.Ext(fileName))]
49+
}
50+
51+
const (
52+
SocketFolderEnvVar = "DAPR_COMPONENTS_SOCKETS_FOLDER"
53+
defaultSocketFolder = "/tmp/dapr-components-sockets"
54+
)
55+
56+
// GetSocketFolderPath returns the shared unix domain socket folder path
57+
func GetSocketFolderPath() string {
58+
return utils.GetEnvOrElse(SocketFolderEnvVar, defaultSocketFolder)
59+
}
60+
61+
type service struct {
62+
// protoRef is the proto service name
63+
protoRef string
64+
// componentName is the component name that implements such service.
65+
componentName string
66+
// dialer is the used grpc connectiondialer.
67+
dialer GRPCConnectionDialer
68+
}
69+
70+
type reflectServiceClient interface {
71+
ListServices() ([]string, error)
72+
}
73+
74+
// serviceDiscovery returns all available discovered pluggable components services.
75+
// uses gRPC reflection package to list implemented services.
76+
func serviceDiscovery(reflectClientFactory func(string) (reflectServiceClient, func(), error)) ([]service, error) {
77+
services := []service{}
78+
componentsSocketPath := GetSocketFolderPath()
79+
_, err := os.Stat(componentsSocketPath)
80+
81+
if os.IsNotExist(err) { // not exists is the same as empty.
82+
return services, nil
83+
}
84+
85+
log.Debugf("loading pluggable components under path %s", componentsSocketPath)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
files, err := os.ReadDir(componentsSocketPath)
91+
if err != nil {
92+
return nil, errors.Wrap(err, "could not list pluggable components unix sockets")
93+
}
94+
95+
for _, dirEntry := range files {
96+
if dirEntry.IsDir() { // skip dirs
97+
continue
98+
}
99+
100+
f, err := dirEntry.Info()
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
socket := filepath.Join(componentsSocketPath, f.Name())
106+
if !utils.IsSocket(f) {
107+
discoveryLog.Warnf("could not use socket for file %s", socket)
108+
continue
109+
}
110+
111+
refctClient, cleanup, err := reflectClientFactory(socket)
112+
if err != nil {
113+
return nil, err
114+
}
115+
defer cleanup()
116+
117+
serviceList, err := refctClient.ListServices()
118+
if err != nil {
119+
return nil, errors.Wrap(err, "unable to list services")
120+
}
121+
dialer := socketDialer(socket)
122+
123+
componentName := removeExt(f.Name())
124+
for _, svc := range serviceList {
125+
services = append(services, service{
126+
componentName: componentName,
127+
protoRef: svc,
128+
dialer: dialer,
129+
})
130+
}
131+
}
132+
log.Debugf("found %d pluggable component services", len(services)-1) // reflection api doesn't count.
133+
return services, nil
134+
}
135+
136+
// callback invoke callback function for each given service
137+
func callback(services []service) {
138+
for _, service := range services {
139+
callback, ok := onServiceDiscovered[service.protoRef]
140+
if !ok { // ignoring unknown service
141+
continue
142+
}
143+
callback(service.componentName, service.dialer)
144+
log.Infof("pluggable component '%s' was successfully registered for '%s'", service.componentName, service.protoRef)
145+
}
146+
}
147+
148+
// Discover discover the pluggable components and callback the service discovery with the given component name and grpc dialer.
149+
func Discover(ctx context.Context) error {
150+
services, err := serviceDiscovery(func(socket string) (reflectServiceClient, func(), error) {
151+
conn, err := SocketDial(
152+
ctx,
153+
socket,
154+
grpc.WithBlock(),
155+
)
156+
if err != nil {
157+
return nil, nil, err
158+
}
159+
return grpcreflect.NewClient(ctx, reflectpb.NewServerReflectionClient(conn)), func() {
160+
conn.Close()
161+
}, nil
162+
})
163+
if err != nil {
164+
return err
165+
}
166+
167+
callback(services)
168+
return nil
169+
}

0 commit comments

Comments
 (0)