Skip to content

Commit 096149e

Browse files
authored
fix GetCompactionState hang (milvus-io#18829)
Signed-off-by: Enwei Jiao <[email protected]>
1 parent 1bd87e7 commit 096149e

File tree

5 files changed

+87
-30
lines changed

5 files changed

+87
-30
lines changed

internal/datacoord/compaction.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,15 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
164164

165165
c.setSegmentsCompacting(plan, true)
166166

167+
task := &compactionTask{
168+
triggerInfo: signal,
169+
plan: plan,
170+
state: executing,
171+
dataNodeID: nodeID,
172+
}
173+
c.plans[plan.PlanID] = task
174+
c.executingTaskNum++
175+
167176
go func() {
168177
log.Debug("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
169178
c.acquireQueue(nodeID)
@@ -173,17 +182,11 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
173182
log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
174183
return
175184
}
176-
plan.StartTime = ts
177185

178186
c.mu.Lock()
179-
task := &compactionTask{
180-
triggerInfo: signal,
181-
plan: plan,
182-
state: executing,
183-
dataNodeID: nodeID,
184-
}
185-
c.plans[plan.PlanID] = task
186-
c.executingTaskNum++
187+
c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
188+
task.plan.StartTime = ts
189+
})
187190
c.mu.Unlock()
188191

189192
err = c.sessions.Compaction(nodeID, plan)
@@ -192,6 +195,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
192195
c.mu.Lock()
193196
delete(c.plans, plan.PlanID)
194197
c.executingTaskNum--
198+
c.releaseQueue(nodeID)
195199
c.mu.Unlock()
196200
return
197201
}
@@ -274,20 +278,34 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
274278
stateResult, ok := planStates[task.plan.PlanID]
275279
state := stateResult.GetState()
276280
planID := task.plan.PlanID
281+
startTime := task.plan.GetStartTime()
277282

283+
// a start time of 0 means this task has not started yet, so skip the checks below
284+
if startTime == 0 {
285+
continue
286+
}
278287
// check whether the state of CompactionPlan is working
279288
if ok {
280-
// check wether the CompactionPlan is timeout
281-
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
282-
continue
283-
}
284289
if state == commonpb.CompactionState_Completed {
290+
log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
285291
c.completeCompaction(stateResult.GetResult())
286292
continue
287293
}
294+
// check whether the CompactionPlan has timed out
295+
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
296+
continue
297+
}
298+
log.Info("compaction timeout",
299+
zap.Int64("planID", task.plan.PlanID),
300+
zap.Int64("nodeID", task.dataNodeID),
301+
zap.Uint64("startTime", task.plan.GetStartTime()),
302+
zap.Uint64("now", ts),
303+
)
288304
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
305+
continue
289306
}
290307

308+
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
291309
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
292310
c.setSegmentsCompacting(task.plan, false)
293311
c.executingTaskNum--

internal/datacoord/compaction_test.go

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,35 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
7878
false,
7979
nil,
8080
},
81+
{
82+
"test exec compaction failed",
83+
fields{
84+
plans: map[int64]*compactionTask{},
85+
sessions: &SessionManager{
86+
sessions: struct {
87+
sync.RWMutex
88+
data map[int64]*Session
89+
}{
90+
data: map[int64]*Session{
91+
1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
92+
},
93+
},
94+
},
95+
chManager: &ChannelManager{
96+
store: &ChannelStore{
97+
channelsInfo: map[int64]*NodeChannelInfo{
98+
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
99+
},
100+
},
101+
},
102+
},
103+
args{
104+
signal: &compactionSignal{id: 100},
105+
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
106+
},
107+
true,
108+
nil,
109+
},
81110
}
82111
for _, tt := range tests {
83112
t.Run(tt.name, func(t *testing.T) {
@@ -93,9 +122,19 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
93122
if err == nil {
94123
<-ch
95124
task := c.getCompaction(tt.args.plan.PlanID)
96-
assert.Equal(t, tt.args.plan, task.plan)
97-
assert.Equal(t, tt.args.signal, task.triggerInfo)
98-
assert.Equal(t, 1, c.executingTaskNum)
125+
if !tt.wantErr {
126+
assert.Equal(t, tt.args.plan, task.plan)
127+
assert.Equal(t, tt.args.signal, task.triggerInfo)
128+
assert.Equal(t, 1, c.executingTaskNum)
129+
} else {
130+
assert.Eventually(t,
131+
func() bool {
132+
c.mu.RLock()
133+
defer c.mu.RUnlock()
134+
return c.executingTaskNum == 0 && len(c.parallelCh[1]) == 0
135+
},
136+
5*time.Second, 100*time.Millisecond)
137+
}
99138
}
100139
})
101140
}
@@ -455,7 +494,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
455494
fields fields
456495
args args
457496
wantErr bool
458-
expired []int64
497+
timeout []int64
498+
failed []int64
459499
unexpired []int64
460500
}{
461501
{
@@ -484,7 +524,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
484524
},
485525
},
486526
3: {
487-
state: completed,
527+
state: executing,
488528
dataNodeID: 2,
489529
plan: &datapb.CompactionPlan{
490530
PlanID: 3,
@@ -519,6 +559,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
519559
compactionStateResp: &datapb.CompactionStateResponse{
520560
Results: []*datapb.CompactionStateResult{
521561
{PlanID: 1, State: commonpb.CompactionState_Executing},
562+
{PlanID: 3, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}},
563+
{PlanID: 4, State: commonpb.CompactionState_Executing},
522564
},
523565
},
524566
}},
@@ -528,7 +570,8 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
528570
},
529571
args{ts: tsoutil.ComposeTS(ts.Add(5*time.Second).UnixNano()/int64(time.Millisecond), 0)},
530572
false,
531-
[]int64{2, 4},
573+
[]int64{4},
574+
[]int64{2},
532575
[]int64{1, 3},
533576
},
534577
}
@@ -543,7 +586,12 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
543586
err := c.updateCompaction(tt.args.ts)
544587
assert.Equal(t, tt.wantErr, err != nil)
545588

546-
for _, id := range tt.expired {
589+
for _, id := range tt.timeout {
590+
task := c.getCompaction(id)
591+
assert.Equal(t, timeout, task.state)
592+
}
593+
594+
for _, id := range tt.failed {
547595
task := c.getCompaction(id)
548596
assert.Equal(t, failed, task.state)
549597
}

internal/datacoord/compaction_trigger.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -478,12 +478,7 @@ func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
478478
if err != nil {
479479
return err
480480
}
481-
ts, err := t.allocator.allocTimestamp(context.TODO())
482-
if err != nil {
483-
return err
484-
}
485481
plan.PlanID = id
486-
plan.StartTime = ts
487482
plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds
488483
return nil
489484
}

internal/datacoord/compaction_trigger_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func Test_compactionTrigger_force(t *testing.T) {
203203
},
204204
},
205205
},
206-
StartTime: 3,
206+
StartTime: 0,
207207
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds,
208208
Type: datapb.CompactionType_MixCompaction,
209209
Timetravel: 200,

internal/datacoord/services.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -960,10 +960,6 @@ func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo
960960
}
961961

962962
func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) {
963-
if len(tasks) == 0 {
964-
state = commonpb.CompactionState_Executing
965-
return
966-
}
967963
for _, t := range tasks {
968964
switch t.state {
969965
case executing:

0 commit comments

Comments
 (0)