Skip to content

Commit ee39d35

Browse files
author
Kubernetes Submit Queue
authored
Merge pull request kubernetes#45356 from wojtek-t/edge_based_userspace_proxy
Automatic merge from submit-queue Edge based userspace proxy Second last PR from my changes to kube-proxy to make it event-based. This is switching userspace proxy to be even-based similarly to what we already did with iptables.
2 parents b9f340d + 33a7a28 commit ee39d35

File tree

3 files changed

+180
-142
lines changed

3 files changed

+180
-142
lines changed

cmd/kube-proxy/app/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
470470
if err != nil {
471471
return nil, fmt.Errorf("unable to create proxier: %v", err)
472472
}
473-
serviceHandler = proxierUserspace
473+
serviceEventHandler = proxierUserspace
474474
proxier = proxierUserspace
475475
}
476476
// Remove artifacts from the pure-iptables Proxier, if not on Windows.

pkg/proxy/userspace/proxier.go

Lines changed: 101 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ type ServiceInfo struct {
5252
Timeout time.Duration
5353
// ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
5454
ActiveClients *ClientCache
55-
// ServiceRef is a full object reference to the the service described by this ServiceInfo
56-
ServiceRef api.ObjectReference
5755

5856
isAliveAtomic int32 // Only access this with atomic ops
5957
portal portal
@@ -358,7 +356,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv
358356
// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
359357
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
360358
// connections, for now.
361-
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
359+
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
362360
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
363361
if err != nil {
364362
return nil, err
@@ -376,7 +374,6 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR
376374
si := &ServiceInfo{
377375
Timeout: timeout,
378376
ActiveClients: newClientCache(),
379-
ServiceRef: serviceRef,
380377

381378
isAliveAtomic: 1,
382379
proxyPort: portNum,
@@ -398,108 +395,124 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR
398395
return si, nil
399396
}
400397

401-
// OnServiceUpdate manages the active set of service proxies.
402-
// Active service proxies are reinitialized if found in the update set or
403-
// shutdown if missing from the update set.
404-
func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
405-
glog.V(4).Infof("Received update notice: %+v", services)
406-
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
407-
for _, service := range services {
408-
// if ClusterIP is "None" or empty, skip proxying
409-
if !helper.IsServiceIPSet(service) {
410-
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
398+
func (proxier *Proxier) mergeService(service *api.Service) sets.String {
399+
if service == nil {
400+
return nil
401+
}
402+
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
403+
if !helper.IsServiceIPSet(service) {
404+
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
405+
return nil
406+
}
407+
existingPorts := sets.NewString()
408+
for i := range service.Spec.Ports {
409+
servicePort := &service.Spec.Ports[i]
410+
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
411+
existingPorts.Insert(servicePort.Name)
412+
info, exists := proxier.getServiceInfo(serviceName)
413+
// TODO: check health of the socket? What if ProxyLoop exited?
414+
if exists && sameConfig(info, service, servicePort) {
415+
// Nothing changed.
411416
continue
412417
}
413-
414-
// TODO: should this just be ref.GetReference?
415-
svcGVK := service.GetObjectKind().GroupVersionKind()
416-
svcRef := api.ObjectReference{
417-
Kind: svcGVK.Kind,
418-
Namespace: service.Namespace,
419-
Name: service.Name,
420-
UID: service.UID,
421-
APIVersion: svcGVK.GroupVersion().String(),
422-
ResourceVersion: service.ResourceVersion,
423-
}
424-
425-
for i := range service.Spec.Ports {
426-
servicePort := &service.Spec.Ports[i]
427-
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
428-
activeServices[serviceName] = true
429-
serviceIP := net.ParseIP(service.Spec.ClusterIP)
430-
info, exists := proxier.getServiceInfo(serviceName)
431-
// TODO: check health of the socket? What if ProxyLoop exited?
432-
if exists && sameConfig(info, service, servicePort) {
433-
// Nothing changed.
434-
continue
418+
if exists {
419+
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
420+
if err := proxier.closePortal(serviceName, info); err != nil {
421+
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
435422
}
436-
if exists {
437-
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
438-
err := proxier.closePortal(serviceName, info)
439-
if err != nil {
440-
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
441-
}
442-
err = proxier.stopProxy(serviceName, info)
443-
if err != nil {
444-
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
445-
}
446-
}
447-
448-
proxyPort, err := proxier.proxyPorts.AllocateNext()
449-
if err != nil {
450-
glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
451-
continue
423+
if err := proxier.stopProxy(serviceName, info); err != nil {
424+
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
452425
}
426+
}
427+
proxyPort, err := proxier.proxyPorts.AllocateNext()
428+
if err != nil {
429+
glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
430+
continue
431+
}
453432

454-
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
455-
info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
456-
if err != nil {
457-
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
458-
continue
459-
}
460-
info.portal.ip = serviceIP
461-
info.portal.port = int(servicePort.Port)
462-
info.externalIPs = service.Spec.ExternalIPs
463-
// Deep-copy in case the service instance changes
464-
info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
465-
info.nodePort = int(servicePort.NodePort)
466-
info.sessionAffinityType = service.Spec.SessionAffinity
467-
glog.V(4).Infof("info: %#v", info)
468-
469-
err = proxier.openPortal(serviceName, info)
470-
if err != nil {
471-
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
472-
}
473-
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
433+
serviceIP := net.ParseIP(service.Spec.ClusterIP)
434+
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
435+
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
436+
if err != nil {
437+
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
438+
continue
474439
}
440+
info.portal.ip = serviceIP
441+
info.portal.port = int(servicePort.Port)
442+
info.externalIPs = service.Spec.ExternalIPs
443+
// Deep-copy in case the service instance changes
444+
info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
445+
info.nodePort = int(servicePort.NodePort)
446+
info.sessionAffinityType = service.Spec.SessionAffinity
447+
glog.V(4).Infof("info: %#v", info)
448+
449+
if err := proxier.openPortal(serviceName, info); err != nil {
450+
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
451+
}
452+
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
453+
}
454+
455+
return existingPorts
456+
}
457+
458+
func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.String) {
459+
if service == nil {
460+
return
461+
}
462+
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
463+
if !helper.IsServiceIPSet(service) {
464+
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
465+
return
475466
}
476467

477468
staleUDPServices := sets.NewString()
478469
proxier.mu.Lock()
479470
defer proxier.mu.Unlock()
480-
for name, info := range proxier.serviceMap {
481-
if !activeServices[name] {
482-
glog.V(1).Infof("Stopping service %q", name)
471+
for i := range service.Spec.Ports {
472+
servicePort := &service.Spec.Ports[i]
473+
if existingPorts.Has(servicePort.Name) {
474+
continue
475+
}
476+
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
483477

484-
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
485-
staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String())
486-
}
478+
glog.V(1).Infof("Stopping service %q", serviceName)
479+
info, exists := proxier.serviceMap[serviceName]
480+
if !exists {
481+
glog.Errorf("Service %q is being removed but doesn't exist", serviceName)
482+
continue
483+
}
487484

488-
err := proxier.closePortal(name, info)
489-
if err != nil {
490-
glog.Errorf("Failed to close portal for %q: %v", name, err)
491-
}
492-
err = proxier.stopProxyInternal(name, info)
493-
if err != nil {
494-
glog.Errorf("Failed to stop service %q: %v", name, err)
495-
}
496-
proxier.loadBalancer.DeleteService(name)
485+
if proxier.serviceMap[serviceName].protocol == api.ProtocolUDP {
486+
staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String())
497487
}
498-
}
499488

489+
if err := proxier.closePortal(serviceName, info); err != nil {
490+
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
491+
}
492+
if err := proxier.stopProxyInternal(serviceName, info); err != nil {
493+
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
494+
}
495+
proxier.loadBalancer.DeleteService(serviceName)
496+
}
500497
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
501498
}
502499

500+
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
501+
_ = proxier.mergeService(service)
502+
}
503+
504+
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
505+
existingPorts := proxier.mergeService(service)
506+
proxier.unmergeService(oldService, existingPorts)
507+
}
508+
509+
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
510+
proxier.unmergeService(service, sets.NewString())
511+
}
512+
513+
func (proxier *Proxier) OnServiceSynced() {
514+
}
515+
503516
func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool {
504517
if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
505518
return false

0 commit comments

Comments
 (0)