Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/administrator.md
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,9 @@ Load balancer services can also be enabled for the [connection pooler](user.md#c
pods with manifest flags `enableMasterPoolerLoadBalancer` and/or
`enableReplicaPoolerLoadBalancer` or in the operator configuration with
`enable_master_pooler_load_balancer` and/or `enable_replica_pooler_load_balancer`.
For the `external-dns.alpha.kubernetes.io/hostname` annotation the `-pooler`
suffix will be appended to the cluster name used in the template which is
defined in `master|replica_dns_name_format`.

## Running periodic 'autorepair' scans of K8s objects

Expand Down
14 changes: 14 additions & 0 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,20 @@ def test_enable_disable_connection_pooler(self):
'LoadBalancer',
"Expected LoadBalancer service type for replica pooler pod, found {}")

master_annotations = {
"external-dns.alpha.kubernetes.io/hostname": "acid-minimal-cluster-pooler.default.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
}
self.eventuallyTrue(lambda: k8s.check_service_annotations(
master_pooler_label+","+pooler_label, master_annotations), "Wrong annotations")

replica_annotations = {
"external-dns.alpha.kubernetes.io/hostname": "acid-minimal-cluster-pooler-repl.default.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
}
self.eventuallyTrue(lambda: k8s.check_service_annotations(
replica_pooler_label+","+pooler_label, replica_annotations), "Wrong annotations")

# Turn off only master connection pooler
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
Expand Down
2 changes: 2 additions & 0 deletions manifests/complete-postgres-manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
enableReplicaLoadBalancer: false
enableConnectionPooler: false # enable/disable connection pooler deployment
enableReplicaConnectionPooler: false # set to enable connectionPooler for replica service
enableMasterPoolerLoadBalancer: false
enableReplicaPoolerLoadBalancer: false
allowedSourceRanges: # load balancers' source ranges for both master and replica services
- 127.0.0.1/32
databases:
Expand Down
61 changes: 45 additions & 16 deletions pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ConnectionPoolerObjects struct {
}

func (c *Cluster) connectionPoolerName(role PostgresRole) string {
name := c.Name + "-pooler"
name := fmt.Sprintf("%s-%s", c.Name, constants.ConnectionPoolerResourceSuffix)
if role == Replica {
name = fmt.Sprintf("%s-%s", name, "repl")
}
Expand Down Expand Up @@ -163,24 +163,27 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
return reason, nil
}

//
// Generate pool size related environment variables.
//
// MAX_DB_CONN would specify the global maximum for connections to a target
// database.
//
// database.
//
// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough.
//
// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when
// most of the queries coming through a connection pooler are from the same
// user to the same db). In case if we want to spin up more connection pooler
// instances, take this into account and maintain the same number of
// connections.
//
// most of the queries coming through a connection pooler are from the same
// user to the same db). In case if we want to spin up more connection pooler
// instances, take this into account and maintain the same number of
// connections.
//
// MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload
// have to wait for spinning up a new connections.
//
// have to wait for spinning up a new connections.
//
// RESERVE_SIZE is how many additional connections to allow for a pooler.

func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
spec := &c.Spec
connectionPoolerSpec := spec.ConnectionPooler
Expand Down Expand Up @@ -475,23 +478,23 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
}

func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service {

spec := &c.Spec
poolerRole := connectionPooler.Role
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: connectionPooler.Name,
Port: pgPort,
TargetPort: intstr.IntOrString{IntVal: c.servicePort(connectionPooler.Role)},
TargetPort: intstr.IntOrString{IntVal: c.servicePort(poolerRole)},
},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler": c.connectionPoolerName(connectionPooler.Role),
"connection-pooler": c.connectionPoolerName(poolerRole),
},
}

if c.shouldCreateLoadBalancerForPoolerService(connectionPooler.Role, spec) {
if c.shouldCreateLoadBalancerForPoolerService(poolerRole, spec) {
c.configureLoadBalanceService(&serviceSpec, spec.AllowedSourceRanges)
}

Expand All @@ -500,7 +503,7 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo
Name: connectionPooler.Name,
Namespace: connectionPooler.Namespace,
Labels: c.connectionPoolerLabels(connectionPooler.Role, false).MatchLabels,
Annotations: c.annotationsSet(c.generateServiceAnnotations(connectionPooler.Role, spec)),
Annotations: c.annotationsSet(c.generatePoolerServiceAnnotations(poolerRole, spec)),
// make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned"
// propagation policy, which means that it's deletion will not
Expand All @@ -515,6 +518,32 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo
return service
}

func (c *Cluster) generatePoolerServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string {
var dnsString string
annotations := c.getCustomServiceAnnotations(role, spec)

if c.shouldCreateLoadBalancerForPoolerService(role, spec) {
// set ELB Timeout annotation with default value
if _, ok := annotations[constants.ElbTimeoutAnnotationName]; !ok {
annotations[constants.ElbTimeoutAnnotationName] = constants.ElbTimeoutAnnotationValue
}
// -repl suffix will be added by replicaDNSName
clusterNameWithPoolerSuffix := c.connectionPoolerName(Master)
if role == Master {
dnsString = c.masterDNSName(clusterNameWithPoolerSuffix)
} else {
dnsString = c.replicaDNSName(clusterNameWithPoolerSuffix)
}
annotations[constants.ZalandoDNSNameAnnotation] = dnsString
}

if len(annotations) == 0 {
return nil
}

return annotations
}

func (c *Cluster) shouldCreateLoadBalancerForPoolerService(role PostgresRole, spec *acidv1.PostgresSpec) bool {

switch role {
Expand Down Expand Up @@ -546,7 +575,7 @@ func (c *Cluster) listPoolerPods(listOptions metav1.ListOptions) ([]v1.Pod, erro
return pods.Items, nil
}

//delete connection pooler
// delete connection pooler
func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
c.logger.Infof("deleting connection pooler spilo-role=%s", role)

Expand Down Expand Up @@ -605,7 +634,7 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
return nil
}

//delete connection pooler
// delete connection pooler
func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
// Repeat the same for the secret object
secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User)
Expand Down Expand Up @@ -654,7 +683,7 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe
return deployment, nil
}

//updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment
// updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment
func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) {
patchData, err := metaAnnotationsPatch(annotations)
if err != nil {
Expand Down
34 changes: 20 additions & 14 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1901,25 +1901,13 @@ func (c *Cluster) configureLoadBalanceService(serviceSpec *v1.ServiceSpec, sourc
}

func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
maps.Copy(annotations, c.OpConfig.CustomServiceAnnotations)

if spec != nil {
maps.Copy(annotations, spec.ServiceAnnotations)

switch role {
case Master:
maps.Copy(annotations, spec.MasterServiceAnnotations)
case Replica:
maps.Copy(annotations, spec.ReplicaServiceAnnotations)
}
}
annotations := c.getCustomServiceAnnotations(role, spec)

if c.shouldCreateLoadBalancerForService(role, spec) {
dnsName := c.dnsName(role)

// Just set ELB Timeout annotation with default value, if it does not
// have a cutom value
// have a custom value
if _, ok := annotations[constants.ElbTimeoutAnnotationName]; !ok {
annotations[constants.ElbTimeoutAnnotationName] = constants.ElbTimeoutAnnotationValue
}
Expand All @@ -1934,6 +1922,24 @@ func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.Pos
return annotations
}

func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
maps.Copy(annotations, c.OpConfig.CustomServiceAnnotations)

if spec != nil {
maps.Copy(annotations, spec.ServiceAnnotations)

switch role {
case Master:
maps.Copy(annotations, spec.MasterServiceAnnotations)
case Replica:
maps.Copy(annotations, spec.ReplicaServiceAnnotations)
}
}

return annotations
}

func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down
12 changes: 6 additions & 6 deletions pkg/cluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ func (c *Cluster) dnsName(role PostgresRole) string {
var dnsString, oldDnsString string

if role == Master {
dnsString = c.masterDNSName()
dnsString = c.masterDNSName(c.Name)
} else {
dnsString = c.replicaDNSName()
dnsString = c.replicaDNSName(c.Name)
}

// if cluster name starts with teamID we might need to provide backwards compatibility
Expand All @@ -528,17 +528,17 @@ func (c *Cluster) dnsName(role PostgresRole) string {
return dnsString
}

func (c *Cluster) masterDNSName() string {
func (c *Cluster) masterDNSName(clusterName string) string {
return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format(
"cluster", c.Name,
"cluster", clusterName,
"namespace", c.Namespace,
"team", c.teamName(),
"hostedzone", c.OpConfig.DbHostedZone))
}

func (c *Cluster) replicaDNSName() string {
func (c *Cluster) replicaDNSName(clusterName string) string {
return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format(
"cluster", c.Name,
"cluster", clusterName,
"namespace", c.Namespace,
"team", c.teamName(),
"hostedzone", c.OpConfig.DbHostedZone))
Expand Down
1 change: 1 addition & 0 deletions pkg/util/constants/pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package constants

// Connection pooler specific constants
const (
ConnectionPoolerResourceSuffix = "pooler"
ConnectionPoolerUserName = "pooler"
ConnectionPoolerSchemaName = "pooler"
ConnectionPoolerDefaultType = "pgbouncer"
Expand Down