Skip to content

Apply TSDB jump table and offset construction optimizations to binary doc values #127278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
*/
class DocValuesConsumerUtil {

static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1);
static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1, -1, -1);

record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField) {}
record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {}

static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) {
if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
Expand All @@ -38,6 +38,8 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me

long sumNumValues = 0;
int sumNumDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;

for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
Expand Down Expand Up @@ -86,6 +88,14 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
}
}
}
case BINARY -> {
var entry = tsdbDocValuesProducer.binaries.get(fieldInfo.number);
if (entry != null) {
sumNumDocsWithField += entry.numDocsWithField;
minLength = Math.min(minLength, entry.minLength);
maxLength = Math.max(maxLength, entry.maxLength);
}
}
default -> throw new IllegalStateException("unexpected doc values producer type: " + fieldInfo.getDocValuesType());
}
} else {
Expand All @@ -96,7 +106,7 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
}
}

return new MergeStats(true, sumNumValues, sumNumDocsWithField);
return new MergeStats(true, sumNumValues, sumNumDocsWithField, minLength, maxLength);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -258,71 +258,146 @@ public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) t
}
}

/**
 * Merges a binary doc values field, taking the optimized merge path when
 * {@code compatibleWithOptimizedMerge} reports the merge as supported and
 * deferring to the superclass merge otherwise.
 *
 * @param mergeFieldInfo the field being merged
 * @param mergeState the state of the running merge
 * @throws IOException if either merge path fails while reading or writing
 */
@Override
public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
    final var mergeStats = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
    if (mergeStats.supported() == false) {
        // Not eligible for the stats-driven path: use the inherited merge.
        super.mergeBinaryField(mergeFieldInfo, mergeState);
        return;
    }
    mergeBinaryField(mergeStats, mergeFieldInfo, mergeState);
}

@Override
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);
meta.writeByte(ES819TSDBDocValuesFormat.BINARY);

BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
numDocsWithField++;
BytesRef v = values.binaryValue();
int length = v.length;
data.writeBytes(v.bytes, v.offset, v.length);
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength

if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) {
final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField();
final int minLength = tsdbValuesProducer.mergeStats.minLength();
final int maxLength = tsdbValuesProducer.mergeStats.maxLength();

meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);
if (maxLength > minLength) {
start = data.getFilePointer();
meta.writeLong(start);
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
assert numDocsWithField <= maxDoc;

final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
meta,
data,
numDocsWithField + 1,
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
long addr = 0;
writer.add(addr);
values = valuesProducer.getBinary(field);
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset

OffsetsAccumulator offsetsAccumulator = null;
DISIAccumulator disiAccumulator = null;
try {
if (numDocsWithField > 0 && numDocsWithField < maxDoc) {
disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

assert maxLength >= minLength;
if (maxLength > minLength) {
offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
}

for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
BytesRef v = values.binaryValue();
data.writeBytes(v.bytes, v.offset, v.length);
if (disiAccumulator != null) {
disiAccumulator.addDocId(doc);
}
if (offsetsAccumulator != null) {
offsetsAccumulator.addDoc(v.length);
}
}
meta.writeLong(data.getFilePointer() - start); // dataLength

if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
final short jumpTableEntryCount = disiAccumulator.build(data);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);
if (offsetsAccumulator != null) {
offsetsAccumulator.build(meta, data);
}
} finally {
IOUtils.close(disiAccumulator, offsetsAccumulator);
}
} else {
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
addr += values.binaryValue().length;
numDocsWithField++;
BytesRef v = values.binaryValue();
int length = v.length;
data.writeBytes(v.bytes, v.offset, v.length);
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength

if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);
if (maxLength > minLength) {
start = data.getFilePointer();
meta.writeLong(start);
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);

final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
meta,
data,
numDocsWithField + 1,
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
long addr = 0;
writer.add(addr);
values = valuesProducer.getBinary(field);
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
addr += values.binaryValue().length;
writer.add(addr);
}
writer.finish();
meta.writeLong(data.getFilePointer() - start);
}
writer.finish();
meta.writeLong(data.getFilePointer() - start);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

final class ES819TSDBDocValuesProducer extends DocValuesProducer {
final IntObjectHashMap<NumericEntry> numerics;
private final IntObjectHashMap<BinaryEntry> binaries;
final IntObjectHashMap<BinaryEntry> binaries;
final IntObjectHashMap<SortedEntry> sorted;
final IntObjectHashMap<SortedSetEntry> sortedSets;
final IntObjectHashMap<SortedNumericEntry> sortedNumerics;
Expand Down Expand Up @@ -1445,7 +1445,7 @@ static class NumericEntry {
long valuesLength;
}

private static class BinaryEntry {
static class BinaryEntry {
long dataOffset;
long dataLength;
long docsWithFieldOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BaseTermsEnum;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocIDMerger;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
Expand Down Expand Up @@ -152,6 +153,102 @@ public long longValue() throws IOException {
};
}

/** Tracks state of one binary sub-reader that we are merging */
private static class BinaryDocValuesSub extends DocIDMerger.Sub {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from Lucene's DocValuesConsumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, except that it returns an anonymous subclass of TsdbDocValuesProducer instead of EmptyDocValuesProducer so that it can support merge stats.


final BinaryDocValues values;

/**
 * Wraps one segment's binary doc values for use by the doc id merger.
 *
 * @param docMap doc map handed to the {@code DocIDMerger.Sub} superclass
 * @param values the segment's binary doc values; expected to be unpositioned ({@code docID() == -1})
 */
BinaryDocValuesSub(MergeState.DocMap docMap, BinaryDocValues values) {
    super(docMap);
    // The iterator must not have been advanced before merging starts.
    assert values.docID() == -1;
    this.values = values;
}

/**
 * Advances the wrapped binary doc values iterator.
 *
 * @return whatever the wrapped iterator's {@code nextDoc()} returns
 * @throws IOException if the wrapped iterator fails to advance
 */
@Override
public int nextDoc() throws IOException {
    final int nextDocId = this.values.nextDoc();
    return nextDocId;
}
}

/**
 * Merges the binary doc values of {@code mergeFieldInfo} from all segments in {@code mergeState}.
 *
 * <p>Calls {@link #addBinaryField} with a {@code TsdbDocValuesProducer} that carries
 * {@code mergeStats}. Its {@code getBinary} view collects every segment that has binary doc values
 * for the field and walks them through a {@link DocIDMerger}, exposing each value under its mapped
 * doc id. The returned iterator is forward-only: {@code advance} and {@code advanceExact} throw
 * {@link UnsupportedOperationException}.
 *
 * @param mergeStats pre-computed statistics attached to the producer passed to {@code addBinaryField}
 * @param mergeFieldInfo the field being merged; {@code getBinary} rejects any other instance
 * @param mergeState the state of the running merge
 * @throws IOException if reading segment values or writing the merged field fails
 */
public void mergeBinaryField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException {
    final TsdbDocValuesProducer mergingProducer = new TsdbDocValuesProducer(mergeStats) {
        @Override
        public BinaryDocValues getBinary(FieldInfo fieldInfo) throws IOException {
            if (fieldInfo != mergeFieldInfo) {
                throw new IllegalArgumentException("wrong fieldInfo");
            }

            final List<BinaryDocValuesSub> segmentSubs = new ArrayList<>();
            long totalCost = 0;

            final DocValuesProducer[] producers = mergeState.docValuesProducers;
            for (int segment = 0; segment < producers.length; segment++) {
                final DocValuesProducer producer = producers[segment];
                if (producer == null) {
                    continue;
                }
                final FieldInfo segmentField = mergeState.fieldInfos[segment].fieldInfo(mergeFieldInfo.name);
                // Skip segments where the field is absent or is not a binary doc values field.
                if (segmentField == null || segmentField.getDocValuesType() != DocValuesType.BINARY) {
                    continue;
                }
                final BinaryDocValues segmentValues = producer.getBinary(segmentField);
                if (segmentValues == null) {
                    continue;
                }
                totalCost += segmentValues.cost();
                segmentSubs.add(new BinaryDocValuesSub(mergeState.docMaps[segment], segmentValues));
            }

            final long mergedCost = totalCost;
            final DocIDMerger<BinaryDocValuesSub> merger = DocIDMerger.of(segmentSubs, mergeState.needsIndexSort);

            return new BinaryDocValues() {
                private BinaryDocValuesSub current;
                private int docID = -1;

                @Override
                public int docID() {
                    return docID;
                }

                @Override
                public int nextDoc() throws IOException {
                    current = merger.next();
                    // A null sub means every segment is exhausted.
                    docID = current == null ? NO_MORE_DOCS : current.mappedDocID;
                    return docID;
                }

                @Override
                public int advance(int target) throws IOException {
                    throw new UnsupportedOperationException();
                }

                @Override
                public boolean advanceExact(int target) throws IOException {
                    throw new UnsupportedOperationException();
                }

                @Override
                public long cost() {
                    return mergedCost;
                }

                @Override
                public BytesRef binaryValue() throws IOException {
                    return current.values.binaryValue();
                }
            };
        }
    };
    addBinaryField(mergeFieldInfo, mergingProducer);
}

/** Tracks state of one sorted numeric sub-reader that we are merging */
private static class SortedNumericDocValuesSub extends DocIDMerger.Sub {

Expand Down
Loading