Skip to content

Commit 6fd8346

Browse files
authored
Fix watchDmChannel may be out-of-date after compaction issue and add context (milvus-io#18681)
Signed-off-by: wayblink <[email protected]> Signed-off-by: wayblink <[email protected]>
1 parent d5bb377 commit 6fd8346

File tree

9 files changed

+143
-29
lines changed

9 files changed

+143
-29
lines changed

internal/datacoord/services.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,12 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
318318
var info *SegmentInfo
319319
if req.IncludeUnHealthy {
320320
info = s.meta.GetAllSegment(id)
321-
if info != nil {
322-
infos = append(infos, info.SegmentInfo)
321+
if info == nil {
322+
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
323+
resp.Status.Reason = fmt.Sprintf("failed to get segment %d", id)
324+
return resp, nil
323325
}
326+
infos = append(infos, info.SegmentInfo)
324327
} else {
325328
info = s.meta.GetSegment(id)
326329
if info == nil {

internal/querycoord/global_meta_broker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,9 +534,9 @@ func (broker *globalMetaBroker) releaseSegmentReferLock(ctx context.Context, tas
534534
}
535535

536536
// getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord
537-
func (broker *globalMetaBroker) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
537+
func (broker *globalMetaBroker) getDataSegmentInfosByIDs(ctx context.Context, segmentIds []int64) ([]*datapb.SegmentInfo, error) {
538538
var segmentInfos []*datapb.SegmentInfo
539-
infoResp, err := broker.dataCoord.GetSegmentInfo(broker.ctx, &datapb.GetSegmentInfoRequest{
539+
infoResp, err := broker.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
540540
Base: &commonpb.MsgBase{
541541
MsgType: commonpb.MsgType_SegmentInfo,
542542
MsgID: 0,

internal/querycoord/global_meta_broker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,18 @@ func TestGetDataSegmentInfosByIDs(t *testing.T) {
170170
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, cm)
171171
assert.Nil(t, err)
172172

173-
segmentInfos, err := handler.getDataSegmentInfosByIDs([]int64{1})
173+
segmentInfos, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
174174
assert.Nil(t, err)
175175
assert.Equal(t, 1, len(segmentInfos))
176176

177177
dataCoord.returnError = true
178-
segmentInfos2, err := handler.getDataSegmentInfosByIDs([]int64{1})
178+
segmentInfos2, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
179179
assert.Error(t, err)
180180
assert.Empty(t, segmentInfos2)
181181

182182
dataCoord.returnError = false
183183
dataCoord.returnGrpcError = true
184-
segmentInfos3, err := handler.getDataSegmentInfosByIDs([]int64{1})
184+
segmentInfos3, err := handler.getDataSegmentInfosByIDs(ctx, []int64{1})
185185
assert.Error(t, err)
186186
assert.Empty(t, segmentInfos3)
187187

internal/querycoord/mock_3rd_component_test.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ import (
2222
"fmt"
2323
"sync"
2424

25-
"github.com/milvus-io/milvus/internal/log"
26-
"github.com/milvus-io/milvus/internal/proto/etcdpb"
25+
"go.uber.org/atomic"
2726
"go.uber.org/zap"
2827

29-
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
30-
3128
"github.com/milvus-io/milvus/internal/common"
29+
"github.com/milvus-io/milvus/internal/log"
3230
"github.com/milvus-io/milvus/internal/proto/commonpb"
3331
"github.com/milvus-io/milvus/internal/proto/datapb"
32+
"github.com/milvus-io/milvus/internal/proto/etcdpb"
3433
"github.com/milvus-io/milvus/internal/proto/indexpb"
3534
"github.com/milvus-io/milvus/internal/proto/internalpb"
3635
"github.com/milvus-io/milvus/internal/proto/milvuspb"
3736
"github.com/milvus-io/milvus/internal/proto/proxypb"
37+
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
3838
"github.com/milvus-io/milvus/internal/proto/schemapb"
3939
"github.com/milvus-io/milvus/internal/storage"
4040
"github.com/milvus-io/milvus/internal/types"
@@ -334,6 +334,7 @@ type dataCoordMock struct {
334334
channelNumPerCol int
335335
returnError bool
336336
returnGrpcError bool
337+
returnErrorCount atomic.Int32
337338
segmentState commonpb.SegmentState
338339
errLevel int
339340

@@ -376,14 +377,27 @@ func (data *dataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetR
376377
}, nil
377378
}
378379

380+
if data.returnErrorCount.Load() > 0 {
381+
data.returnErrorCount.Dec()
382+
return &datapb.GetRecoveryInfoResponse{
383+
Status: &commonpb.Status{
384+
ErrorCode: commonpb.ErrorCode_UnexpectedError,
385+
Reason: "limited get recovery info failed",
386+
},
387+
}, nil
388+
}
389+
379390
if _, ok := data.col2DmChannels[collectionID]; !ok {
380391
channelInfos := make([]*datapb.VchannelInfo, 0)
381392
data.collections = append(data.collections, collectionID)
382393
for i := int32(0); i < common.DefaultShardsNum; i++ {
383394
vChannel := fmt.Sprintf("%s_%d_%dv%d", Params.CommonCfg.RootCoordDml, i, collectionID, i)
384395
channelInfo := &datapb.VchannelInfo{
385-
CollectionID: collectionID,
386-
ChannelName: vChannel,
396+
CollectionID: collectionID,
397+
ChannelName: vChannel,
398+
UnflushedSegmentIds: []int64{int64(i*1000 + 1)},
399+
FlushedSegmentIds: []int64{int64(i*1000 + 2)},
400+
DroppedSegmentIds: []int64{int64(i*1000 + 3)},
387401
SeekPosition: &internalpb.MsgPosition{
388402
ChannelName: vChannel,
389403
},
@@ -528,6 +542,16 @@ func (data *dataCoordMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSe
528542
}, nil
529543
}
530544

545+
if data.returnErrorCount.Load() > 0 {
546+
data.returnErrorCount.Dec()
547+
return &datapb.GetSegmentInfoResponse{
548+
Status: &commonpb.Status{
549+
ErrorCode: commonpb.ErrorCode_UnexpectedError,
550+
Reason: "limited mock get segmentInfo failed",
551+
},
552+
}, nil
553+
}
554+
531555
var segmentInfos []*datapb.SegmentInfo
532556
for _, segmentID := range req.SegmentIDs {
533557
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{

internal/querycoord/task.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
514514
ReplicaID: replica.GetReplicaID(),
515515
}
516516

517-
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.broker, watchRequest)
517+
fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lct.broker, watchRequest)
518518
if err != nil {
519519
lct.setResultInfo(err)
520520
return err
@@ -1008,7 +1008,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
10081008
ReplicaID: replica.GetReplicaID(),
10091009
}
10101010

1011-
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.broker, watchRequest)
1011+
fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lpt.broker, watchRequest)
10121012
if err != nil {
10131013
lpt.setResultInfo(err)
10141014
return err
@@ -2059,7 +2059,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
20592059
watchRequest.PartitionIDs = toRecoverPartitionIDs
20602060
}
20612061

2062-
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.broker, watchRequest)
2062+
fullWatchRequest, err := generateFullWatchDmChannelsRequest(ctx, lbt.broker, watchRequest)
20632063
if err != nil {
20642064
lbt.setResultInfo(err)
20652065
return err

internal/querycoord/task_scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
393393
if err != nil {
394394
return nil, err
395395
}
396-
fullReq, err := generateFullWatchDmChannelsRequest(scheduler.broker, &req)
396+
fullReq, err := generateFullWatchDmChannelsRequest(baseTask.traceCtx(), scheduler.broker, &req)
397397
if err != nil {
398398
return nil, err
399399
}

internal/querycoord/task_util.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
package querycoord
1818

1919
import (
20+
"context"
21+
2022
"github.com/golang/protobuf/proto"
2123
"github.com/milvus-io/milvus/internal/log"
2224
"github.com/milvus-io/milvus/internal/proto/datapb"
2325
"github.com/milvus-io/milvus/internal/proto/querypb"
26+
"github.com/milvus-io/milvus/internal/util/retry"
2427
"go.uber.org/zap"
2528
)
2629

2730
// generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from meta broker
28-
func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) {
31+
func generateFullWatchDmChannelsRequest(ctx context.Context, broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) {
2932
cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest)
3033
vChannels := cloned.GetInfos()
3134

@@ -34,25 +37,74 @@ func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *query
3437
reviseVChannelInfo(vChannel)
3538
}
3639

40+
vChannelDict := make(map[string]bool, len(vChannels))
41+
for _, info := range vChannels {
42+
vChannelDict[info.ChannelName] = true
43+
}
44+
var segmentInfos []*datapb.SegmentInfo
45+
46+
// if the return segmentInfos is less than required, this may because the segment is compacted.
47+
// refresh the vchannels and segmentInfos needed.
48+
retryFunc := func() error {
49+
newVChannels := make([]*datapb.VchannelInfo, 0)
50+
newSegmentIds := make([]int64, 0)
51+
52+
newVChannelDict := make(map[string]bool)
53+
for _, partitionID := range request.GetLoadMeta().GetPartitionIDs() {
54+
partitionVChannels, _, err := broker.getRecoveryInfo(ctx, request.GetCollectionID(), partitionID)
55+
if err != nil {
56+
log.Error("GetRecoveryInfo failed, retrying...", zap.Error(err))
57+
return err
58+
}
59+
for _, vchannel := range partitionVChannels {
60+
if vChannelDict[vchannel.GetChannelName()] && !newVChannelDict[vchannel.GetChannelName()] {
61+
newVChannels = append(newVChannels, vchannel)
62+
newVChannelDict[vchannel.GetChannelName()] = true
63+
}
64+
}
65+
}
66+
67+
for _, vChannel := range newVChannels {
68+
newSegmentIds = append(newSegmentIds, vChannel.FlushedSegmentIds...)
69+
newSegmentIds = append(newSegmentIds, vChannel.UnflushedSegmentIds...)
70+
newSegmentIds = append(newSegmentIds, vChannel.DroppedSegmentIds...)
71+
}
72+
newSegmentInfos, err := broker.getDataSegmentInfosByIDs(ctx, newSegmentIds)
73+
if err != nil {
74+
log.Error("Get Vchannel SegmentInfos failed, retrying...", zap.Error(err))
75+
return err
76+
}
77+
78+
cloned.Infos = newVChannels
79+
segmentInfos = newSegmentInfos
80+
return nil
81+
}
82+
3783
// fill segmentInfos
3884
segmentIds := make([]int64, 0)
3985
for _, vChannel := range vChannels {
4086
segmentIds = append(segmentIds, vChannel.FlushedSegmentIds...)
4187
segmentIds = append(segmentIds, vChannel.UnflushedSegmentIds...)
4288
segmentIds = append(segmentIds, vChannel.DroppedSegmentIds...)
4389
}
44-
segmentInfos, err := broker.getDataSegmentInfosByIDs(segmentIds)
90+
segmentInfos, err := broker.getDataSegmentInfosByIDs(ctx, segmentIds)
91+
4592
if err != nil {
4693
log.Error("Get Vchannel SegmentInfos failed", zap.Error(err))
47-
return nil, err
94+
retryErr := retry.Do(ctx, retryFunc, retry.Attempts(20))
95+
if retryErr != nil {
96+
log.Error("Get Vchannel SegmentInfos failed after retry", zap.Error(retryErr))
97+
return nil, retryErr
98+
}
4899
}
100+
49101
segmentDict := make(map[int64]*datapb.SegmentInfo)
50102
for _, info := range segmentInfos {
51103
segmentDict[info.ID] = info
52104
}
53105
cloned.SegmentInfos = segmentDict
54106

55-
return cloned, err
107+
return cloned, nil
56108
}
57109

58110
// thinWatchDmChannelsRequest will return a thin version of WatchDmChannelsRequest

internal/querycoord/task_util_test.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
)
2828

2929
func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
30-
dataCoord := &dataCoordMock{}
3130
ctx, cancel := context.WithCancel(context.Background())
31+
dataCoord := newDataCoordMock(ctx)
3232
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
3333
assert.Nil(t, err)
3434

@@ -46,12 +46,12 @@ func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
4646
NodeID: 1,
4747
}
4848

49-
fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
49+
fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
5050
assert.Nil(t, err)
5151
assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos())
5252

5353
dataCoord.returnError = true
54-
fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
54+
fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
5555
assert.Error(t, err)
5656
assert.Empty(t, fullWatchDmChannelsRequest2.GetSegmentInfos())
5757

@@ -88,8 +88,8 @@ func TestThinWatchDmChannelsRequest(t *testing.T) {
8888
}
8989

9090
func TestUpgradeCompatibility(t *testing.T) {
91-
dataCoord := &dataCoordMock{}
9291
ctx, cancel := context.WithCancel(context.Background())
92+
dataCoord := newDataCoordMock(ctx)
9393
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
9494
assert.Nil(t, err)
9595

@@ -110,7 +110,7 @@ func TestUpgradeCompatibility(t *testing.T) {
110110
NodeID: 1,
111111
}
112112

113-
fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
113+
fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(ctx, handler, watchDmChannelsRequest)
114114
assert.Nil(t, err)
115115
assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos())
116116
vChannel := fullWatchDmChannelsRequest.GetInfos()[0]
@@ -124,3 +124,35 @@ func TestUpgradeCompatibility(t *testing.T) {
124124
assert.Equal(t, 1, len(vChannel.GetUnflushedSegmentIds()))
125125
cancel()
126126
}
127+
128+
func TestGetMissSegment(t *testing.T) {
129+
ctx, cancel := context.WithCancel(context.Background())
130+
dataCoord := newDataCoordMock(ctx)
131+
broker, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
132+
assert.Nil(t, err)
133+
134+
vChannels, _, err := broker.getRecoveryInfo(ctx, defaultCollectionID, 0)
135+
assert.Nil(t, err)
136+
137+
watchDmChannelsRequest := &querypb.WatchDmChannelsRequest{
138+
Base: &commonpb.MsgBase{
139+
MsgType: commonpb.MsgType_WatchDmChannels,
140+
},
141+
CollectionID: defaultCollectionID,
142+
PartitionIDs: []int64{1},
143+
Infos: vChannels,
144+
NodeID: 1,
145+
LoadMeta: &querypb.LoadMetaInfo{
146+
LoadType: querypb.LoadType_LoadCollection,
147+
CollectionID: defaultCollectionID,
148+
PartitionIDs: []int64{1},
149+
},
150+
}
151+
152+
// inject certain number of error
153+
dataCoord.returnErrorCount.Store(3)
154+
155+
_, err = generateFullWatchDmChannelsRequest(ctx, broker, watchDmChannelsRequest)
156+
assert.NoError(t, err)
157+
cancel()
158+
}

internal/querynode/task.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,11 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
170170
for _, info := range w.req.Infos {
171171
for _, ufInfoID := range info.GetUnflushedSegmentIds() {
172172
// unFlushed segment may not have binLogs, skip loading
173-
ufInfo := w.req.SegmentInfos[ufInfoID]
174-
if len(ufInfo.Binlogs) > 0 {
173+
ufInfo := w.req.GetSegmentInfos()[ufInfoID]
174+
if ufInfo == nil {
175+
log.Warn("an unflushed segment is not found in segment infos", zap.Int64("segment ID", ufInfoID))
176+
}
177+
if len(ufInfo.GetBinlogs()) > 0 {
175178
unFlushedSegments = append(unFlushedSegments, &queryPb.SegmentLoadInfo{
176179
SegmentID: ufInfo.ID,
177180
PartitionID: ufInfo.PartitionID,
@@ -182,7 +185,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
182185
Deltalogs: ufInfo.Deltalogs,
183186
InsertChannel: ufInfo.InsertChannel,
184187
})
185-
unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.ID)
188+
unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.GetID())
186189
}
187190
}
188191
}

0 commit comments

Comments
 (0)