Skip to content

Commit 0d41e9a

Browse files
authored
Tsdb doc values inline building jump table (#126499)
Build jump table (disi) while iterating over SortedNumericDocValues for encoding the values, instead of separately iterating over SortedNumericDocValues just to build the jump table. In case when indexing sorting is active, this requires an additional merge sort. Follow up from #125403
1 parent 4248c99 commit 0d41e9a

File tree

4 files changed

+952
-140
lines changed

4 files changed

+952
-140
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 145 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.openjdk.jmh.annotations.Scope;
4040
import org.openjdk.jmh.annotations.Setup;
4141
import org.openjdk.jmh.annotations.State;
42-
import org.openjdk.jmh.annotations.TearDown;
4342
import org.openjdk.jmh.annotations.Threads;
4443
import org.openjdk.jmh.annotations.Warmup;
4544
import org.openjdk.jmh.profile.AsyncProfiler;
@@ -51,9 +50,8 @@
5150
import java.io.IOException;
5251
import java.nio.file.Files;
5352
import java.util.Random;
54-
import java.util.concurrent.ExecutorService;
55-
import java.util.concurrent.Executors;
5653
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Supplier;
5755

5856
@BenchmarkMode(Mode.SingleShotTime)
5957
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -71,23 +69,10 @@ public class TSDBDocValuesMergeBenchmark {
7169
LogConfigurator.setNodeName("test");
7270
}
7371

74-
@Param("20431204")
75-
private int nDocs;
76-
77-
@Param("1000")
78-
private int deltaTime;
79-
80-
@Param("42")
81-
private int seed;
82-
8372
private static final String TIMESTAMP_FIELD = "@timestamp";
8473
private static final String HOSTNAME_FIELD = "host.name";
8574
private static final long BASE_TIMESTAMP = 1704067200000L;
8675

87-
private IndexWriter indexWriterWithoutOptimizedMerge;
88-
private IndexWriter indexWriterWithOptimizedMerge;
89-
private ExecutorService executorService;
90-
9176
public static void main(String[] args) throws RunnerException {
9277
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
9378
.addProfiler(AsyncProfiler.class)
@@ -96,78 +81,168 @@ public static void main(String[] args) throws RunnerException {
9681
new Runner(options).run();
9782
}
9883

99-
@Setup(Level.Trial)
100-
public void setup() throws IOException {
101-
executorService = Executors.newSingleThreadExecutor();
84+
@State(Scope.Benchmark)
85+
public static class StateDenseWithoutOptimizeMerge {
10286

103-
final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
104-
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
87+
@Param("20431204")
88+
private int nDocs;
89+
90+
@Param("1000")
91+
private int deltaTime;
92+
93+
@Param("42")
94+
private int seed;
95+
96+
private Directory directory;
97+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);
98+
99+
@Setup(Level.Trial)
100+
public void setup() throws IOException {
101+
directory = FSDirectory.open(Files.createTempDirectory("temp2-"));
102+
createIndex(directory, iwc.get(), false, nDocs, deltaTime, seed);
103+
}
105104

106-
indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
107-
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
108105
}
109106

110-
private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
111-
final var iwc = createIndexWriterConfig(optimizedMergeEnabled);
112-
long counter1 = 0;
113-
long counter2 = 10_000_000;
114-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
115-
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
116-
int numHosts = 1000;
117-
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
107+
@Benchmark
108+
public void forceMergeDenseWithoutOptimizedMerge(StateDenseWithoutOptimizeMerge state) throws IOException {
109+
forceMerge(state.directory, state.iwc.get());
110+
}
118111

119-
final Random random = new Random(seed);
120-
IndexWriter indexWriter = new IndexWriter(directory, iwc);
121-
for (int i = 0; i < nDocs; i++) {
122-
final Document doc = new Document();
123-
124-
final int batchIndex = i / numHosts;
125-
final String hostName = "host-" + batchIndex;
126-
// Slightly vary the timestamp in each document
127-
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
128-
129-
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
130-
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
131-
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
132-
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
133-
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
134-
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
135-
int numTags = tags.length % (i + 1);
136-
for (int j = 0; j < numTags; j++) {
137-
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
138-
}
112+
@State(Scope.Benchmark)
113+
public static class StateDenseWithOptimizeMerge {
114+
115+
@Param("20431204")
116+
private int nDocs;
117+
118+
@Param("1000")
119+
private int deltaTime;
120+
121+
@Param("42")
122+
private int seed;
123+
124+
private Directory directory;
125+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);
126+
127+
@Setup(Level.Trial)
128+
public void setup() throws IOException {
129+
directory = FSDirectory.open(Files.createTempDirectory("temp1-"));
130+
createIndex(directory, iwc.get(), false, nDocs, deltaTime, seed);
131+
}
132+
133+
}
134+
135+
@Benchmark
136+
public void forceMergeDenseWithOptimizedMerge(StateDenseWithOptimizeMerge state) throws IOException {
137+
forceMerge(state.directory, state.iwc.get());
138+
}
139+
140+
@State(Scope.Benchmark)
141+
public static class StateSparseWithoutOptimizeMerge {
139142

140-
indexWriter.addDocument(doc);
143+
@Param("20431204")
144+
private int nDocs;
145+
146+
@Param("1000")
147+
private int deltaTime;
148+
149+
@Param("42")
150+
private int seed;
151+
152+
private Directory directory;
153+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);
154+
155+
@Setup(Level.Trial)
156+
public void setup() throws IOException {
157+
directory = FSDirectory.open(Files.createTempDirectory("temp4-"));
158+
createIndex(directory, iwc.get(), true, nDocs, deltaTime, seed);
141159
}
142-
indexWriter.commit();
143-
return indexWriter;
160+
144161
}
145162

146163
@Benchmark
147-
public void forceMergeWithoutOptimizedMerge() throws IOException {
148-
forceMerge(indexWriterWithoutOptimizedMerge);
164+
public void forceMergeSparseWithoutOptimizedMerge(StateSparseWithoutOptimizeMerge state) throws IOException {
165+
forceMerge(state.directory, state.iwc.get());
166+
}
167+
168+
@State(Scope.Benchmark)
169+
public static class StateSparseWithOptimizeMerge {
170+
171+
@Param("20431204")
172+
private int nDocs;
173+
174+
@Param("1000")
175+
private int deltaTime;
176+
177+
@Param("42")
178+
private int seed;
179+
180+
private Directory directory;
181+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);
182+
183+
@Setup(Level.Trial)
184+
public void setup() throws IOException {
185+
directory = FSDirectory.open(Files.createTempDirectory("temp3-"));
186+
createIndex(directory, iwc.get(), true, nDocs, deltaTime, seed);
187+
}
188+
149189
}
150190

151191
@Benchmark
152-
public void forceMergeWithOptimizedMerge() throws IOException {
153-
forceMerge(indexWriterWithOptimizedMerge);
192+
public void forceMergeSparseWithOptimizedMerge(StateSparseWithOptimizeMerge state) throws IOException {
193+
forceMerge(state.directory, state.iwc.get());
154194
}
155195

156-
private void forceMerge(final IndexWriter indexWriter) throws IOException {
157-
indexWriter.forceMerge(1);
196+
private void forceMerge(Directory directory, IndexWriterConfig config) throws IOException {
197+
try (var indexWriter = new IndexWriter(directory, config)) {
198+
indexWriter.forceMerge(1);
199+
}
158200
}
159201

160-
@TearDown(Level.Trial)
161-
public void tearDown() {
162-
if (executorService != null) {
163-
executorService.shutdown();
164-
try {
165-
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
166-
executorService.shutdownNow();
202+
static void createIndex(Directory directory, IndexWriterConfig iwc, boolean sparse, int nDocs, int deltaTime, int seed)
203+
throws IOException {
204+
long counter1 = 0;
205+
long counter2 = 10_000_000;
206+
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
207+
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
208+
int numHosts = 10000;
209+
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
210+
211+
final Random random = new Random(seed);
212+
try (var indexWriter = new IndexWriter(directory, iwc)) {
213+
for (int i = 0; i < nDocs; i++) {
214+
final Document doc = new Document();
215+
216+
final int batchIndex = i % numHosts;
217+
final String hostName = "host-" + batchIndex;
218+
// Slightly vary the timestamp in each document
219+
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
220+
221+
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
222+
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
223+
if (sparse == false || random.nextBoolean()) {
224+
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
225+
}
226+
if (sparse == false || random.nextBoolean()) {
227+
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
228+
}
229+
if (sparse == false || random.nextBoolean()) {
230+
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
231+
}
232+
if (sparse == false || random.nextBoolean()) {
233+
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
234+
}
235+
if (sparse == false || random.nextBoolean()) {
236+
int numTags = tags.length % (i + 1);
237+
for (int j = 0; j < numTags; j++) {
238+
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
239+
}
240+
}
241+
indexWriter.addDocument(doc);
242+
243+
if (i % 10000 == 0) {
244+
indexWriter.commit();
167245
}
168-
} catch (InterruptedException e) {
169-
executorService.shutdownNow();
170-
Thread.currentThread().interrupt();
171246
}
172247
}
173248
}

0 commit comments

Comments
 (0)