Skip to content

Tsdb doc values inline building jump table #126499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
Expand All @@ -39,6 +41,7 @@
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;

import java.io.IOException;
Expand All @@ -54,6 +57,7 @@

final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {

final Directory dir;
IndexOutput data, meta;
final int maxDoc;
private byte[] termsDictBuffer;
Expand All @@ -70,6 +74,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
String metaExtension
) throws IOException {
this.termsDictBuffer = new byte[1 << 14];
this.dir = state.directory;
boolean success = false;
try {
final String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
Expand Down Expand Up @@ -138,89 +143,125 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
meta.writeLong(numValues);
meta.writeInt(numDocsWithValue);

if (numValues > 0) {
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
meta,
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);

final long valuesDataOffset = data.getFilePointer();
// Special case for maxOrd of 1, skip writing the blocks
if (maxOrd != 1) {
final long[] buffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
int bufferSize = 0;
final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
values = valuesProducer.getSortedNumeric(field);
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
final int count = values.docValueCount();
for (int i = 0; i < count; ++i) {
buffer[bufferSize++] = values.nextValue();
if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
indexWriter.add(data.getFilePointer() - valuesDataOffset);
if (maxOrd >= 0) {
encoder.encodeOrdinals(buffer, data, bitsPerOrd);
} else {
encoder.encode(buffer, data);
IndexOutput disiTempOutput = null;
String skipListTempFileName = null;
IndexedDISIBuilder docIdSetBuilder = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make IndexedDISIBuilder (or maybe Accumulator is a better name?) Closable, pass a Directory in its constructor, and pass the IndexOutput in the build (or flush) method to copy the temporary output to the data output? This way, we only need a single reference here, making the code more manageable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really made the code more manageable! I also forked the original IndexedDISI tests and adapted that for DISIAccumulator.

try {
if (numValues > 0) {
// Special case for maxOrd of 1, signal -1 that no blocks will be written
meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
meta,
new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);

final long valuesDataOffset = data.getFilePointer();
// Special case for maxOrd of 1, skip writing the blocks
if (maxOrd != 1) {
final long[] buffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
int bufferSize = 0;
final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
values = valuesProducer.getSortedNumeric(field);
final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
if (numDocsWithValue != 0 && numDocsWithValue != maxDoc) {
// TODO: which IOContext should be used here?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use MERGE for this IOContext?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a MERGE constant for io context. But I did the following instead: 45308e4

I think this way we always get most appropriate io context? In case of merge it will be an io context for merging?

disiTempOutput = dir.createTempOutput(data.getName(), "disi", IOContext.DEFAULT);
skipListTempFileName = disiTempOutput.getName();
docIdSetBuilder = new IndexedDISIBuilder(disiTempOutput, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
if (docIdSetBuilder != null) {
docIdSetBuilder.addDocId(doc);
}
final int count = values.docValueCount();
for (int i = 0; i < count; ++i) {
buffer[bufferSize++] = values.nextValue();
if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
indexWriter.add(data.getFilePointer() - valuesDataOffset);
if (maxOrd >= 0) {
encoder.encodeOrdinals(buffer, data, bitsPerOrd);
} else {
encoder.encode(buffer, data);
}
bufferSize = 0;
}
bufferSize = 0;
}
}
}
if (bufferSize > 0) {
indexWriter.add(data.getFilePointer() - valuesDataOffset);
// Fill unused slots in the block with zeroes rather than junk
Arrays.fill(buffer, bufferSize, ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE, 0L);
if (maxOrd >= 0) {
encoder.encodeOrdinals(buffer, data, bitsPerOrd);
} else {
encoder.encode(buffer, data);
if (bufferSize > 0) {
indexWriter.add(data.getFilePointer() - valuesDataOffset);
// Fill unused slots in the block with zeroes rather than junk
Arrays.fill(buffer, bufferSize, ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE, 0L);
if (maxOrd >= 0) {
encoder.encodeOrdinals(buffer, data, bitsPerOrd);
} else {
encoder.encode(buffer, data);
}
}
}
}

final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
if (maxOrd != 1) {
// Special case for maxOrd of 1, indexWriter isn't really used, so no need to invoke finish() method.
indexWriter.finish();
}
final long indexDataOffset = data.getFilePointer();
data.copyBytes(indexOut.toDataInput(), indexOut.size());
meta.writeLong(indexDataOffset);
meta.writeLong(data.getFilePointer() - indexDataOffset);
final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
if (maxOrd != 1) {
// Special case for maxOrd of 1, indexWriter isn't really used, so no need to invoke finish() method.
indexWriter.finish();
}
final long indexDataOffset = data.getFilePointer();
data.copyBytes(indexOut.toDataInput(), indexOut.size());
meta.writeLong(indexDataOffset);
meta.writeLong(data.getFilePointer() - indexDataOffset);

meta.writeLong(valuesDataOffset);
meta.writeLong(valuesDataLength);
}
meta.writeLong(valuesDataOffset);
meta.writeLong(valuesDataLength);
}

if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getSortedNumeric(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
final short jumpTableEntryCount;
if (maxOrd != 1 && docIdSetBuilder != null) {
jumpTableEntryCount = docIdSetBuilder.build();
disiTempOutput.close();
try (
// TODO: which IOContext should be used here?
var addressDataInput = dir.openInput(skipListTempFileName, IOContext.DEFAULT)
) {
data.copyBytes(addressDataInput, addressDataInput.length());
}
} else {
values = valuesProducer.getSortedNumeric(field);
jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
} finally {
IOUtils.close(disiTempOutput);
if (skipListTempFileName != null) {
deleteFilesIgnoringExceptions(skipListTempFileName);
}
}

return new long[] { numDocsWithValue, numValues };
}

@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
private void deleteFilesIgnoringExceptions(String skipListTempFileName) {
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(dir, skipListTempFileName);
}

@Override
public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
Expand Down
Loading