Skip to content

Commit 6244d68

Browse files
Merge 958227e into backport/mukul/fix-policy-replication/blatantly-amazed-salmon
2 parents ce0a692 + 958227e commit 6244d68

File tree

3 files changed

+73
-3
lines changed

3 files changed

+73
-3
lines changed

.changelog/22954.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
acl: fixed a bug where ACL policy replication in WANfed is impacted when primaryDC is inconsistent
3+
```

agent/consul/acl_replication.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ type aclTypeReplicator interface {
6666
// correction phase).
6767
FetchUpdated(srv *Server, updates []string) (int, error)
6868

69+
//ensureRemoteConsistent compares the updated items with the upsertable remotes
70+
//returns consistent update check post stale batch read or other scenarios
71+
//[]string of items missing from remote, []string of items updated remote, error if inconsistent
72+
ensureRemoteConsistent(updates []string) ([]string, []string, error)
73+
6974
// LenPendingUpdates should be the size of the data retrieved in
7075
// FetchUpdated.
7176
LenPendingUpdates() int
@@ -87,6 +92,7 @@ type aclTypeReplicator interface {
8792
}
8893

8994
var errContainsRedactedData = errors.New("replication results contain redacted data")
95+
var errContainsStaleData = errors.New("replication batch read for update items contain stale data")
9096

9197
func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) {
9298
req := structs.ACLRoleBatchGetRequest{
@@ -436,11 +442,22 @@ func (s *Server) replicateACLType(ctx context.Context, logger hclog.Logger, tr a
436442

437443
if len(res.LocalUpserts) > 0 {
438444
lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts)
439-
if err == errContainsRedactedData {
440-
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
441-
} else if err != nil {
445+
if err != nil {
446+
if err == errContainsRedactedData {
447+
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
448+
}
442449
return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err)
443450
}
451+
//if fetch updated gets stale inconsistent data then we should not proceed with applying
452+
//the updates as that would lead to partial/stale data being replicated
453+
//hence, we call ensureRemoteConsistent to validate the fetched updates with diff results
454+
_, _, err = tr.ensureRemoteConsistent(res.LocalUpserts)
455+
if err != nil {
456+
if err == errContainsStaleData {
457+
return 0, false, fmt.Errorf("failed to ensure consistent %s replication updates - stale data", tr.PluralNoun())
458+
}
459+
return 0, false, fmt.Errorf("failed to ensure consistent ACL %s updates: %v", tr.SingularNoun(), err)
460+
}
444461
logger.Debug(
445462
"acl replication - downloaded updates",
446463
"amount", lenUpdated,

agent/consul/acl_replication_types.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package consul
55

66
import (
7+
"bytes"
78
"context"
89
"fmt"
910

@@ -86,6 +87,11 @@ func (r *aclTokenReplicator) FetchUpdated(srv *Server, updates []string) (int, e
8687
return len(r.updated), nil
8788
}
8889

90+
func (r *aclTokenReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
91+
//return true if consistent updates,
92+
return []string{}, []string{}, nil
93+
}
94+
8995
func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
9096
req := structs.ACLTokenBatchDeleteRequest{
9197
TokenIDs: batch,
@@ -186,6 +192,45 @@ func (r *aclPolicyReplicator) FetchUpdated(srv *Server, updates []string) (int,
186192
return len(r.updated), nil
187193
}
188194

195+
func (r *aclPolicyReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
196+
updatedMap := make(map[string]*structs.ACLPolicy)
197+
for _, policy := range r.updated {
198+
updatedMap[policy.ID] = policy
199+
}
200+
201+
remoteMap := make(map[string]*structs.ACLPolicyListStub)
202+
for _, policyStub := range r.remote {
203+
remoteMap[policyStub.ID] = policyStub
204+
}
205+
206+
//iterate over all updates array which are policy IDs check if the hash match for both
207+
var consistent = true
208+
var remoteNotCreated []string
209+
var remoteNotUpdated []string
210+
var err error = nil
211+
212+
for _, policyID := range updates {
213+
if updatedPolicy, ok := updatedMap[policyID]; ok {
214+
if remotePolicy, ok := remoteMap[policyID]; ok {
215+
if !bytes.Equal(updatedPolicy.Hash, remotePolicy.Hash) && updatedPolicy.ModifyIndex < remotePolicy.ModifyIndex {
216+
// remote stale batch did not get modified policy than what local diff calculated
217+
remoteNotUpdated = append(remoteNotUpdated, policyID)
218+
consistent = false
219+
}
220+
}
221+
} else if remotePolicy, ok := remoteMap[policyID]; ok && remotePolicy.ModifyIndex == remotePolicy.CreateIndex {
222+
// remote stale batch did not get the created policy than what local diff calculated
223+
remoteNotCreated = append(remoteNotCreated, policyID)
224+
consistent = false
225+
}
226+
}
227+
228+
if !consistent {
229+
err = errContainsStaleData
230+
}
231+
return remoteNotCreated, remoteNotUpdated, err
232+
}
233+
189234
func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
190235
req := structs.ACLPolicyBatchDeleteRequest{
191236
PolicyIDs: batch,
@@ -307,6 +352,11 @@ func (r *aclRoleReplicator) FetchUpdated(srv *Server, updates []string) (int, er
307352
return len(r.updated), nil
308353
}
309354

355+
func (r *aclRoleReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
356+
//return true if consistent updates,
357+
return []string{}, []string{}, nil
358+
}
359+
310360
func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
311361
req := structs.ACLRoleBatchDeleteRequest{
312362
RoleIDs: batch,

0 commit comments

Comments
 (0)