Tsdb doc values inline building jump table #126499
Changes from 3 commits
@@ -29,6 +29,8 @@
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
@@ -39,6 +41,7 @@
 import org.apache.lucene.util.packed.DirectMonotonicWriter;
 import org.apache.lucene.util.packed.PackedInts;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
 
 import java.io.IOException;
@@ -54,6 +57,7 @@
 
 final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
 
+    final Directory dir;
     IndexOutput data, meta;
     final int maxDoc;
     private byte[] termsDictBuffer;
@@ -70,6 +74,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
         String metaExtension
     ) throws IOException {
         this.termsDictBuffer = new byte[1 << 14];
+        this.dir = state.directory;
         boolean success = false;
         try {
             final String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
@@ -138,89 +143,125 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
         meta.writeLong(numValues);
         meta.writeInt(numDocsWithValue);
 
-        if (numValues > 0) {
-            // Special case for maxOrd of 1, signal -1 that no blocks will be written
-            meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
-            final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
-            final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
-                meta,
-                new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
-                1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
-                ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
-            );
-
-            final long valuesDataOffset = data.getFilePointer();
-            // Special case for maxOrd of 1, skip writing the blocks
-            if (maxOrd != 1) {
-                final long[] buffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
-                int bufferSize = 0;
-                final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
-                values = valuesProducer.getSortedNumeric(field);
-                final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
-                for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
-                    final int count = values.docValueCount();
-                    for (int i = 0; i < count; ++i) {
-                        buffer[bufferSize++] = values.nextValue();
-                        if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
-                            indexWriter.add(data.getFilePointer() - valuesDataOffset);
-                            if (maxOrd >= 0) {
-                                encoder.encodeOrdinals(buffer, data, bitsPerOrd);
-                            } else {
-                                encoder.encode(buffer, data);
-                            }
-                            bufferSize = 0;
-                        }
-                    }
-                }
+        IndexOutput disiTempOutput = null;
+        String skipListTempFileName = null;
+        IndexedDISIBuilder docIdSetBuilder = null;
+        try {
+            if (numValues > 0) {
+                // Special case for maxOrd of 1, signal -1 that no blocks will be written
+                meta.writeInt(maxOrd != 1 ? ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT : -1);
+                final ByteBuffersDataOutput indexOut = new ByteBuffersDataOutput();
+                final DirectMonotonicWriter indexWriter = DirectMonotonicWriter.getInstance(
+                    meta,
+                    new ByteBuffersIndexOutput(indexOut, "temp-dv-index", "temp-dv-index"),
+                    1L + ((numValues - 1) >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT),
+                    ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
+                );
+
+                final long valuesDataOffset = data.getFilePointer();
+                // Special case for maxOrd of 1, skip writing the blocks
+                if (maxOrd != 1) {
+                    final long[] buffer = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
+                    int bufferSize = 0;
+                    final TSDBDocValuesEncoder encoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
+                    values = valuesProducer.getSortedNumeric(field);
+                    final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
+                    if (numDocsWithValue != 0 && numDocsWithValue != maxDoc) {
+                        // TODO: which IOContext should be used here?
Review comment: I think we should use MERGE for this IOContext?

Reply: I don't see a … I think this way we always get the most appropriate IO context? In case of a merge it will be an IO context for merging? (martijnvg marked this conversation as resolved.)

+                        disiTempOutput = dir.createTempOutput(data.getName(), "disi", IOContext.DEFAULT);
+                        skipListTempFileName = disiTempOutput.getName();
+                        docIdSetBuilder = new IndexedDISIBuilder(disiTempOutput, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                    }
+                    for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
+                        if (docIdSetBuilder != null) {
+                            docIdSetBuilder.addDocId(doc);
+                        }
+                        final int count = values.docValueCount();
+                        for (int i = 0; i < count; ++i) {
+                            buffer[bufferSize++] = values.nextValue();
+                            if (bufferSize == ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE) {
+                                indexWriter.add(data.getFilePointer() - valuesDataOffset);
+                                if (maxOrd >= 0) {
+                                    encoder.encodeOrdinals(buffer, data, bitsPerOrd);
+                                } else {
+                                    encoder.encode(buffer, data);
+                                }
+                                bufferSize = 0;
+                            }
+                        }
+                    }
-                if (bufferSize > 0) {
-                    indexWriter.add(data.getFilePointer() - valuesDataOffset);
-                    // Fill unused slots in the block with zeroes rather than junk
-                    Arrays.fill(buffer, bufferSize, ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE, 0L);
-                    if (maxOrd >= 0) {
-                        encoder.encodeOrdinals(buffer, data, bitsPerOrd);
-                    } else {
-                        encoder.encode(buffer, data);
-                    }
-                }
-            }
+                    if (bufferSize > 0) {
+                        indexWriter.add(data.getFilePointer() - valuesDataOffset);
+                        // Fill unused slots in the block with zeroes rather than junk
+                        Arrays.fill(buffer, bufferSize, ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE, 0L);
+                        if (maxOrd >= 0) {
+                            encoder.encodeOrdinals(buffer, data, bitsPerOrd);
+                        } else {
+                            encoder.encode(buffer, data);
+                        }
+                    }
+                }
-
-            final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
-            if (maxOrd != 1) {
-                // Special case for maxOrd of 1, indexWriter isn't really used, so no need to invoke finish() method.
-                indexWriter.finish();
-            }
-            final long indexDataOffset = data.getFilePointer();
-            data.copyBytes(indexOut.toDataInput(), indexOut.size());
-            meta.writeLong(indexDataOffset);
-            meta.writeLong(data.getFilePointer() - indexDataOffset);
+
+                final long valuesDataLength = data.getFilePointer() - valuesDataOffset;
+                if (maxOrd != 1) {
+                    // Special case for maxOrd of 1, indexWriter isn't really used, so no need to invoke finish() method.
+                    indexWriter.finish();
+                }
+                final long indexDataOffset = data.getFilePointer();
+                data.copyBytes(indexOut.toDataInput(), indexOut.size());
+                meta.writeLong(indexDataOffset);
+                meta.writeLong(data.getFilePointer() - indexDataOffset);
-
-            meta.writeLong(valuesDataOffset);
-            meta.writeLong(valuesDataLength);
-        }
+
+                meta.writeLong(valuesDataOffset);
+                meta.writeLong(valuesDataLength);
+            }
 
-        if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
-            meta.writeLong(-2); // docsWithFieldOffset
-            meta.writeLong(0L); // docsWithFieldLength
-            meta.writeShort((short) -1); // jumpTableEntryCount
-            meta.writeByte((byte) -1); // denseRankPower
-        } else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
-            meta.writeLong(-1); // docsWithFieldOffset
-            meta.writeLong(0L); // docsWithFieldLength
-            meta.writeShort((short) -1); // jumpTableEntryCount
-            meta.writeByte((byte) -1); // denseRankPower
-        } else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
-            long offset = data.getFilePointer();
-            meta.writeLong(offset); // docsWithFieldOffset
-            values = valuesProducer.getSortedNumeric(field);
-            final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
-            meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
-            meta.writeShort(jumpTableEntryCount);
-            meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
-        }
+            if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values
+                meta.writeLong(-2); // docsWithFieldOffset
+                meta.writeLong(0L); // docsWithFieldLength
+                meta.writeShort((short) -1); // jumpTableEntryCount
+                meta.writeByte((byte) -1); // denseRankPower
+            } else if (numDocsWithValue == maxDoc) { // meta[-1, 0]: All documents have values
+                meta.writeLong(-1); // docsWithFieldOffset
+                meta.writeLong(0L); // docsWithFieldLength
+                meta.writeShort((short) -1); // jumpTableEntryCount
+                meta.writeByte((byte) -1); // denseRankPower
+            } else { // meta[data.offset, data.length]: IndexedDISI structure for documents with values
+                long offset = data.getFilePointer();
+                meta.writeLong(offset); // docsWithFieldOffset
+                final short jumpTableEntryCount;
+                if (maxOrd != 1 && docIdSetBuilder != null) {
+                    jumpTableEntryCount = docIdSetBuilder.build();
+                    disiTempOutput.close();
+                    try (
+                        // TODO: which IOContext should be used here?
+                        var addressDataInput = dir.openInput(skipListTempFileName, IOContext.DEFAULT)
+                    ) {
+                        data.copyBytes(addressDataInput, addressDataInput.length());
+                    }
+                } else {
+                    values = valuesProducer.getSortedNumeric(field);
+                    jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+                }
+                meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
+                meta.writeShort(jumpTableEntryCount);
+                meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
+            }
+        } finally {
+            IOUtils.close(disiTempOutput);
+            if (skipListTempFileName != null) {
+                deleteFilesIgnoringExceptions(skipListTempFileName);
+            }
+        }
 
         return new long[] { numDocsWithValue, numValues };
     }
 
+    @SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
+    private void deleteFilesIgnoringExceptions(String skipListTempFileName) {
+        org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(dir, skipListTempFileName);
+    }
+
     @Override
     public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
         var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
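
On the IOContext question in the inline thread above, one way to realize the reviewers' suggestion is to capture the IOContext from the SegmentWriteState (next to state.directory) and reuse it for the temporary DISI file, so that during a merge the temporary output automatically gets a merge context. A minimal sketch of that idea follows; the class and method names are illustrative only, not the code merged in this PR.

import java.io.IOException;

import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

// Sketch: take the IOContext from the SegmentWriteState instead of hard-coding
// IOContext.DEFAULT, so the temporary DISI file is written with the same context
// as the rest of the segment (a merge context when the segment is being merged).
final class DisiTempOutputFactory {

    private final Directory dir;
    private final IOContext context;

    DisiTempOutputFactory(SegmentWriteState state) {
        this.dir = state.directory;
        this.context = state.context;
    }

    IndexOutput createTempOutput(String dataFileName) throws IOException {
        // same createTempOutput call as in the PR, but with the captured context
        return dir.createTempOutput(dataFileName, "disi", context);
    }
}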
Review comment: Should we make IndexedDISIBuilder (or maybe Accumulator is a better name?) Closeable, pass a Directory into its constructor, and pass the IndexOutput to the build (or flush) method to copy the temporary output to the data output? This way we only need a single reference here, making the code more manageable.

Reply: This really made the code more manageable! I also forked the original IndexedDISI tests and adapted them for DISIAccumulator.
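
The thread above describes the shape that, per the reply, later became DISIAccumulator: a Closeable accumulator that owns its temporary output and copies it into the data output when built. A rough sketch of that shape follows, using Lucene's Directory and IndexOutput APIs; the class name, constructor signature, and the elided encoding internals are illustrative, not the PR's actual implementation.

import java.io.Closeable;
import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;

// Sketch of a Closeable DISI accumulator: the Directory comes in via the
// constructor, build() copies the temporary output into the data output,
// and close() cleans up the temporary file even if writing failed.
final class SketchDISIAccumulator implements Closeable {

    private final Directory dir;
    private final IndexOutput disiTempOutput;
    private final String tempFileName;

    SketchDISIAccumulator(Directory dir, IOContext context, IndexOutput data) throws IOException {
        this.dir = dir;
        this.disiTempOutput = dir.createTempOutput(data.getName(), "disi", context);
        this.tempFileName = disiTempOutput.getName();
    }

    void addDocId(int docId) throws IOException {
        // elided: the forked IndexedDISI block/jump-table encoding would write to disiTempOutput here
    }

    short build(IndexOutput data) throws IOException {
        short jumpTableEntryCount = 0; // elided: finish the encoding and compute the real entry count
        disiTempOutput.close();
        try (var in = dir.openInput(tempFileName, IOContext.DEFAULT)) {
            data.copyBytes(in, in.length());
        }
        return jumpTableEntryCount;
    }

    @Override
    public void close() throws IOException {
        // safe to call after build(): a second close of the temp output is swallowed here
        IOUtils.closeWhileHandlingException(disiTempOutput);
        IOUtils.deleteFilesIgnoringExceptions(dir, tempFileName);
    }
}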