Skip to content

Tsdb doc values inline building jump table #126499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 17, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.AsyncProfiler;
Expand All @@ -51,9 +50,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
Expand All @@ -71,23 +69,10 @@ public class TSDBDocValuesMergeBenchmark {
LogConfigurator.setNodeName("test");
}

@Param("20431204")
private int nDocs;

@Param("1000")
private int deltaTime;

@Param("42")
private int seed;

private static final String TIMESTAMP_FIELD = "@timestamp";
private static final String HOSTNAME_FIELD = "host.name";
private static final long BASE_TIMESTAMP = 1704067200000L;

private IndexWriter indexWriterWithoutOptimizedMerge;
private IndexWriter indexWriterWithOptimizedMerge;
private ExecutorService executorService;

public static void main(String[] args) throws RunnerException {
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
.addProfiler(AsyncProfiler.class)
Expand All @@ -96,78 +81,168 @@ public static void main(String[] args) throws RunnerException {
new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws IOException {
executorService = Executors.newSingleThreadExecutor();
@State(Scope.Benchmark)
public static class StateDenseWithoutOptimizeMerge {

final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
@Param("20431204")
private int nDocs;

@Param("1000")
private int deltaTime;

@Param("42")
private int seed;

private Directory directory;
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);

@Setup(Level.Trial)
public void setup() throws IOException {
directory = FSDirectory.open(Files.createTempDirectory("temp2-"));
createIndex(directory, iwc.get(), false, nDocs, deltaTime, seed);
}

indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
}

private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
final var iwc = createIndexWriterConfig(optimizedMergeEnabled);
long counter1 = 0;
long counter2 = 10_000_000;
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
int numHosts = 1000;
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
/**
 * Benchmark entry point that force-merges the directory held by a
 * {@link StateDenseWithoutOptimizeMerge} instance.
 *
 * @param state benchmark state providing the directory and the supplier of writer configs
 * @throws IOException if {@code forceMerge} fails
 */
@Benchmark
public void forceMergeDenseWithoutOptimizedMerge(StateDenseWithoutOptimizeMerge state) throws IOException {
    final Directory mergeTarget = state.directory;
    // A fresh config is requested from the supplier for every invocation.
    final IndexWriterConfig writerConfig = state.iwc.get();
    forceMerge(mergeTarget, writerConfig);
}

final Random random = new Random(seed);
IndexWriter indexWriter = new IndexWriter(directory, iwc);
for (int i = 0; i < nDocs; i++) {
final Document doc = new Document();

final int batchIndex = i / numHosts;
final String hostName = "host-" + batchIndex;
// Slightly vary the timestamp in each document
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);

doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
int numTags = tags.length % (i + 1);
for (int j = 0; j < numTags; j++) {
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
}
/**
 * Benchmark-scoped JMH state for the dense case whose writer config comes from
 * {@code createIndexWriterConfig(true)}.
 * <p>
 * Once per trial, {@link #setup()} creates a temporary directory prefixed {@code temp1-},
 * opens it as an {@link FSDirectory} and populates it through {@code createIndex} with
 * {@code sparse == false}. This class never closes the directory.
 */
@State(Scope.Benchmark)
public static class StateDenseWithOptimizeMerge {

    // Number of documents passed to createIndex.
    @Param("20431204")
    private int nDocs;

    // Passed to createIndex as its deltaTime argument.
    @Param("1000")
    private int deltaTime;

    // Passed to createIndex as its seed argument.
    @Param("42")
    private int seed;

    // Assigned in setup(); read by the benchmark method that receives this state.
    private Directory directory;

    // Each get() call invokes createIndexWriterConfig(true) again.
    private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);

    /**
     * Opens a directory over a new temporary folder and fills it with the benchmark index.
     *
     * @throws IOException if the temporary folder, the directory or the index cannot be created
     */
    @Setup(Level.Trial)
    public void setup() throws IOException {
        final var indexPath = Files.createTempDirectory("temp1-");
        directory = FSDirectory.open(indexPath);
        final IndexWriterConfig writerConfig = iwc.get();
        createIndex(directory, writerConfig, false, nDocs, deltaTime, seed);
    }

}

/**
 * Benchmark entry point that force-merges the directory held by a
 * {@link StateDenseWithOptimizeMerge} instance.
 *
 * @param state benchmark state providing the directory and the supplier of writer configs
 * @throws IOException if {@code forceMerge} fails
 */
@Benchmark
public void forceMergeDenseWithOptimizedMerge(StateDenseWithOptimizeMerge state) throws IOException {
    final Directory mergeTarget = state.directory;
    // A fresh config is requested from the supplier for every invocation.
    final IndexWriterConfig writerConfig = state.iwc.get();
    forceMerge(mergeTarget, writerConfig);
}

@State(Scope.Benchmark)
public static class StateSparseWithoutOptimizeMerge {

indexWriter.addDocument(doc);
@Param("20431204")
private int nDocs;

@Param("1000")
private int deltaTime;

@Param("42")
private int seed;

private Directory directory;
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);

@Setup(Level.Trial)
public void setup() throws IOException {
directory = FSDirectory.open(Files.createTempDirectory("temp4-"));
createIndex(directory, iwc.get(), true, nDocs, deltaTime, seed);
}
indexWriter.commit();
return indexWriter;

}

@Benchmark
public void forceMergeWithoutOptimizedMerge() throws IOException {
forceMerge(indexWriterWithoutOptimizedMerge);
public void forceMergeSparseWithoutOptimizedMerge(StateSparseWithoutOptimizeMerge state) throws IOException {
    // Take the directory from the injected state and ask its supplier for a fresh
    // writer config, then delegate to forceMerge.
    final Directory mergeTarget = state.directory;
    final IndexWriterConfig writerConfig = state.iwc.get();
    forceMerge(mergeTarget, writerConfig);
}

/**
 * Benchmark-scoped JMH state for the sparse case whose writer config comes from
 * {@code createIndexWriterConfig(true)}.
 * <p>
 * Once per trial, {@link #setup()} creates a temporary directory prefixed {@code temp3-},
 * opens it as an {@link FSDirectory} and populates it through {@code createIndex} with
 * {@code sparse == true}. This class never closes the directory.
 */
@State(Scope.Benchmark)
public static class StateSparseWithOptimizeMerge {

    // Number of documents passed to createIndex.
    @Param("20431204")
    private int nDocs;

    // Passed to createIndex as its deltaTime argument.
    @Param("1000")
    private int deltaTime;

    // Passed to createIndex as its seed argument.
    @Param("42")
    private int seed;

    // Assigned in setup(); read by the benchmark method that receives this state.
    private Directory directory;

    // Each get() call invokes createIndexWriterConfig(true) again.
    private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);

    /**
     * Opens a directory over a new temporary folder and fills it with the benchmark index.
     *
     * @throws IOException if the temporary folder, the directory or the index cannot be created
     */
    @Setup(Level.Trial)
    public void setup() throws IOException {
        final var indexPath = Files.createTempDirectory("temp3-");
        directory = FSDirectory.open(indexPath);
        final IndexWriterConfig writerConfig = iwc.get();
        createIndex(directory, writerConfig, true, nDocs, deltaTime, seed);
    }

}

@Benchmark
public void forceMergeWithOptimizedMerge() throws IOException {
forceMerge(indexWriterWithOptimizedMerge);
public void forceMergeSparseWithOptimizedMerge(StateSparseWithOptimizeMerge state) throws IOException {
    // Take the directory from the injected state and ask its supplier for a fresh
    // writer config, then delegate to forceMerge.
    final Directory mergeTarget = state.directory;
    final IndexWriterConfig writerConfig = state.iwc.get();
    forceMerge(mergeTarget, writerConfig);
}

private void forceMerge(final IndexWriter indexWriter) throws IOException {
indexWriter.forceMerge(1);
/**
 * Opens an {@link IndexWriter} on the given directory with the given config, calls
 * {@code forceMerge(1)} on it, and closes the writer when the try block exits.
 *
 * @param directory directory to open the writer on
 * @param config    configuration for the new writer
 * @throws IOException if opening, merging or closing the writer fails
 */
private void forceMerge(Directory directory, IndexWriterConfig config) throws IOException {
    try (IndexWriter writer = new IndexWriter(directory, config)) {
        final int maxSegments = 1;
        writer.forceMerge(maxSegments);
    }
}

@TearDown(Level.Trial)
public void tearDown() {
if (executorService != null) {
executorService.shutdown();
try {
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
executorService.shutdownNow();
static void createIndex(Directory directory, IndexWriterConfig iwc, boolean sparse, int nDocs, int deltaTime, int seed)
throws IOException {
long counter1 = 0;
long counter2 = 10_000_000;
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
int numHosts = 10000;
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };

final Random random = new Random(seed);
try (var indexWriter = new IndexWriter(directory, iwc)) {
for (int i = 0; i < nDocs; i++) {
final Document doc = new Document();

final int batchIndex = i % numHosts;
final String hostName = "host-" + batchIndex;
// Slightly vary the timestamp in each document
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);

doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
if (sparse == false || random.nextBoolean()) {
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
}
if (sparse == false || random.nextBoolean()) {
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
}
if (sparse == false || random.nextBoolean()) {
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
}
if (sparse == false || random.nextBoolean()) {
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
}
if (sparse == false || random.nextBoolean()) {
int numTags = tags.length % (i + 1);
for (int j = 0; j < numTags; j++) {
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
}
}
indexWriter.addDocument(doc);

if (i % 10000 == 0) {
indexWriter.commit();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Expand Down
Loading