Skip to content

Commit aaddbd9

Browse files
authored
Merge pull request kubernetes-sigs#512 from kubernetes-sigs/master
🏃 ff release-0.2 branch to master HEAD
2 parents aeaf98d + 2495fdd commit aaddbd9

File tree

7 files changed

+108
-26
lines changed

7 files changed

+108
-26
lines changed

Gopkg.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/client/config/config.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"k8s.io/client-go/rest"
2727
"k8s.io/client-go/tools/clientcmd"
28+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
2829
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2930
)
3031

@@ -61,7 +62,27 @@ func init() {
6162
//
6263
// * $HOME/.kube/config if exists
6364
func GetConfig() (*rest.Config, error) {
64-
cfg, err := loadConfig()
65+
return GetConfigWithContext("")
66+
}
67+
68+
// GetConfigWithContext creates a *rest.Config for talking to a Kubernetes API server with a specific context.
69+
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
70+
// in cluster and use the cluster provided kubeconfig.
71+
//
72+
// It also applies saner defaults for QPS and burst based on the Kubernetes
73+
// controller manager defaults (20 QPS, 30 burst)
74+
//
75+
// Config precedence
76+
//
77+
// * --kubeconfig flag pointing at a file
78+
//
79+
// * KUBECONFIG environment variable pointing at a file
80+
//
81+
// * In-cluster config if running in cluster
82+
//
83+
// * $HOME/.kube/config if exists
84+
func GetConfigWithContext(context string) (*rest.Config, error) {
85+
cfg, err := loadConfig(context)
6586
if err != nil {
6687
return nil, err
6788
}
@@ -75,30 +96,42 @@ func GetConfig() (*rest.Config, error) {
7596
}
7697

7798
// loadConfig loads a REST Config as per the rules specified in GetConfig
78-
func loadConfig() (*rest.Config, error) {
99+
func loadConfig(context string) (*rest.Config, error) {
100+
79101
// If a flag is specified with the config location, use that
80102
if len(kubeconfig) > 0 {
81-
return clientcmd.BuildConfigFromFlags(apiServerURL, kubeconfig)
103+
return loadConfigWithContext(apiServerURL, kubeconfig, context)
82104
}
83-
// If an env variable is specified with the config locaiton, use that
105+
// If an env variable is specified with the config location, use that
84106
if len(os.Getenv("KUBECONFIG")) > 0 {
85-
return clientcmd.BuildConfigFromFlags(apiServerURL, os.Getenv("KUBECONFIG"))
107+
return loadConfigWithContext(apiServerURL, os.Getenv("KUBECONFIG"), context)
86108
}
87109
// If no explicit location, try the in-cluster config
88110
if c, err := rest.InClusterConfig(); err == nil {
89111
return c, nil
90112
}
91113
// If no in-cluster config, try the default location in the user's home directory
92114
if usr, err := user.Current(); err == nil {
93-
if c, err := clientcmd.BuildConfigFromFlags(
94-
"", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
115+
if c, err := loadConfigWithContext(apiServerURL, filepath.Join(usr.HomeDir, ".kube", "config"),
116+
context); err == nil {
95117
return c, nil
96118
}
97119
}
98120

99121
return nil, fmt.Errorf("could not locate a kubeconfig")
100122
}
101123

124+
func loadConfigWithContext(apiServerURL, kubeconfig, context string) (*rest.Config, error) {
125+
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
126+
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
127+
&clientcmd.ConfigOverrides{
128+
ClusterInfo: clientcmdapi.Cluster{
129+
Server: apiServerURL,
130+
},
131+
CurrentContext: context,
132+
}).ClientConfig()
133+
}
134+
102135
// GetConfigOrDie creates a *rest.Config for talking to a Kubernetes apiserver.
103136
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
104137
// in cluster and use the cluster provided kubeconfig.

pkg/manager/internal.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,10 @@ type controllerManager struct {
9090
// metricsListener is used to serve prometheus metrics
9191
metricsListener net.Listener
9292

93-
mu sync.Mutex
94-
started bool
95-
errChan chan error
93+
mu sync.Mutex
94+
started bool
95+
startedLeader bool
96+
errChan chan error
9697

9798
// internalStop is the stop channel *actually* used by everything involved
9899
// with the manager as a stop channel, so that we can pass a stop channel
@@ -134,14 +135,18 @@ func (cm *controllerManager) Add(r Runnable) error {
134135
return err
135136
}
136137

138+
var shouldStart bool
139+
137140
// Add the runnable to the leader election or the non-leaderelection list
138141
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
142+
shouldStart = cm.started
139143
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
140144
} else {
145+
shouldStart = cm.startedLeader
141146
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
142147
}
143148

144-
if cm.started {
149+
if shouldStart {
145150
// If already started, start the controller
146151
go func() {
147152
cm.errChan <- r.Start(cm.internalStop)
@@ -225,17 +230,19 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server {
225230
}
226231

227232
func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
233+
var metricsPath = "/metrics"
228234
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
229235
ErrorHandling: promhttp.HTTPErrorOnError,
230236
})
231237
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
232238
mux := http.NewServeMux()
233-
mux.Handle("/metrics", handler)
239+
mux.Handle(metricsPath, handler)
234240
server := http.Server{
235241
Handler: mux,
236242
}
237243
// Run the server
238244
go func() {
245+
log.Info("starting metrics server", "path", metricsPath)
239246
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
240247
cm.errChan <- err
241248
}
@@ -314,6 +321,8 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
314321
cm.errChan <- ctrl.Start(cm.internalStop)
315322
}()
316323
}
324+
325+
cm.startedLeader = true
317326
}
318327

319328
func (cm *controllerManager) waitForCache() {

pkg/manager/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ type Options struct {
135135
Namespace string
136136

137137
// MetricsBindAddress is the TCP address that the controller should bind to
138-
// for serving prometheus metrics
138+
// for serving prometheus metrics.
139+
// It can be set to "0" to disable the metrics serving.
139140
MetricsBindAddress string
140141

141142
// Port is the port that the webhook server serves at.

pkg/metrics/listener.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ import (
2121
"net"
2222
)
2323

24-
// DefaultBindAddress sets the default bind address for the metrics
25-
// listener
26-
// The metrics is off by default.
27-
// TODO: Flip the default by changing DefaultBindAddress back to ":8080" in the v0.2.0.
28-
var DefaultBindAddress = "0"
24+
// DefaultBindAddress sets the default bind address for the metrics listener
25+
// The metrics is on by default.
26+
var DefaultBindAddress = ":8080"
2927

3028
// NewListener creates a new TCP listener bound to the given address.
3129
func NewListener(addr string) (net.Listener, error) {
@@ -39,9 +37,12 @@ func NewListener(addr string) (net.Listener, error) {
3937
return nil, nil
4038
}
4139

40+
log.Info("metrics server is starting to listen", "addr", addr)
4241
ln, err := net.Listen("tcp", addr)
4342
if err != nil {
44-
return nil, fmt.Errorf("error listening on %s: %v", addr, err)
43+
er := fmt.Errorf("error listening on %s: %v", addr, err)
44+
log.Error(er, "metrics server failed to listen. You may want to disable the metrics server or use another port if it is due to conflicts")
45+
return nil, er
4546
}
4647
return ln, nil
4748
}

pkg/webhook/conversion/conversion.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error {
174174

175175
// getHub returns an instance of the Hub for passed-in object's group/kind.
176176
func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) {
177-
gvks := objectGVKs(wh.scheme, obj)
177+
gvks, err := objectGVKs(wh.scheme, obj)
178+
if err != nil {
179+
return nil, err
180+
}
178181
if len(gvks) == 0 {
179182
return nil, fmt.Errorf("error retrieving gvks for object : %v", obj)
180183
}
@@ -223,7 +226,10 @@ func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, e
223226
func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
224227
var hubs, spokes, nonSpokes []runtime.Object
225228

226-
gvks := objectGVKs(scheme, obj)
229+
gvks, err := objectGVKs(scheme, obj)
230+
if err != nil {
231+
return false, err
232+
}
227233
if len(gvks) == 0 {
228234
return false, fmt.Errorf("error retrieving gvks for object : %v", obj)
229235
}
@@ -273,18 +279,27 @@ func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
273279
}
274280

275281
// objectGVKs returns all (Group,Version,Kind) for the Group/Kind of given object.
276-
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) []schema.GroupVersionKind {
277-
var gvks []schema.GroupVersionKind
278-
279-
objGVK := obj.GetObjectKind().GroupVersionKind()
282+
func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) {
283+
// NB: we should not use `obj.GetObjectKind().GroupVersionKind()` to get the
284+
// GVK here, since it is parsed from apiVersion and kind fields and it may
285+
// return empty GVK if obj is an uninitialized object.
286+
objGVKs, _, err := scheme.ObjectKinds(obj)
287+
if err != nil {
288+
return nil, err
289+
}
290+
if len(objGVKs) != 1 {
291+
return nil, fmt.Errorf("expect to get only one GVK for %v", obj)
292+
}
293+
objGVK := objGVKs[0]
280294
knownTypes := scheme.AllKnownTypes()
281295

296+
var gvks []schema.GroupVersionKind
282297
for gvk := range knownTypes {
283298
if objGVK.GroupKind() == gvk.GroupKind() {
284299
gvks = append(gvks, gvk)
285300
}
286301
}
287-
return gvks
302+
return gvks, nil
288303
}
289304

290305
// PartialImplementationError represents an error due to partial conversion

pkg/webhook/conversion/conversion_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
appsv1beta1 "k8s.io/api/apps/v1beta1"
3030
apix "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3233
"k8s.io/apimachinery/pkg/runtime"
3334
kscheme "k8s.io/client-go/kubernetes/scheme"
3435

@@ -312,6 +313,27 @@ var _ = Describe("IsConvertible", func() {
312313
Expect(jobsv3.AddToScheme(scheme)).To(Succeed())
313314
})
314315

316+
It("should not error for uninitialized types", func() {
317+
obj := &jobsv2.ExternalJob{}
318+
319+
ok, err := IsConvertible(scheme, obj)
320+
Expect(err).NotTo(HaveOccurred())
321+
Expect(ok).To(BeTrue())
322+
})
323+
324+
It("should not error for unstructured types", func() {
325+
obj := &unstructured.Unstructured{
326+
Object: map[string]interface{}{
327+
"kind": "ExternalJob",
328+
"apiVersion": "jobs.testprojects.kb.io/v2",
329+
},
330+
}
331+
332+
ok, err := IsConvertible(scheme, obj)
333+
Expect(err).NotTo(HaveOccurred())
334+
Expect(ok).To(BeTrue())
335+
})
336+
315337
It("should return true for convertible types", func() {
316338
obj := &jobsv2.ExternalJob{
317339
TypeMeta: metav1.TypeMeta{

0 commit comments

Comments
 (0)