47
47
import java .util .function .IntFunction ;
48
48
import java .util .function .Supplier ;
49
49
50
+ import static org .elasticsearch .TransportVersions .ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED ;
51
+
50
52
/**
51
53
* Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
52
54
* and outputs them to a new column.
@@ -113,6 +115,7 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL
113
115
private final BlockFactory blockFactory ;
114
116
115
117
private final Map <String , Integer > readersBuilt = new TreeMap <>();
118
+ private long valuesLoaded ;
116
119
117
120
int lastShard = -1 ;
118
121
int lastSegment = -1 ;
@@ -165,6 +168,9 @@ public int get(int i) {
165
168
}
166
169
}
167
170
success = true ;
171
+ for (Block b : blocks ) {
172
+ valuesLoaded += b .getTotalValueCount ();
173
+ }
168
174
return page .appendBlocks (blocks );
169
175
} catch (IOException e ) {
170
176
throw new UncheckedIOException (e );
@@ -547,7 +553,7 @@ public String toString() {
547
553
548
554
@ Override
549
555
protected Status status (long processNanos , int pagesProcessed , long rowsReceived , long rowsEmitted ) {
550
- return new Status (new TreeMap <>(readersBuilt ), processNanos , pagesProcessed , rowsReceived , rowsEmitted );
556
+ return new Status (new TreeMap <>(readersBuilt ), processNanos , pagesProcessed , rowsReceived , rowsEmitted , valuesLoaded );
551
557
}
552
558
553
559
public static class Status extends AbstractPageMappingOperator .Status {
@@ -558,21 +564,34 @@ public static class Status extends AbstractPageMappingOperator.Status {
558
564
);
559
565
560
566
private final Map <String , Integer > readersBuilt ;
561
-
562
- Status (Map <String , Integer > readersBuilt , long processNanos , int pagesProcessed , long rowsReceived , long rowsEmitted ) {
567
+ private final long valuesLoaded ;
568
+
569
+ Status (
570
+ Map <String , Integer > readersBuilt ,
571
+ long processNanos ,
572
+ int pagesProcessed ,
573
+ long rowsReceived ,
574
+ long rowsEmitted ,
575
+ long valuesLoaded
576
+ ) {
563
577
super (processNanos , pagesProcessed , rowsReceived , rowsEmitted );
564
578
this .readersBuilt = readersBuilt ;
579
+ this .valuesLoaded = valuesLoaded ;
565
580
}
566
581
567
582
Status (StreamInput in ) throws IOException {
568
583
super (in );
569
584
readersBuilt = in .readOrderedMap (StreamInput ::readString , StreamInput ::readVInt );
585
+ valuesLoaded = in .getTransportVersion ().onOrAfter (ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED ) ? in .readVLong () : 0 ;
570
586
}
571
587
572
588
@ Override
573
589
public void writeTo (StreamOutput out ) throws IOException {
574
590
super .writeTo (out );
575
591
out .writeMap (readersBuilt , StreamOutput ::writeVInt );
592
+ if (out .getTransportVersion ().onOrAfter (ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED )) {
593
+ out .writeVLong (valuesLoaded );
594
+ }
576
595
}
577
596
578
597
@ Override
@@ -584,6 +603,11 @@ public Map<String, Integer> readersBuilt() {
584
603
return readersBuilt ;
585
604
}
586
605
606
+ @ Override
607
+ public long valuesLoaded () {
608
+ return valuesLoaded ;
609
+ }
610
+
587
611
@ Override
588
612
public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
589
613
builder .startObject ();
@@ -592,6 +616,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
592
616
builder .field (e .getKey (), e .getValue ());
593
617
}
594
618
builder .endObject ();
619
+ builder .field ("values_loaded" , valuesLoaded );
595
620
innerToXContent (builder );
596
621
return builder .endObject ();
597
622
}
@@ -600,12 +625,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
600
625
public boolean equals (Object o ) {
601
626
if (super .equals (o ) == false ) return false ;
602
627
Status status = (Status ) o ;
603
- return readersBuilt .equals (status .readersBuilt );
628
+ return readersBuilt .equals (status .readersBuilt ) && valuesLoaded == status . valuesLoaded ;
604
629
}
605
630
606
631
@ Override
607
632
public int hashCode () {
608
- return Objects .hash (super .hashCode (), readersBuilt );
633
+ return Objects .hash (super .hashCode (), readersBuilt , valuesLoaded );
609
634
}
610
635
611
636
@ Override
@@ -710,6 +735,4 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
710
735
return factory .newAggregateMetricDoubleBlockBuilder (count );
711
736
}
712
737
}
713
-
714
- // TODO tests that mix source loaded fields and doc values in the same block
715
738
}
0 commit comments