Skip to content

Commit 5c4f854

Browse files
authored
fix load segment hangs (milvus-io#18770)
Signed-off-by: Wei Liu <[email protected]> Signed-off-by: Wei Liu <[email protected]>
1 parent d9c7519 commit 5c4f854

File tree

9 files changed

+118
-4
lines changed

9 files changed

+118
-4
lines changed

internal/mq/msgstream/mqwrapper/id.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ type MessageID interface {
2424
AtEarliestPosition() bool
2525

2626
LessOrEqualThan(msgID []byte) (bool, error)
27+
28+
Equal(msgID []byte) (bool, error)
2729
}

internal/mq/msgstream/mqwrapper/kafka/kafka_id.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ func (kid *kafkaID) AtEarliestPosition() bool {
1919
return kid.messageID <= 0
2020
}
2121

22+
func (kid *kafkaID) Equal(msgID []byte) (bool, error) {
23+
return kid.messageID == DeserializeKafkaID(msgID), nil
24+
}
25+
2226
func (kid *kafkaID) LessOrEqualThan(msgID []byte) (bool, error) {
2327
return kid.messageID <= DeserializeKafkaID(msgID), nil
2428
}

internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,23 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) {
4343
}
4444
}
4545

46+
func TestKafkaID_Equal(t *testing.T) {
47+
rid1 := &kafkaID{messageID: 0}
48+
rid2 := &kafkaID{messageID: 0}
49+
50+
{
51+
ret, err := rid1.Equal(rid1.Serialize())
52+
assert.Nil(t, err)
53+
assert.True(t, ret)
54+
}
55+
56+
{
57+
ret, err := rid1.Equal(rid2.Serialize())
58+
assert.Nil(t, err)
59+
assert.False(t, ret)
60+
}
61+
}
62+
4663
func Test_SerializeKafkaID(t *testing.T) {
4764
bin := SerializeKafkaID(10)
4865
assert.NotNil(t, bin)

internal/mq/msgstream/mqwrapper/pulsar/pulsar_id.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,21 @@ func (pid *pulsarID) LessOrEqualThan(msgID []byte) (bool, error) {
6060
return false, nil
6161
}
6262

63+
func (pid *pulsarID) Equal(msgID []byte) (bool, error) {
64+
pMsgID, err := pulsar.DeserializeMessageID(msgID)
65+
if err != nil {
66+
return false, err
67+
}
68+
69+
if pid.messageID.LedgerID() == pMsgID.LedgerID() &&
70+
pid.messageID.EntryID() == pMsgID.EntryID() &&
71+
pid.messageID.BatchIdx() == pMsgID.BatchIdx() {
72+
return true, nil
73+
}
74+
75+
return false, nil
76+
}
77+
6378
// SerializePulsarMsgID returns the serialized message ID
6479
func SerializePulsarMsgID(messageID pulsar.MessageID) []byte {
6580
return messageID.Serialize()

internal/mq/msgstream/mqwrapper/pulsar/pulsar_id_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,30 @@ func TestLessOrEqualThan(t *testing.T) {
7272
assert.False(t, ret)
7373
}
7474

75+
func TestPulsarID_Equal(t *testing.T) {
76+
msg1 := pulsar.EarliestMessageID()
77+
pid1 := &pulsarID{
78+
messageID: msg1,
79+
}
80+
81+
msg2 := pulsar.LatestMessageID()
82+
pid2 := &pulsarID{
83+
messageID: msg2,
84+
}
85+
86+
{
87+
ret, err := pid1.Equal(pid1.Serialize())
88+
assert.Nil(t, err)
89+
assert.True(t, ret)
90+
}
91+
92+
{
93+
ret, err := pid1.Equal(pid2.Serialize())
94+
assert.Nil(t, err)
95+
assert.False(t, ret)
96+
}
97+
}
98+
7599
func Test_SerializePulsarMsgID(t *testing.T) {
76100
mid := pulsar.EarliestMessageID()
77101

internal/mq/msgstream/mqwrapper/rmq/rmq_id.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ func (rid *rmqID) LessOrEqualThan(msgID []byte) (bool, error) {
4444
return rid.messageID < rMsgID, nil
4545
}
4646

47+
func (rid *rmqID) Equal(msgID []byte) (bool, error) {
48+
rMsgID := DeserializeRmqID(msgID)
49+
return rid.messageID == rMsgID, nil
50+
}
51+
4752
// SerializeRmqID is used to serialize a message ID to byte array
4853
func SerializeRmqID(messageID int64) []byte {
4954
b := make([]byte, 8)

internal/mq/msgstream/mqwrapper/rmq/rmq_id_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,29 @@ func TestLessOrEqualThan(t *testing.T) {
6262
assert.False(t, ret)
6363
}
6464

65+
func Test_Equal(t *testing.T) {
66+
rid1 := &rmqID{
67+
messageID: 0,
68+
}
69+
70+
rid2 := &rmqID{
71+
messageID: math.MaxInt64,
72+
}
73+
74+
{
75+
ret, err := rid1.Equal(rid1.Serialize())
76+
assert.Nil(t, err)
77+
assert.True(t, ret)
78+
79+
}
80+
81+
{
82+
ret, err := rid1.Equal(rid2.Serialize())
83+
assert.Nil(t, err)
84+
assert.False(t, ret)
85+
}
86+
}
87+
6588
func Test_SerializeRmqID(t *testing.T) {
6689
bin := SerializeRmqID(10)
6790
assert.NotNil(t, bin)

internal/querynode/segment_loader.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,8 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
664664
return err
665665
}
666666

667-
if lastMsgID.AtEarliestPosition() {
667+
reachLatest, _ := lastMsgID.Equal(position.MsgID)
668+
if reachLatest || lastMsgID.AtEarliestPosition() {
668669
log.Info("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
669670
return nil
670671
}

internal/querynode/segment_loader_test.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import (
2323
"runtime"
2424
"testing"
2525

26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/mock"
28+
"github.com/stretchr/testify/require"
29+
2630
"github.com/milvus-io/milvus/internal/common"
2731
"github.com/milvus-io/milvus/internal/mq/msgstream"
2832
"github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -32,9 +36,6 @@ import (
3236
"github.com/milvus-io/milvus/internal/storage"
3337
"github.com/milvus-io/milvus/internal/util/concurrency"
3438
"github.com/milvus-io/milvus/internal/util/funcutil"
35-
"github.com/stretchr/testify/assert"
36-
"github.com/stretchr/testify/mock"
37-
"github.com/stretchr/testify/require"
3839
)
3940

4041
func TestSegmentLoader_loadSegment(t *testing.T) {
@@ -588,20 +589,31 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
588589
{
589590
mockMsg := &mockMsgID{}
590591
mockMsg.On("AtEarliestPosition").Return(false, nil)
592+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
591593
testSeekFailWhenConsumingDeltaMsg(ctx, t, position, mockMsg)
592594
}
593595

594596
//test no more data when get last msg successfully
595597
{
596598
mockMsg := &mockMsgID{}
597599
mockMsg.On("AtEarliestPosition").Return(true, nil)
600+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
601+
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
602+
}
603+
604+
// test already reach latest position
605+
{
606+
mockMsg := &mockMsgID{}
607+
mockMsg.On("AtEarliestPosition").Return(false, nil)
608+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(true, nil)
598609
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
599610
}
600611

601612
//test consume after seeking when get last msg successfully
602613
{
603614
mockMsg := &mockMsgID{}
604615
mockMsg.On("AtEarliestPosition").Return(false, nil)
616+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
605617
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, nil)
606618
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
607619
}
@@ -610,6 +622,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
610622
{
611623
mockMsg := &mockMsgID{}
612624
mockMsg.On("AtEarliestPosition").Return(false, nil)
625+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
613626
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
614627
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
615628
}
@@ -618,6 +631,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
618631
{
619632
mockMsg := &mockMsgID{}
620633
mockMsg.On("AtEarliestPosition").Return(false, nil)
634+
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
621635
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
622636
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, mockMsg))
623637
}
@@ -690,6 +704,15 @@ func (m2 *mockMsgID) LessOrEqualThan(msgID []byte) (bool, error) {
690704
return ret.(bool), nil
691705
}
692706

707+
func (m2 *mockMsgID) Equal(msgID []byte) (bool, error) {
708+
args := m2.Called()
709+
ret := args.Get(0)
710+
if args.Get(1) != nil {
711+
return false, args.Get(1).(error)
712+
}
713+
return ret.(bool), nil
714+
}
715+
693716
type LoadDeleteMsgStream struct {
694717
msgstream.MsgStream
695718
mock.Mock

0 commit comments

Comments
 (0)