@@ -42,9 +42,14 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 
 // TODO: make a separate function to be called from InitSharedInformers
 // clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary
+// NB: as this function is called directly by the informer, it needs to avoid acquiring locks
+// on individual cluster structures. Therefore, it acts on the manifests obtained from Kubernetes
+// and not on the internal state of the clusters.
 func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
-	var list spec.PostgresqlList
-	var activeClustersCnt, failedClustersCnt int
+	var (
+		list  spec.PostgresqlList
+		event spec.EventType
+	)
 
 	req := c.KubeClient.CRDREST.
 		Get().
@@ -61,19 +66,41 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
 		c.logger.Warningf("could not unmarshal list of clusters: %v", err)
 	}
 
-	timeFromPreviousSync := time.Now().Unix() - atomic.LoadInt64(&c.lastClusterSyncTime)
-	if timeFromPreviousSync < int64(c.opConfig.ResyncPeriod.Seconds()) {
-		c.logger.Infof("not running SYNC, previous sync happened %d seconds ago", timeFromPreviousSync)
-		return &list, err
+	currentTime := time.Now().Unix()
+	timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
+	timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
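+	// a full resync takes precedence over a repair scan when both intervals have elapsed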
+	if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
+		event = spec.EventSync
+	} else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
+		event = spec.EventRepair
 	}
+	if event != "" {
+		c.queueEvents(&list, event)
+	} else {
+		c.logger.Infof("not enough time passed since the last sync (%d seconds) or repair (%d seconds)",
+			timeFromPreviousSync, timeFromPreviousRepair)
+	}
+	return &list, err
+}
 
+// queueEvents queues a sync or repair event for every cluster with a valid manifest
+func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType) {
+	var activeClustersCnt, failedClustersCnt, clustersToRepair int
 	for i, pg := range list.Items {
 		if pg.Error != nil {
 			failedClustersCnt++
 			continue
 		}
-		c.queueClusterEvent(nil, &list.Items[i], spec.EventSync)
 		activeClustersCnt++
+		// check if that cluster needs repair
+		if event == spec.EventRepair {
+			if pg.Status.Success() {
+				continue
+			} else {
+				clustersToRepair++
+			}
+		}
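+		// sync and repair events carry only the manifest just fetched from Kubernetes, hence the nil old spec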
+		c.queueClusterEvent(nil, &list.Items[i], event)
 	}
 	if len(list.Items) > 0 {
 		if failedClustersCnt > 0 && activeClustersCnt == 0 {
@@ -83,13 +110,18 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
 		} else {
 			c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
 		}
+		if clustersToRepair > 0 {
+			c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
+		}
 	} else {
 		c.logger.Infof("no clusters running")
 	}
-
-	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
-
-	return &list, err
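+	// a sync event also counts as a repair, hence both timestamps are advanced after a sync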
+	if event == spec.EventRepair || event == spec.EventSync {
+		atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
+		if event == spec.EventSync {
+			atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
+		}
+	}
 }
 
 type crdDecoder struct {
@@ -155,7 +187,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 
 	lg := c.logger.WithField("worker", event.WorkerID)
 
-	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
+	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync || event.EventType == spec.EventRepair {
 		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
 	} else {
 		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
@@ -171,6 +203,16 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 
 	defer c.curWorkerCluster.Store(event.WorkerID, nil)
 
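+	// a repair event turns into a regular sync, but only if the cluster reports that it needs one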
+	if event.EventType == spec.EventRepair {
+		runRepair, lastOperationStatus := cl.NeedsRepair()
+		if !runRepair {
+			lg.Debugf("Observed cluster status %s, repair is not required", lastOperationStatus)
+			return
+		}
+		lg.Debugf("Observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
+		event.EventType = spec.EventSync
+	}
+
 	if event.EventType == spec.EventAdd || event.EventType == spec.EventUpdate || event.EventType == spec.EventSync {
 		// handle deprecated parameters by possibly assigning their values to the new ones.
 		if event.OldSpec != nil {
@@ -406,8 +448,8 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *spec.Po
 	if eventType != spec.EventDelete {
 		return
 	}
-
-	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
+	// A delete event discards all prior requests for that cluster.
+	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate, spec.EventRepair} {
 		obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
 		if err != nil {
 			lg.Warningf("could not get event from the queue: %v", err)