@@ -43,7 +43,7 @@ use crate::models::IndexerMessage;
43
43
use crate :: models:: RawDocBatch ;
44
44
use crate :: models:: ScratchDirectory ;
45
45
46
- #[ derive( Clone , Default , Debug ) ]
46
+ #[ derive( Clone , Default , Debug , Eq , PartialEq ) ]
47
47
pub struct IndexerCounters {
48
48
/// Overall number of documents received, partitioned
49
49
/// into 3 categories:
@@ -102,9 +102,11 @@ impl ImmutableState {
102
102
fn get_or_create_current_indexed_split < ' a > (
103
103
& self ,
104
104
current_split_opt : & ' a mut Option < IndexedSplit > ,
105
+ counters : & mut IndexerCounters ,
105
106
ctx : & ActorContext < Indexer > ,
106
107
) -> anyhow:: Result < & ' a mut IndexedSplit > {
107
108
if current_split_opt. is_none ( ) {
109
+ counters. num_docs_in_split = 0 ;
108
110
let new_indexed_split = self . create_indexed_split ( ) ?;
109
111
let commit_timeout = IndexerMessage :: CommitTimeout {
110
112
split_id : new_indexed_split. split_id . clone ( ) ,
@@ -159,7 +161,8 @@ impl ImmutableState {
159
161
counters : & mut IndexerCounters ,
160
162
ctx : & ActorContext < Indexer > ,
161
163
) -> Result < ( ) , ActorExitStatus > {
162
- let indexed_split = self . get_or_create_current_indexed_split ( current_split_opt, ctx) ?;
164
+ let indexed_split =
165
+ self . get_or_create_current_indexed_split ( current_split_opt, counters, ctx) ?;
163
166
for doc_json in batch. docs {
164
167
counters. overall_num_bytes += doc_json. len ( ) as u64 ;
165
168
indexed_split. docs_size_in_bytes += doc_json. len ( ) as u64 ;
@@ -176,6 +179,7 @@ impl ImmutableState {
176
179
} => {
177
180
counters. num_docs_in_split += 1 ;
178
181
counters. num_valid_docs += 1 ;
182
+ indexed_split. num_docs += 1 ;
179
183
if let Some ( timestamp) = timestamp_opt {
180
184
record_timestamp ( timestamp, & mut indexed_split. time_range ) ;
181
185
}
@@ -332,6 +336,7 @@ mod tests {
332
336
use std:: time:: Duration ;
333
337
334
338
use crate :: actors:: indexer:: record_timestamp;
339
+ use crate :: actors:: indexer:: IndexerCounters ;
335
340
use crate :: models:: CommitPolicy ;
336
341
use crate :: models:: RawDocBatch ;
337
342
use quickwit_actors:: create_test_mailbox;
@@ -382,9 +387,10 @@ mod tests {
382
387
& indexer_mailbox,
383
388
RawDocBatch {
384
389
docs : vec ! [
385
- "{\" body\" : \" happy\" }" . to_string( ) ,
386
- "{\" body\" : \" happy2\" }" . to_string( ) ,
387
- "{" . to_string( ) ,
390
+ r#"{"body": "happy"}"# . to_string( ) , // missing timestamp
391
+ r#"{"body": "happy", "timestamp": 1628837062}"# . to_string( ) , // ok
392
+ r#"{"body": "happy2", "timestamp": 1628837062}"# . to_string( ) , // ok
393
+ "{" . to_string( ) , // invalid json
388
394
] ,
389
395
}
390
396
. into ( ) ,
@@ -394,14 +400,22 @@ mod tests {
394
400
. send_message (
395
401
& indexer_mailbox,
396
402
RawDocBatch {
397
- docs : vec ! [ "{ \ " body\ " : \ " happy3\" }" . to_string( ) ] ,
403
+ docs : vec ! [ r#"{ "body": "happy3", "timestamp": 1628837062}"# . to_string( ) ] , // ok
398
404
}
399
405
. into ( ) ,
400
406
)
401
407
. await ?;
402
408
let indexer_counters = indexer_handle. process_pending_and_observe ( ) . await . state ;
403
- assert_eq ! ( indexer_counters. num_valid_docs, 3 ) ;
404
- assert_eq ! ( indexer_counters. num_parse_errors, 1 ) ;
409
+ assert_eq ! (
410
+ indexer_counters,
411
+ IndexerCounters {
412
+ num_parse_errors: 1 ,
413
+ num_missing_timestamp: 1 ,
414
+ num_valid_docs: 3 ,
415
+ num_docs_in_split: 3 ,
416
+ overall_num_bytes: 146 ,
417
+ }
418
+ ) ;
405
419
let output_messages = inbox. drain_available_message_for_test ( ) ;
406
420
assert_eq ! ( output_messages. len( ) , 1 ) ;
407
421
assert_eq ! ( output_messages[ 0 ] . num_docs, 3 ) ;
0 commit comments