@@ -91,7 +91,13 @@ impl Client {
9191 . ok ( ) ?;
9292
9393 for c in r. items . iter ( ) {
94- if !c. content . unreadable {
94+ let is_countable =
95+ if let Some ( cb) = self . store . countable_callback . lock ( ) . unwrap ( ) . as_ref ( ) {
96+ cb. is_countable ( c. content . clone ( ) )
97+ } else {
98+ !c. content . unreadable
99+ } ;
100+ if is_countable {
95101 conversation. last_message_at = c. created_at . clone ( ) ;
96102 conversation. last_message = Some ( c. content . clone ( ) ) ;
97103 conversation. last_sender_id = c. sender_id . clone ( ) ;
@@ -117,7 +123,12 @@ impl Client {
117123 ensure_conversation_last_version : Option < bool > ,
118124 ) {
119125 let st = now_millis ( ) ;
120- let limit = if limit == 0 { MAX_LOGS_LIMIT /2 } else { limit } . min ( MAX_LOGS_LIMIT ) ;
126+ let limit = if limit == 0 {
127+ MAX_LOGS_LIMIT / 2
128+ } else {
129+ limit
130+ }
131+ . min ( MAX_LOGS_LIMIT ) ;
121132 let conversation = self
122133 . store
123134 . get_conversation (
@@ -127,9 +138,13 @@ impl Client {
127138 )
128139 . await
129140 . unwrap_or_default ( ) ;
130-
141+
131142 let store_st = now_millis ( ) ;
132- match self . store . get_chat_logs ( & topic_id, conversation. start_seq , last_seq, limit) . await {
143+ match self
144+ . store
145+ . get_chat_logs ( & topic_id, conversation. start_seq , last_seq, limit)
146+ . await
147+ {
133148 Ok ( ( local_logs, need_fetch) ) => {
134149 info ! (
135150 "sync_chat_logs local_logs.len: {} start_seq: {} last_seq: {:?} limit: {} local_logs.start_sort_value:{} local_logs.end_sort_value:{} need_fetch:{} store_cost:{:?} total_cost:{:?}" ,
@@ -165,10 +180,7 @@ impl Client {
165180 }
166181 }
167182
168- pub async fn save_chat_logs (
169- & self ,
170- logs : & Vec < ChatLog > ,
171- ) -> Result < ( ) > {
183+ pub async fn save_chat_logs ( & self , logs : & Vec < ChatLog > ) -> Result < ( ) > {
172184 self . store . save_chat_logs ( logs) . await
173185 }
174186
@@ -437,17 +449,32 @@ impl Client {
437449 None => continue ,
438450 } ;
439451
452+ if self . store . countable_callback . lock ( ) . unwrap ( ) . is_some ( ) {
453+ conversation. unread = 0
454+ }
455+
440456 for c in lr. items . iter ( ) {
441457 if c. seq <= conversation. last_seq {
442458 continue ;
443459 }
444460 conversation. last_seq = c. seq ;
445- if !c. content . unreadable {
461+
462+ let is_countable =
463+ if let Some ( cb) = self . store . countable_callback . lock ( ) . unwrap ( ) . as_ref ( ) {
464+ cb. is_countable ( c. content . clone ( ) )
465+ } else {
466+ !c. content . unreadable
467+ } ;
468+
469+ if is_countable {
446470 conversation. updated_at = c. created_at . clone ( ) ;
447471 conversation. last_message_at = c. created_at . clone ( ) ;
448472 conversation. last_message = Some ( c. content . clone ( ) ) ;
449473 conversation. last_sender_id = c. sender_id . clone ( ) ;
450474 conversation. last_message_seq = Some ( c. seq ) ;
475+ if conversation. last_read_seq < c. seq {
476+ conversation. unread += 1 ;
477+ }
451478 }
452479 break ;
453480 }
@@ -457,6 +484,11 @@ impl Client {
457484 if let Some ( cb) = self . store . callback . lock ( ) . unwrap ( ) . as_ref ( ) {
458485 cb. on_conversations_updated ( updated_conversations. clone ( ) ) ;
459486 }
487+ // sync to store
488+ let t = self . store . message_storage . table :: < Conversation > ( ) . await ;
489+ for c in updated_conversations. iter_mut ( ) {
490+ t. set ( "" , & c. topic_id , Some ( c) ) . await . ok ( ) ;
491+ }
460492 Ok ( ( ) )
461493 }
462494
@@ -534,10 +566,7 @@ impl Client {
534566 ) -> Result < Conversation > {
535567 self . store . set_conversation_extra ( & topic_id, extra) . await
536568 }
537- pub async fn clear_conversation (
538- & self ,
539- topic_id : String
540- ) -> Result < ( ) > {
569+ pub async fn clear_conversation ( & self , topic_id : String ) -> Result < ( ) > {
541570 self . store . clear_conversation ( & topic_id) . await
542571 }
543572}
0 commit comments