Commit 7476f1d (parent b25624f)

PWX-34271: pick the pods on unschedulable nodes for update first (#1544)
If we marked a node unschedulable in the previous Reconcile(), we have to pick that node first; otherwise we may end up making too many nodes unschedulable. Also, retry when adding or removing the node annotation fails with a conflict error, so we don't wait for another 30 seconds.

Signed-off-by: Neelesh Thakur <[email protected]>
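To make the ordering concrete, here is a minimal, self-contained Go sketch (illustrative only; the pod and node names are made up) of the idea the change in pkg/controller/storagecluster/update.go below implements: pods on nodes that were already marked unschedulable sort to the front of the deletion list.

```go
package main

import (
	"fmt"
	"sort"
)

// podInfo is a stand-in for the storage pods considered during a rolling update.
type podInfo struct {
	name     string
	nodeName string
}

func main() {
	// Nodes that a previous Reconcile() already annotated as unschedulable.
	markedUnschedulable := map[string]bool{"node-2": true}

	pods := []podInfo{
		{name: "px-a", nodeName: "node-1"},
		{name: "px-b", nodeName: "node-2"},
		{name: "px-c", nodeName: "node-3"},
	}

	// Same comparator shape as the operator change: a pod whose node is already
	// marked sorts before a pod whose node is not.
	sort.Slice(pods, func(i, j int) bool {
		return markedUnschedulable[pods[i].nodeName] && !markedUnschedulable[pods[j].nodeName]
	})

	// px-b (on the already-marked node-2) is picked first, so the update finishes
	// that node before any additional node has to be marked unschedulable.
	fmt.Println(pods)
}
```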

5 files changed: +80 additions, −42 deletions

pkg/constants/metadata.go

Lines changed: 2 additions & 2 deletions

@@ -45,8 +45,8 @@ const (
     // the custom registry, there is a list of hardcoded common registries, however the list
     // may not be complete, users can use this annotation to add more.
     AnnotationCommonImageRegistries = OperatorPrefix + "/common-image-registries"
-    // StorkAnnotationUnschedulable tells Stork to consider the node unschedulable
-    StorkAnnotationUnschedulable = "stork.libopenstorage.org/unschedulable"
+    // AnnotationUnschedulable tells Stork to consider the node unschedulable
+    AnnotationUnschedulable = OperatorPrefix + "/unschedulable"
     // OperatorLabelManagedByKey is a label key that is added to any object that is
     // managed the Portworx operator.
     OperatorLabelManagedByKey = OperatorPrefix + "/managed-by"
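For context, a hedged sketch of how a consumer could interpret the renamed annotation on a Node (illustrative only; it assumes OperatorPrefix expands to operator.libopenstorage.org and is not taken from Stork's actual implementation):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// annotationUnschedulable stands in for constants.AnnotationUnschedulable,
// assuming OperatorPrefix + "/unschedulable".
const annotationUnschedulable = "operator.libopenstorage.org/unschedulable"

// isNodeSchedulable treats any value other than "true" as schedulable,
// matching how the operator's tests check the annotation.
func isNodeSchedulable(node *v1.Node) bool {
	return node.Annotations[annotationUnschedulable] != "true"
}

func main() {
	node := &v1.Node{}
	node.Annotations = map[string]string{annotationUnschedulable: "true"}
	fmt.Println(isNodeSchedulable(node)) // false
}
```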

pkg/controller/storagecluster/controller_test.go

Lines changed: 5 additions & 4 deletions

@@ -2643,9 +2643,9 @@ func TestKubevirtVMsDuringUpgrade(t *testing.T) {
     }

     // add unschedulable label to the nodes and then verify that they get removed
-    err = storkUnschedulableAnnotationHelper(k8sClient, k8sNodes[0].Name, true)
+    err = nodeUnschedulableAnnotationHelper(k8sClient, k8sNodes[0].Name, true)
     require.NoError(t, err)
-    err = storkUnschedulableAnnotationHelper(k8sClient, k8sNodes[2].Name, true)
+    err = nodeUnschedulableAnnotationHelper(k8sClient, k8sNodes[2].Name, true)
     require.NoError(t, err)
     verifyUnschedulableAnnotation(t, k8sClient, k8sNodes[0].Name, true)
     verifyUnschedulableAnnotation(t, k8sClient, k8sNodes[1].Name, false)
@@ -2709,7 +2709,7 @@ func TestKubevirtVMsDuringUpgrade(t *testing.T) {

     // reset the state before the next test
     for _, k8sNode := range k8sNodes {
-        err = storkUnschedulableAnnotationHelper(k8sClient, k8sNode.Name, false)
+        err = nodeUnschedulableAnnotationHelper(k8sClient, k8sNode.Name, false)
         require.NoError(t, err)
         verifyUnschedulableAnnotation(t, k8sClient, k8sNode.Name, false)
     }
@@ -7386,6 +7386,7 @@ func TestUpdateStorageClusterNodeSpec(t *testing.T) {
        recorder:          recorder,
        kubernetesVersion: k8sVersion,
        nodeInfoMap:       maps.MakeSyncMap[string, *k8s.NodeInfo](),
+       kubevirt:          testutil.NoopKubevirtManager(mockCtrl),
     }

     driver.EXPECT().Validate(gomock.Any()).Return(nil).AnyTimes()
@@ -10648,7 +10649,7 @@ func createPxApiPod(

 func verifyUnschedulableAnnotation(t *testing.T, k8sClient client.Client, nodeName string, expectUnschedulable bool) {
     node := getNode(t, k8sClient, nodeName)
-    val, ok := node.Annotations[constants.StorkAnnotationUnschedulable]
+    val, ok := node.Annotations[constants.AnnotationUnschedulable]
     if expectUnschedulable {
         require.True(t, ok && val == "true")
     } else {
pkg/controller/storagecluster/storagecluster.go

Lines changed: 11 additions & 10 deletions

@@ -836,18 +836,25 @@ func (c *Controller) syncStorageCluster(
     }
     hash := cur.Labels[util.DefaultStorageClusterUniqueLabelKey]

+    nodeList := &v1.NodeList{}
+    err = c.client.List(context.TODO(), nodeList, &client.ListOptions{})
+    if err != nil {
+        return fmt.Errorf("couldn't get list of nodes when syncing storage cluster %#v: %v",
+            cluster, err)
+    }
+
     // TODO: Don't process a storage cluster until all its previous creations and
     // deletions have been processed.
-    err = c.manage(cluster, hash)
+    err = c.manage(cluster, hash, nodeList)
     if err != nil {
         return err
     }

     switch cluster.Spec.UpdateStrategy.Type {
     case corev1.OnDeleteStorageClusterStrategyType:
     case corev1.RollingUpdateStorageClusterStrategyType:
-        if err := c.rollingUpdate(cluster, hash); err != nil {
-            return err
+        if err := c.rollingUpdate(cluster, hash, nodeList); err != nil {
+            return fmt.Errorf("rolling update failed: %s", err)
         }
     }

@@ -1020,6 +1027,7 @@ func (c *Controller) updateStorageClusterStatus(
 func (c *Controller) manage(
     cluster *corev1.StorageCluster,
     hash string,
+    nodeList *v1.NodeList,
 ) error {
     // Run the pre install hook for the driver to ensure we are ready to create storage pods
     if err := c.Driver.PreInstall(cluster); err != nil {
@@ -1034,13 +1042,6 @@ func (c *Controller) manage(
     }
     var nodesNeedingStoragePods, podsToDelete []string

-    nodeList := &v1.NodeList{}
-    err = c.client.List(context.TODO(), nodeList, &client.ListOptions{})
-    if err != nil {
-        return fmt.Errorf("couldn't get list of nodes when syncing storage cluster %#v: %v",
-            cluster, err)
-    }
-
     // For each node, if the node is running the storage pod but isn't supposed to, kill the storage pod.
     // If the node is supposed to run the storage pod, but isn't, create the storage pod on the node.
     for _, node := range nodeList.Items {
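Listing the nodes once in syncStorageCluster() and passing the result into both manage() and rollingUpdate() means the two phases of a sync work from the same node snapshot and the controller avoids issuing a second List call per reconcile; rollingUpdate() needs the list anyway to find the nodes that are already annotated unschedulable.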

pkg/controller/storagecluster/update.go

Lines changed: 50 additions & 20 deletions

@@ -49,9 +49,13 @@ import (
     "github.com/libopenstorage/operator/pkg/util/k8s"
 )

+const (
+    k8sNodeUpdateAttempts = 5
+)
+
 // rollingUpdate deletes old storage cluster pods making sure that no more than
 // cluster.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
-func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string) error {
+func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string, nodeList *v1.NodeList) error {
     logrus.Debug("Perform rolling update")
     nodeToStoragePods, err := c.getNodeToStoragePods(cluster)
     if err != nil {
@@ -74,7 +78,7 @@ func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string)
         continue
     }
     if _, ok := availablePods[pod.Name]; ok {
-        if err := c.removeStorkUnschedulableLabel(pod.Spec.NodeName); err != nil {
+        if err := c.removeNodeUnschedulableLabel(pod.Spec.NodeName); err != nil {
             return err
         }
     }
@@ -94,7 +98,7 @@ func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string)
         oldPodsToDelete = append(oldPodsToDelete, pod.Name)
     }

-    // TODO: Max unavaibable kvdb nodes has been set to 1 assuming default setup of 3 KVDB nodes.
+    // TODO: Max unavailable kvdb nodes has been set to 1 assuming default setup of 3 KVDB nodes.
     // Need to generalise code for 5 KVDB nodes

     // get the number of kvdb members which are unavailable in case of internal kvdb
@@ -107,7 +111,21 @@ func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string)
         }
     }

+    nodesMarkedUnschedulable := map[string]bool{}
+    for _, node := range nodeList.Items {
+        if nodeMarkedUnschedulable(&node) {
+            nodesMarkedUnschedulable[node.Name] = true
+        }
+    }
     logrus.Debugf("Marking old pods for deletion")
+    // sort the pods such that the pods on the nodes that we marked unschedulable are deleted first.
+    // Otherwise, we will end up marking too many nodes as unschedulable (or ping-ponging VMs between the nodes).
+    sort.Slice(oldAvailablePods, func(i, j int) bool {
+        iUnschedulable := nodesMarkedUnschedulable[oldAvailablePods[i].Spec.NodeName]
+        jUnschedulable := nodesMarkedUnschedulable[oldAvailablePods[j].Spec.NodeName]
+
+        return iUnschedulable && !jUnschedulable
+    })
     for _, pod := range oldAvailablePods {
         if numUnavailable >= maxUnavailable {
             logrus.Infof("Number of unavailable StorageCluster pods: %d, is equal "+
@@ -166,7 +184,7 @@ func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string)
             logrus.Warnf("Pod %s has no node name; cannot add unschedulable label to the node", podName)
             continue
         }
-        if err := c.addStorkUnschedulableAnnotation(pod.Spec.NodeName); err != nil {
+        if err := c.addNodeUnschedulableAnnotation(pod.Spec.NodeName); err != nil {
             return err
         }
     }
@@ -220,15 +238,30 @@ func (c *Controller) rollingUpdate(cluster *corev1.StorageCluster, hash string)
     return c.syncNodes(cluster, oldPodsToDeletePruned, []string{}, hash)
 }

-func (c *Controller) addStorkUnschedulableAnnotation(nodeName string) error {
-    return storkUnschedulableAnnotationHelper(c.client, nodeName, true)
+func (c *Controller) addNodeUnschedulableAnnotation(nodeName string) error {
+    return nodeUnschedulableAnnotationHelperWithRetries(c.client, nodeName, true)
 }

-func (c *Controller) removeStorkUnschedulableLabel(nodeName string) error {
-    return storkUnschedulableAnnotationHelper(c.client, nodeName, false)
+func (c *Controller) removeNodeUnschedulableLabel(nodeName string) error {
+    return nodeUnschedulableAnnotationHelperWithRetries(c.client, nodeName, false)
 }

-func storkUnschedulableAnnotationHelper(k8sClient client.Client, nodeName string, unschedulable bool) error {
+func nodeUnschedulableAnnotationHelperWithRetries(
+    k8sClient client.Client, nodeName string, unschedulable bool,
+) error {
+    var err error
+    for i := 0; i < k8sNodeUpdateAttempts; i++ {
+        err = nodeUnschedulableAnnotationHelper(k8sClient, nodeName, unschedulable)
+        if err == nil || !errors.IsConflict(err) {
+            return err
+        }
+        logrus.Warnf("Conflict while updating annotation %s on node %s in attempt %d: %v",
+            constants.AnnotationUnschedulable, nodeName, i, err)
+    }
+    return err
+}
+
+func nodeUnschedulableAnnotationHelper(k8sClient client.Client, nodeName string, unschedulable bool) error {
     ctx := context.TODO()
     node := &v1.Node{}
     err := k8sClient.Get(ctx, types.NamespacedName{Name: nodeName}, node)
@@ -240,28 +273,25 @@ func storkUnschedulableAnnotationHelper(k8sClient client.Client, nodeName string
         return fmt.Errorf("failed to get node %s: %w", nodeName, err)
     }
     needsUpdate := false
-    val, ok := node.Annotations[constants.StorkAnnotationUnschedulable]
-    if unschedulable && val != "true" {
+    val := nodeMarkedUnschedulable(node)
+    if unschedulable && !val {
+        logrus.Infof("Adding annotation %s to node %s", constants.AnnotationUnschedulable, nodeName)
         if node.Annotations == nil {
             node.Annotations = make(map[string]string)
         }
-        node.Annotations[constants.StorkAnnotationUnschedulable] = "true"
+        node.Annotations[constants.AnnotationUnschedulable] = "true"
         needsUpdate = true
-    } else if !unschedulable && ok {
-        delete(node.Annotations, constants.StorkAnnotationUnschedulable)
+    } else if !unschedulable && val {
+        logrus.Infof("Removing annotation %s from node %s", constants.AnnotationUnschedulable, nodeName)
+        delete(node.Annotations, constants.AnnotationUnschedulable)
         needsUpdate = true
     }
     if !needsUpdate {
         return nil
     }
-    if unschedulable {
-        logrus.Infof("Adding annotation %s to node %s", constants.StorkAnnotationUnschedulable, nodeName)
-    } else {
-        logrus.Infof("Removing annotation %s from node %s", constants.StorkAnnotationUnschedulable, nodeName)
-    }
     if err := k8sClient.Update(ctx, node); err != nil {
         return fmt.Errorf("failed to update annotation %s on node %s: %w",
-            constants.StorkAnnotationUnschedulable, nodeName, err)
+            constants.AnnotationUnschedulable, nodeName, err)
     }
     return nil
 }

pkg/controller/storagecluster/util.go

Lines changed: 12 additions & 6 deletions

@@ -140,16 +140,22 @@ func forceContinueUpgrade(
 }

 func evictVMsDuringUpdate(cluster *corev1.StorageCluster) bool {
-    value, exists := cluster.Annotations[constants.AnnotationEvictVMsDuringUpdate]
+    return getBoolVal(cluster.Annotations, constants.AnnotationEvictVMsDuringUpdate, true)
+}
+
+func nodeMarkedUnschedulable(node *v1.Node) bool {
+    return getBoolVal(node.Annotations, constants.AnnotationUnschedulable, false)
+}
+
+func getBoolVal(m map[string]string, key string, defaultVal bool) bool {
+    value, exists := m[key]
     if !exists {
-        // default to true
-        return true
+        return defaultVal
     }
     boolval, err := strconv.ParseBool(value)
     if err != nil {
-        logrus.Warnf("Invalid value %s for annotation %s; defaulting to true: %v",
-            value, constants.AnnotationEvictVMsDuringUpdate, err)
-        return true
+        logrus.Warnf("Invalid value %s for annotation %s; defaulting to %v: %v", value, key, defaultVal, err)
+        return defaultVal
     }
     return boolval
 }
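A small illustrative test sketch for the generalized helper (assumed to live in the storagecluster package next to getBoolVal; it is not part of this commit):

```go
package storagecluster

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestGetBoolVal(t *testing.T) {
	annotations := map[string]string{
		"set-true":  "true",
		"set-false": "false",
		"garbage":   "not-a-bool",
	}
	require.True(t, getBoolVal(annotations, "set-true", false))
	require.False(t, getBoolVal(annotations, "set-false", true))
	// Missing keys and unparsable values both fall back to the supplied default.
	require.True(t, getBoolVal(annotations, "missing", true))
	require.False(t, getBoolVal(annotations, "garbage", false))
}
```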
