@@ -11,6 +11,7 @@ import (
11
11
12
12
clientset "github.com/openfaas/faas-netes/pkg/client/clientset/versioned"
13
13
informers "github.com/openfaas/faas-netes/pkg/client/informers/externalversions"
14
+ v1 "github.com/openfaas/faas-netes/pkg/client/informers/externalversions/openfaas/v1"
14
15
"github.com/openfaas/faas-netes/pkg/config"
15
16
"github.com/openfaas/faas-netes/pkg/controller"
16
17
"github.com/openfaas/faas-netes/pkg/handlers"
@@ -22,7 +23,10 @@ import (
22
23
"github.com/openfaas/faas-provider/logs"
23
24
"github.com/openfaas/faas-provider/proxy"
24
25
providertypes "github.com/openfaas/faas-provider/types"
26
+
25
27
kubeinformers "k8s.io/client-go/informers"
28
+ v1apps "k8s.io/client-go/informers/apps/v1"
29
+ v1core "k8s.io/client-go/informers/core/v1"
26
30
"k8s.io/client-go/kubernetes"
27
31
"k8s.io/client-go/tools/cache"
28
32
"k8s.io/client-go/tools/clientcmd"
@@ -62,6 +66,12 @@ func main() {
62
66
log .Fatalf ("Error building kubeconfig: %s" , err .Error ())
63
67
}
64
68
69
+ kubeconfigQPS := 100
70
+ kubeconfigBurst := 250
71
+
72
+ clientCmdConfig .QPS = float32 (kubeconfigQPS )
73
+ clientCmdConfig .Burst = kubeconfigBurst
74
+
65
75
kubeClient , err := kubernetes .NewForConfig (clientCmdConfig )
66
76
if err != nil {
67
77
log .Fatalf ("Error building Kubernetes clientset: %s" , err .Error ())
@@ -118,6 +128,7 @@ func main() {
118
128
// this is where we need to swap to the faasInformerFactory
119
129
profileInformerOpt := informers .WithNamespace (config .ProfilesNamespace )
120
130
profileInformerFactory := informers .NewSharedInformerFactoryWithOptions (faasClient , defaultResync , profileInformerOpt )
131
+
121
132
profileLister := profileInformerFactory .Openfaas ().V1 ().Profiles ().Lister ()
122
133
factory := k8s .NewFunctionFactory (kubeClient , deployConfig , profileLister )
123
134
@@ -131,59 +142,85 @@ func main() {
131
142
faasClient : faasClient ,
132
143
}
133
144
134
- if ! operator {
135
- log .Println ("Starting controller" )
136
- runController (setup )
137
- } else {
145
+ if operator {
138
146
log .Println ("Starting operator" )
139
147
runOperator (setup , config )
148
+ } else {
149
+ log .Println ("Starting controller" )
150
+ runController (setup )
140
151
}
141
152
}
142
153
143
- // runController runs the faas-netes imperative controller
144
- func runController (setup serverSetup ) {
145
- // pull out the required config and clients fromthe setup, this is largely a
146
- // leftover from refactoring the setup to a shared step and keeping the function
147
- // signature readable
148
- config := setup .config
149
- kubeClient := setup .kubeClient
150
- factory := setup .functionFactory
154
+ type customInformers struct {
155
+ EndpointsInformer v1core.EndpointsInformer
156
+ DeploymentInformer v1apps.DeploymentInformer
157
+ FunctionsInformer v1.FunctionInformer
158
+ }
159
+
160
+ func startInformers (setup serverSetup , stopCh <- chan struct {}, operator bool ) customInformers {
151
161
kubeInformerFactory := setup .kubeInformerFactory
152
162
faasInformerFactory := setup .faasInformerFactory
153
163
154
- // set up signals so we handle the first shutdown signal gracefully
155
- stopCh := signals .SetupSignalHandler ()
164
+ var functions v1.FunctionInformer
165
+ if operator {
166
+ // go faasInformerFactory.Start(stopCh)
167
+
168
+ functions = faasInformerFactory .Openfaas ().V1 ().Functions ()
169
+ go functions .Informer ().Run (stopCh )
170
+ if ok := cache .WaitForNamedCacheSync ("faas-netes:functions" , stopCh , functions .Informer ().HasSynced ); ! ok {
171
+ log .Fatalf ("failed to wait for cache to sync" )
172
+ }
173
+ }
156
174
157
- endpointsInformer := kubeInformerFactory .Core (). V1 (). Endpoints ( )
175
+ // go kubeInformerFactory.Start(stopCh )
158
176
159
- deploymentLister := kubeInformerFactory .Apps ().V1 ().
160
- Deployments ().Lister ()
177
+ deployments := kubeInformerFactory .Apps ().V1 ().Deployments ()
178
+ go deployments .Informer ().Run (stopCh )
179
+ if ok := cache .WaitForNamedCacheSync ("faas-netes:deployments" , stopCh , deployments .Informer ().HasSynced ); ! ok {
180
+ log .Fatalf ("failed to wait for cache to sync" )
181
+ }
161
182
162
- go faasInformerFactory .Start (stopCh )
163
- go kubeInformerFactory .Start (stopCh )
164
- go setup .profileInformerFactory .Start (stopCh )
183
+ endpoints := kubeInformerFactory .Core ().V1 ().Endpoints ()
184
+ go endpoints .Informer ().Run (stopCh )
185
+ if ok := cache .WaitForNamedCacheSync ("faas-netes:endpoints" , stopCh , endpoints .Informer ().HasSynced ); ! ok {
186
+ log .Fatalf ("failed to wait for cache to sync" )
187
+ }
165
188
166
- // Any "Wait" calls need to be made, after the informers have been started
167
- start := time .Now ()
168
- glog .Infof ("Waiting for cache sync in main" )
169
- kubeInformerFactory .WaitForCacheSync (stopCh )
170
- setup .profileInformerFactory .WaitForCacheSync (stopCh )
189
+ // go setup.profileInformerFactory.Start(stopCh)
171
190
172
- // Block and wait for the endpoints to become synchronised
173
- cache .WaitForCacheSync (stopCh , endpointsInformer .Informer ().HasSynced )
191
+ profileInformerFactory := setup .profileInformerFactory
192
+ profiles := profileInformerFactory .Openfaas ().V1 ().Profiles ()
193
+ go profiles .Informer ().Run (stopCh )
194
+ if ok := cache .WaitForNamedCacheSync ("faas-netes:profiles" , stopCh , profiles .Informer ().HasSynced ); ! ok {
195
+ log .Fatalf ("failed to wait for cache to sync" )
196
+ }
174
197
175
- glog .Infof ("Cache sync done in: %fs" , time .Since (start ).Seconds ())
176
- glog .Infof ("Endpoints synced? %v" , endpointsInformer .Informer ().HasSynced ())
198
+ return customInformers {
199
+ EndpointsInformer : endpoints ,
200
+ DeploymentInformer : deployments ,
201
+ FunctionsInformer : functions ,
202
+ }
203
+ }
177
204
178
- lister := endpointsInformer .Lister ()
179
- functionLookup := k8s .NewFunctionLookup (config .DefaultFunctionNamespace , lister )
205
+ // runController runs the faas-netes imperative controller
206
+ func runController (setup serverSetup ) {
207
+ config := setup .config
208
+ kubeClient := setup .kubeClient
209
+ factory := setup .functionFactory
210
+
211
+ // set up signals so we handle the first shutdown signal gracefully
212
+ stopCh := signals .SetupSignalHandler ()
213
+ operator := false
214
+ listers := startInformers (setup , stopCh , operator )
215
+
216
+ functionLookup := k8s .NewFunctionLookup (config .DefaultFunctionNamespace , listers .EndpointsInformer .Lister ())
180
217
181
218
bootstrapHandlers := providertypes.FaaSHandlers {
182
219
FunctionProxy : proxy .NewHandlerFunc (config .FaaSConfig , functionLookup ),
183
220
DeleteHandler : handlers .MakeDeleteHandler (config .DefaultFunctionNamespace , kubeClient ),
184
221
DeployHandler : handlers .MakeDeployHandler (config .DefaultFunctionNamespace , factory ),
185
- FunctionReader : handlers .MakeFunctionReader (config .DefaultFunctionNamespace , deploymentLister ),
186
- ReplicaReader : handlers .MakeReplicaReader (config .DefaultFunctionNamespace , deploymentLister ),
222
+ FunctionReader : handlers .MakeFunctionReader (config .DefaultFunctionNamespace , listers . DeploymentInformer . Lister () ),
223
+ ReplicaReader : handlers .MakeReplicaReader (config .DefaultFunctionNamespace , listers . DeploymentInformer . Lister () ),
187
224
ReplicaUpdater : handlers .MakeReplicaUpdater (config .DefaultFunctionNamespace , kubeClient ),
188
225
UpdateHandler : handlers .MakeUpdateHandler (config .DefaultFunctionNamespace , factory ),
189
226
HealthHandler : handlers .MakeHealthHandler (),
@@ -198,9 +235,6 @@ func runController(setup serverSetup) {
198
235
199
236
// runOperator runs the CRD Operator
200
237
func runOperator (setup serverSetup , cfg config.BootstrapConfig ) {
201
- // pull out the required config and clients fromthe setup, this is largely a
202
- // leftover from refactoring the setup to a shared step and keeping the function
203
- // signature readable
204
238
kubeClient := setup .kubeClient
205
239
faasClient := setup .faasClient
206
240
kubeInformerFactory := setup .kubeInformerFactory
@@ -214,8 +248,10 @@ func runOperator(setup serverSetup, cfg config.BootstrapConfig) {
214
248
setupLogging ()
215
249
// set up signals so we handle the first shutdown signal gracefully
216
250
stopCh := signals .SetupSignalHandler ()
217
- endpointsInformer := kubeInformerFactory .Core ().V1 ().Endpoints ()
218
- deploymentInformer := kubeInformerFactory .Apps ().V1 ().Deployments ()
251
+ // set up signals so we handle the first shutdown signal gracefully
252
+
253
+ operator := true
254
+ listers := startInformers (setup , stopCh , operator )
219
255
220
256
ctrl := controller .NewController (
221
257
kubeClient ,
@@ -225,23 +261,7 @@ func runOperator(setup serverSetup, cfg config.BootstrapConfig) {
225
261
factory ,
226
262
)
227
263
228
- srv := server .New (faasClient , kubeClient , endpointsInformer , deploymentInformer , cfg .ClusterRole , cfg )
229
-
230
- go faasInformerFactory .Start (stopCh )
231
- go kubeInformerFactory .Start (stopCh )
232
- go setup .profileInformerFactory .Start (stopCh )
233
-
234
- // Any "Wait" calls need to be made, after the informers have been started
235
- start := time .Now ()
236
- glog .Infof ("Waiting for cache sync in main" )
237
- kubeInformerFactory .WaitForCacheSync (stopCh )
238
- setup .profileInformerFactory .WaitForCacheSync (stopCh )
239
-
240
- // Block and wait for the endpoints to become synchronised
241
- cache .WaitForCacheSync (stopCh , endpointsInformer .Informer ().HasSynced )
242
-
243
- glog .Infof ("Cache sync done in: %fs" , time .Since (start ).Seconds ())
244
- glog .Infof ("Endpoints synced? %v" , endpointsInformer .Informer ().HasSynced ())
264
+ srv := server .New (faasClient , kubeClient , listers .EndpointsInformer , listers .DeploymentInformer .Lister (), cfg .ClusterRole , cfg )
245
265
246
266
go srv .Start ()
247
267
if err := ctrl .Run (1 , stopCh ); err != nil {
0 commit comments