Skip to content

Commit 77baeed

Browse files
committed
fix: refine exception message handling to prevent duplicate messages in clients with poor network conditions.
Signed-off-by: Gordon <[email protected]>
1 parent 31aaa32 commit 77baeed

16 files changed

+78
-934
lines changed

internal/conversation_msg/api.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,6 @@ func (c *Conversation) SetConversationListener(listener func() open_im_sdk_callb
148148
c.ConversationListener = listener
149149
}
150150

151-
func (c *Conversation) msgDataToLocalErrChatLog(src *model_struct.LocalChatLog) *model_struct.LocalErrChatLog {
152-
var lc model_struct.LocalErrChatLog
153-
copier.Copy(&lc, src)
154-
return &lc
155-
156-
}
157-
158151
func (c *Conversation) updateMsgStatusAndTriggerConversation(ctx context.Context, clientMsgID, serverMsgID string, sendTime int64, status int32, s *sdk_struct.MsgStruct,
159152
lc *model_struct.LocalConversation, isOnlineOnly bool) {
160153
log.ZDebug(ctx, "this is test send message ", "sendTime", sendTime, "status", status, "clientMsgID", clientMsgID, "serverMsgID", serverMsgID)

internal/conversation_msg/conversation_msg.go

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,8 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
203203
var isTriggerUnReadCount bool
204204
insertMsg := make(map[string][]*model_struct.LocalChatLog, 10)
205205
updateMsg := make(map[string][]*model_struct.LocalChatLog, 10)
206-
var exceptionMsg []*model_struct.LocalErrChatLog
207-
//var unreadMessages []*model_struct.LocalConversationUnreadMessage
206+
var exceptionMsg []*model_struct.LocalChatLog
208207
var newMessages sdk_struct.NewMsgList
209-
// var reactionMsgModifierList, reactionMsgDeleterList sdk_struct.NewMsgList
210208

211209
var isUnreadCount, isConversationUpdate, isHistory, isNotPrivate, isSenderConversationUpdate bool
212210

@@ -249,7 +247,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
249247

250248
//When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
251249
if msg.Status == constant.MsgStatusHasDeleted {
252-
insertMessage = append(insertMessage, MsgStructToLocalChatLog(msg))
250+
dbMessage := MsgStructToLocalChatLog(msg)
251+
c.handleExceptionMessages(ctx, nil, dbMessage)
252+
exceptionMsg = append(exceptionMsg, dbMessage)
253+
insertMessage = append(insertMessage, dbMessage)
253254
continue
254255
}
255256

@@ -265,10 +266,6 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
265266
if !isNotPrivate {
266267
msg.AttachedInfoElem.IsPrivateChat = true
267268
}
268-
if msg.ClientMsgID == "" {
269-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
270-
continue
271-
}
272269
if conversationID == "" {
273270
log.ZError(ctx, "conversationID is empty", errors.New("conversationID is empty"), "msg", msg)
274271
continue
@@ -281,16 +278,19 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
281278
log.ZDebug(ctx, "decode message", "msg", msg)
282279
if v.SendID == c.loginUserID { //seq
283280
// Messages sent by myself //if sent through this terminal
284-
m, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID)
281+
existingMsg, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID)
285282
if err == nil {
286283
log.ZInfo(ctx, "have message", "msg", msg)
287-
if m.Seq == 0 {
284+
if existingMsg.Seq == 0 {
288285
if !isConversationUpdate {
289286
msg.Status = constant.MsgStatusFiltered
290287
}
291288
updateMessage = append(updateMessage, MsgStructToLocalChatLog(msg))
292289
} else {
293-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
290+
dbMessage := MsgStructToLocalChatLog(msg)
291+
c.handleExceptionMessages(ctx, existingMsg, dbMessage)
292+
insertMessage = append(insertMessage, dbMessage)
293+
exceptionMsg = append(exceptionMsg, dbMessage)
294294
}
295295
} else {
296296
log.ZInfo(ctx, "sync message", "msg", msg)
@@ -318,7 +318,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
318318
}
319319
}
320320
} else { //Sent by others
321-
if _, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation
321+
if existingMsg, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil {
322322
lc := model_struct.LocalConversation{
323323
ConversationType: v.SessionType,
324324
LatestMsg: utils.StructToJsonString(msg),
@@ -352,11 +352,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
352352
}
353353

354354
} else {
355-
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
356-
log.ZWarn(ctx, "Deduplication operation ", nil, "msg", *c.msgStructToLocalErrChatLog(msg))
357-
msg.Status = constant.MsgStatusFiltered
358-
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
359-
othersInsertMessage = append(othersInsertMessage, MsgStructToLocalChatLog(msg))
355+
dbMessage := MsgStructToLocalChatLog(msg)
356+
c.handleExceptionMessages(ctx, existingMsg, dbMessage)
357+
insertMessage = append(insertMessage, dbMessage)
358+
exceptionMsg = append(exceptionMsg, dbMessage)
360359
}
361360
}
362361
}
@@ -451,6 +450,10 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
451450
}
452451
}
453452
}
453+
//Exception message storage
454+
for _, v := range exceptionMsg {
455+
log.ZWarn(ctx, "exceptionMsg show: ", nil, "msg", *v)
456+
}
454457

455458
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
456459
}
@@ -464,6 +467,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
464467

465468
insertMsg := make(map[string][]*model_struct.LocalChatLog, 10)
466469
conversationList := make([]*model_struct.LocalConversation, 0)
470+
var exceptionMsg []*model_struct.LocalChatLog
467471

468472
log.ZDebug(ctx, "message come here conversation ch in reinstalled", "conversation length", msgLen)
469473
b := time.Now()
@@ -490,7 +494,10 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
490494

491495
//When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
492496
if msg.Status == constant.MsgStatusHasDeleted {
493-
insertMessage = append(insertMessage, MsgStructToLocalChatLog(msg))
497+
dbMessage := MsgStructToLocalChatLog(msg)
498+
c.handleExceptionMessages(ctx, nil, dbMessage)
499+
exceptionMsg = append(exceptionMsg, dbMessage)
500+
insertMessage = append(insertMessage, dbMessage)
494501
continue
495502
}
496503
msg.Status = constant.MsgStatusSendSuccess
@@ -545,6 +552,10 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {
545552

546553
// log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
547554
c.ConversationListener().OnSyncServerProgress((c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress)
555+
//Exception message storage
556+
for _, v := range exceptionMsg {
557+
log.ZWarn(ctx, "exceptionMsg show: ", nil, "msg", *v)
558+
}
548559
}
549560

550561
func (c *Conversation) addInitProgress(progress int) {
@@ -606,15 +617,6 @@ func (c *Conversation) genConversationGroupAtType(lc *model_struct.LocalConversa
606617
}
607618
}
608619

609-
func (c *Conversation) msgStructToLocalErrChatLog(m *sdk_struct.MsgStruct) *model_struct.LocalErrChatLog {
610-
var lc model_struct.LocalErrChatLog
611-
copier.Copy(&lc, m)
612-
if m.SessionType == constant.WriteGroupChatType || m.SessionType == constant.ReadGroupChatType {
613-
lc.RecvID = m.GroupID
614-
}
615-
return &lc
616-
}
617-
618620
func (c *Conversation) batchUpdateMessageList(ctx context.Context, updateMsg map[string][]*model_struct.LocalChatLog) error {
619621
if updateMsg == nil {
620622
return nil

internal/conversation_msg/message_check.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
99
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
1010
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
11-
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
1211
"github.com/openimsdk/protocol/msg"
1312
"github.com/openimsdk/tools/utils/datautil"
1413

@@ -354,12 +353,54 @@ func reverse[T any](arr []T) {
354353
}
355354
}
356355

356+
// handleExceptionMessages handles the insertion of exception messages into the local chat log.
357+
// It identifies and marks messages that fall into the following categories:
358+
// 1. Messages pulled but marked as deleted, with non-repeating seq, requiring placeholders.
359+
// 2. Seq jump caused by server downtime, with non-repeating seq, requiring placeholders.
360+
// 3. Messages sent by the sender with a duplicate ClientMsgID but unique seq.
361+
// This can occur due to either client-side message duplication or server-side
362+
// message re-consumption, where the same ClientMsgID is sent again with a different Seq.
363+
// 4. Concurrent message filling with both duplicate ClientMsgID and seq.
364+
func (c *Conversation) handleExceptionMessages(ctx context.Context, existingMessage, message *model_struct.LocalChatLog) {
365+
var prefix string
366+
367+
if existingMessage == nil {
368+
// Case: The message is marked as deleted
369+
if message.Status == constant.MsgStatusHasDeleted {
370+
// If ClientMsgID is empty, it's a placeholder for seq gap
371+
if message.ClientMsgID == "" {
372+
prefix = "[SEQ_GAP]" // Placeholder for sequence gap
373+
} else {
374+
prefix = "[DELETED]" // Mark as a deleted message
375+
}
376+
} else {
377+
// For messages that don't fall under known exceptional cases, log as normal
378+
prefix = "[UNKNOWN]"
379+
log.ZWarn(ctx, "Message is normal, no need to handle", nil, "message", message)
380+
}
381+
} else {
382+
// Case: The message has a duplicate ClientMsgID
383+
if existingMessage.Seq == message.Seq {
384+
// Case: Both ClientMsgID and Seq are duplicated, it's a concurrent message filling
385+
prefix = "[SEQ_DUP]" // Duplicate sequence message, likely caused by concurrent message handling
386+
} else {
387+
// Case: ClientMsgID is duplicated, but Seq is different, indicating a client-side duplication
388+
prefix = "[CLIENT_DUP]" // Client-side resend or server-side consume messages duplication
389+
}
390+
}
391+
392+
// Mark the message as deleted
393+
message.Status = constant.MsgStatusHasDeleted
394+
// Add the exception prefix to the ClientMsgID for identification
395+
message.ClientMsgID = prefix + message.ClientMsgID
396+
}
397+
357398
func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map[string]*sdkws.PullMsgs) {
358399
insertMsg := make(map[string][]*model_struct.LocalChatLog, 20)
359400
updateMsg := make(map[string][]*model_struct.LocalChatLog, 30)
360401
var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog
361402
var updateMessage []*model_struct.LocalChatLog
362-
var exceptionMsg []*model_struct.LocalErrChatLog
403+
var exceptionMsg []*model_struct.LocalChatLog
363404

364405
log.ZDebug(ctx, "do Msg come here, len: ", "msg length", len(pullMsgData))
365406
for conversationID, msgs := range pullMsgData {
@@ -377,13 +418,8 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
377418
//without any conversation and message update.
378419
msg := MsgDataToLocalChatLog(v)
379420
if v.Status == constant.MsgStatusHasDeleted {
380-
insertMessage = append(insertMessage, msg)
381-
continue
382-
}
383-
// The message might be a filler provided by the server due to a gap in the sequence.
384-
if msg.ClientMsgID == "" {
385-
msg.ClientMsgID = utils.GetMsgID(c.loginUserID) + utils.Int64ToString(msg.Seq)
386-
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
421+
c.handleExceptionMessages(ctx, nil, msg)
422+
exceptionMsg = append(exceptionMsg, msg)
387423
insertMessage = append(insertMessage, msg)
388424
continue
389425
}
@@ -398,8 +434,8 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
398434
} else {
399435
// The message you sent is duplicated, possibly due to a resend or the server consuming
400436
// the message multiple times.
401-
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
402-
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
437+
c.handleExceptionMessages(ctx, existingMsg, msg)
438+
exceptionMsg = append(exceptionMsg, msg)
403439
insertMessage = append(insertMessage, msg)
404440
}
405441
} else { // send through other terminal
@@ -413,8 +449,8 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
413449
} else {
414450
// The message sent by others is duplicated, possibly due to a resend or the server consuming
415451
// the message multiple times.
416-
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
417-
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
452+
c.handleExceptionMessages(ctx, existingMsg, msg)
453+
exceptionMsg = append(exceptionMsg, msg)
418454
insertMessage = append(insertMessage, msg)
419455
}
420456
}

pkg/db/chat_log_model.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -341,16 +341,6 @@ func (d *DataBase) UpdateMsgSenderFaceURLAndSenderNickname(ctx context.Context,
341341
map[string]interface{}{"sender_face_url": faceURL, "sender_nick_name": nickname}).Error, utils.GetSelfFuncName()+" failed")
342342
}
343343

344-
func (d *DataBase) GetAlreadyExistSeqList(ctx context.Context, conversationID string, lostSeqList []int64) (seqList []int64, err error) {
345-
d.mRWMutex.RLock()
346-
defer d.mRWMutex.RUnlock()
347-
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Where("seq IN ?", lostSeqList).Pluck("seq", &seqList).Error, utils.GetSelfFuncName()+" failed")
348-
if err != nil {
349-
return nil, err
350-
}
351-
return seqList, nil
352-
}
353-
354344
func (d *DataBase) UpdateColumnsMessage(ctx context.Context, conversationID, ClientMsgID string, args map[string]interface{}) error {
355345
d.mRWMutex.Lock()
356346
defer d.mRWMutex.Unlock()
@@ -403,10 +393,6 @@ func (d *DataBase) MarkConversationMessageAsReadDB(ctx context.Context, conversa
403393
rowsAffected++
404394
}
405395
}
406-
// t := d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Where("client_msg_id in ? AND send_id != ?", msgIDs, d.loginUserID).Update("is_read", constant.HasRead)
407-
// if t.RowsAffected == 0 {
408-
// return 0, errs.WrapMsg(errors.New("RowsAffected == 0"), "no update")
409-
// }
410396
return rowsAffected, nil
411397
}
412398

0 commit comments

Comments
 (0)