Skip to content

Commit 3ea43de

Browse files
committed
Enable time-series block hash
1 parent d65f34d commit 3ea43de

File tree

7 files changed

+193
-45
lines changed

7 files changed

+193
-45
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

+113-28
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,20 @@
1919
import org.elasticsearch.compute.data.BlockFactory;
2020
import org.elasticsearch.compute.data.BytesRefBlock;
2121
import org.elasticsearch.compute.data.BytesRefVector;
22+
import org.elasticsearch.compute.data.ElementType;
2223
import org.elasticsearch.compute.data.IntBlock;
2324
import org.elasticsearch.compute.data.IntVector;
2425
import org.elasticsearch.compute.data.LongBlock;
2526
import org.elasticsearch.compute.data.LongVector;
2627
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
2728
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2829
import org.elasticsearch.compute.data.Page;
30+
import org.elasticsearch.core.Assertions;
2931
import org.elasticsearch.core.Releasable;
3032
import org.elasticsearch.core.ReleasableIterator;
3133
import org.elasticsearch.core.Releasables;
3234

33-
import java.util.Objects;
35+
import java.util.List;
3436

3537
/**
3638
* An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
@@ -41,7 +43,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
4143
private final int tsHashChannel;
4244
private final int timestampIntervalChannel;
4345

44-
private final BytesRef lastTsid = new BytesRef();
46+
private int lastTsidPosition = 0;
4547
private final BytesRefArrayWithSize tsidArray;
4648

4749
private long lastTimestamp;
@@ -50,56 +52,120 @@ public final class TimeSeriesBlockHash extends BlockHash {
5052
private int currentTimestampCount;
5153
private final IntArrayWithSize perTsidCountArray;
5254

55+
private final BlockHash assertingHash;
56+
5357
/**
 * Creates a block hash that groups rows by the tsid channel and the timestamp-interval channel.
 * <p>
 * When assertions are enabled, a {@link PackedValuesBlockHash} over the same two channels is created as well,
 * so that the group ids and keys produced by this class can be cross-checked against it.
 *
 * @param tsHashChannel            channel of the page holding the tsid values (bytes refs)
 * @param timestampIntervalChannel channel of the page holding the timestamp values (longs)
 * @param blockFactory             factory used to allocate the backing arrays and the reference hash
 */
public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
    super(blockFactory);
    this.tsHashChannel = tsHashChannel;
    this.timestampIntervalChannel = timestampIntervalChannel;
    this.tsidArray = new BytesRefArrayWithSize(blockFactory);
    this.timestampArray = new LongArrayWithSize(blockFactory);
    this.perTsidCountArray = new IntArrayWithSize(blockFactory);
    if (Assertions.ENABLED) {
        // The timestamp channel is read as a LongBlock (see getTimestampVector), so the reference hash
        // must treat it as LONG, not BYTES_REF.
        final var groupSpecs = List.of(
            new GroupSpec(tsHashChannel, ElementType.BYTES_REF),
            new GroupSpec(timestampIntervalChannel, ElementType.LONG)
        );
        assertingHash = new PackedValuesBlockHash(groupSpecs, blockFactory, Integer.MAX_VALUE);
    } else {
        assertingHash = null;
    }
}
6174

6275
@Override
6376
public void close() {
64-
Releasables.close(tsidArray, timestampArray, perTsidCountArray);
77+
Releasables.close(tsidArray, timestampArray, perTsidCountArray, assertingHash);
78+
}
79+
80+
private OrdinalBytesRefVector getTsidVector(Page page) {
81+
BytesRefBlock block = page.getBlock(tsHashChannel);
82+
var ordinalBlock = block.asOrdinals();
83+
if (ordinalBlock == null) {
84+
throw new IllegalStateException("expected ordinal block for tsid");
85+
}
86+
var ordinalVector = ordinalBlock.asVector();
87+
if (ordinalVector == null) {
88+
throw new IllegalStateException("expected ordinal vector for tsid");
89+
}
90+
return ordinalVector;
91+
}
92+
93+
private LongVector getTimestampVector(Page page) {
94+
final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
95+
LongVector timestampsVector = timestampsBlock.asVector();
96+
if (timestampsVector == null) {
97+
throw new IllegalStateException("expected long vector for timestamp");
98+
}
99+
return timestampsVector;
65100
}
66101

67102
@Override
68103
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
69-
final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
70-
final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
71-
final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
72-
final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
73-
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
104+
final BytesRefVector tsidDict;
105+
final IntVector tsidOrdinals;
106+
{
107+
final var tsidVector = getTsidVector(page);
108+
tsidDict = tsidVector.getDictionaryVector();
109+
tsidOrdinals = tsidVector.getOrdinalsVector();
110+
}
111+
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
74112
final BytesRef spare = new BytesRef();
75-
// TODO: optimize incoming ordinal block
76-
for (int i = 0; i < tsidVector.getPositionCount(); i++) {
77-
final BytesRef tsid = tsidVector.getBytesRef(i, spare);
113+
final LongVector timestampVector = getTimestampVector(page);
114+
int lastOrd = -1;
115+
for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
116+
final int newOrd = tsidOrdinals.getInt(i);
117+
boolean newGroup = false;
118+
if (lastOrd != newOrd) {
119+
final var newTsid = tsidDict.getBytesRef(newOrd, spare);
120+
if (positionCount() == 0) {
121+
newGroup = true;
122+
} else if (lastOrd == -1) {
123+
newGroup = lastTsid().equals(newTsid) == false;
124+
} else {
125+
newGroup = true;
126+
}
127+
if (newGroup) {
128+
endTsidGroup();
129+
lastTsidPosition = tsidArray.count;
130+
tsidArray.append(newTsid);
131+
}
132+
lastOrd = newOrd;
133+
}
78134
final long timestamp = timestampVector.getLong(i);
79-
ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
135+
if (newGroup || timestamp != lastTimestamp) {
136+
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
137+
timestampArray.append(timestamp);
138+
lastTimestamp = timestamp;
139+
currentTimestampCount++;
140+
}
141+
ordsBuilder.appendInt(timestampArray.count - 1);
80142
}
81143
try (var ords = ordsBuilder.build()) {
82144
addInput.add(0, ords);
145+
assert assertingAddInputPage(page, ords);
83146
}
84147
}
85148
}
86149

87-
private int addOnePosition(BytesRef tsid, long timestamp) {
88-
boolean newGroup = false;
89-
if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
90-
assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
91-
endTsidGroup();
92-
tsidArray.append(tsid);
93-
tsidArray.get(tsidArray.count - 1, lastTsid);
94-
newGroup = true;
95-
}
96-
if (newGroup || timestamp != lastTimestamp) {
97-
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
98-
timestampArray.append(timestamp);
99-
lastTimestamp = timestamp;
100-
currentTimestampCount++;
101-
}
102-
return positionCount() - 1;
150+
private boolean assertingAddInputPage(Page page, IntVector actualIds) {
151+
assert assertingHash != null;
152+
assertingHash.add(page, new GroupingAggregatorFunction.AddInput() {
153+
@Override
154+
public void add(int positionOffset, IntBlock groupIds) {
155+
assert false : "add(IntBlock) shouldn't be called";
156+
}
157+
158+
@Override
159+
public void add(int positionOffset, IntVector expectedIds) {
160+
assert expectedIds.equals(actualIds) : "actual=" + actualIds + " vs expected = " + expectedIds;
161+
}
162+
163+
@Override
164+
public void close() {
165+
166+
}
167+
});
168+
return true;
103169
}
104170

105171
private void endTsidGroup() {
@@ -109,6 +175,12 @@ private void endTsidGroup() {
109175
}
110176
}
111177

178+
private BytesRef lastTsid() {
179+
final BytesRef bytesRef = new BytesRef();
180+
tsidArray.get(lastTsidPosition, bytesRef);
181+
return bytesRef;
182+
}
183+
112184
@Override
113185
public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
114186
throw new UnsupportedOperationException("TODO");
@@ -125,6 +197,7 @@ public Block[] getKeys() {
125197
blocks[0] = buildTsidBlock();
126198
}
127199
blocks[1] = timestampArray.toBlock();
200+
assert assertingKeys(blocks);
128201
return blocks;
129202
} finally {
130203
if (blocks[blocks.length - 1] == null) {
@@ -133,6 +206,18 @@ public Block[] getKeys() {
133206
}
134207
}
135208

209+
private boolean assertingKeys(Block[] actualKeys) {
210+
assert assertingHash != null;
211+
Block[] expectedKeys = assertingHash.getKeys();
212+
try {
213+
assert expectedKeys[0].equals(actualKeys[0]) : "actual=" + actualKeys[0] + " vs expected = " + expectedKeys[0];
214+
assert expectedKeys[1].equals(actualKeys[1]) : "actual=" + actualKeys[1] + " vs expected = " + expectedKeys[1];
215+
} finally {
216+
Releasables.close(expectedKeys);
217+
}
218+
return true;
219+
}
220+
136221
private BytesRefBlock buildTsidBlockWithOrdinal() {
137222
try (IntVector.FixedBuilder ordinalBuilder = blockFactory.newIntVectorFixedBuilder(positionCount())) {
138223
for (int i = 0; i < tsidArray.count; i++) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
7575
}
7676

7777
@Override
78-
public BytesRefVector asVector() {
78+
public OrdinalBytesRefVector asVector() {
7979
IntVector vector = ordinals.asVector();
8080
if (vector != null) {
8181
return new OrdinalBytesRefVector(vector, bytes);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
1515
import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
1616
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
17+
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1718
import org.elasticsearch.compute.data.Block;
1819
import org.elasticsearch.compute.data.ElementType;
1920
import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
3031

3132
public record Factory(
3233
Rounding.Prepared timeBucket,
34+
boolean sortedInput,
3335
List<BlockHash.GroupSpec> groups,
3436
AggregatorMode aggregatorMode,
3537
List<GroupingAggregator.Factory> aggregators,
@@ -38,17 +40,18 @@ public record Factory(
3840
@Override
3941
public Operator get(DriverContext driverContext) {
4042
// use TimeSeriesBlockHash when the input is sorted and there are exactly two groups; otherwise fall back to the generic BlockHash
41-
return new TimeSeriesAggregationOperator(
42-
timeBucket,
43-
aggregators,
44-
() -> BlockHash.build(
45-
groups,
46-
driverContext.blockFactory(),
47-
maxPageSize,
48-
true // we can enable optimizations as the inputs are vectors
49-
),
50-
driverContext
51-
);
43+
return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
44+
if (sortedInput && groups.size() == 2) {
45+
return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
46+
} else {
47+
return BlockHash.build(
48+
groups,
49+
driverContext.blockFactory(),
50+
maxPageSize,
51+
true // we can enable optimizations as the inputs are vectors
52+
);
53+
}
54+
}, driverContext);
5255
}
5356

5457
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.EvalOperator;
1919
import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
2020
import org.elasticsearch.compute.operator.Operator;
21-
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
2221
import org.elasticsearch.index.analysis.AnalysisRegistry;
2322
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
2423
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,12 +174,12 @@ else if (aggregatorMode.isOutputPartial()) {
175174
);
176175
// time-series aggregation
177176
if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
178-
operatorFactory = new TimeSeriesAggregationOperator.Factory(
179-
ts.timeBucketRounding(context.foldCtx()),
180-
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
177+
operatorFactory = timeSeriesAggregatorOperatorFactor(
178+
ts,
181179
aggregatorMode,
182180
aggregatorFactories,
183-
context.pageSize(aggregateExec.estimatedRowSize())
181+
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
182+
context
184183
);
185184
// ordinal grouping
186185
} else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -379,4 +378,12 @@ public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
379378
ElementType groupType,
380379
LocalExecutionPlannerContext context
381380
);
381+
382+
/**
 * Builds the operator factory used to execute a time-series aggregation.
 *
 * @param ts                  the time-series aggregate plan node being planned
 * @param aggregatorMode      the mode the grouping aggregators run in
 * @param aggregatorFactories factories of the grouping aggregators to run
 * @param groupSpecs          block-hash group specs describing the grouping keys
 * @param context             the local execution planner context
 * @return the operator factory for the aggregation
 */
public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
383+
TimeSeriesAggregateExec ts,
384+
AggregatorMode aggregatorMode,
385+
List<GroupingAggregator.Factory> aggregatorFactories,
386+
List<BlockHash.GroupSpec> groupSpecs,
387+
LocalExecutionPlannerContext context
388+
);
382389
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

+22
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.apache.lucene.search.IndexSearcher;
1515
import org.apache.lucene.search.Query;
1616
import org.elasticsearch.common.logging.HeaderWarning;
17+
import org.elasticsearch.compute.aggregation.AggregatorMode;
1718
import org.elasticsearch.compute.aggregation.GroupingAggregator;
19+
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1820
import org.elasticsearch.compute.data.Block;
1921
import org.elasticsearch.compute.data.ElementType;
2022
import org.elasticsearch.compute.lucene.DataPartitioning;
@@ -27,6 +29,7 @@
2729
import org.elasticsearch.compute.operator.Operator;
2830
import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
2931
import org.elasticsearch.compute.operator.SourceOperator;
32+
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
3033
import org.elasticsearch.core.Nullable;
3134
import org.elasticsearch.index.IndexMode;
3235
import org.elasticsearch.index.IndexSettings;
@@ -61,6 +64,7 @@
6164
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
6265
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
6366
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
67+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
6468
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
6569
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
6670
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -299,6 +303,24 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
299303
);
300304
}
301305

306+
@Override
307+
public Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
308+
TimeSeriesAggregateExec ts,
309+
AggregatorMode aggregatorMode,
310+
List<GroupingAggregator.Factory> aggregatorFactories,
311+
List<BlockHash.GroupSpec> groupSpecs,
312+
LocalExecutionPlannerContext context
313+
) {
314+
return new TimeSeriesAggregationOperator.Factory(
315+
ts.timeBucketRounding(context.foldCtx()),
316+
shardContexts.size() == 1 && ts.anyMatch(p -> p instanceof TimeSeriesSourceExec),
317+
groupSpecs,
318+
aggregatorMode,
319+
aggregatorFactories,
320+
context.pageSize(ts.estimatedRowSize())
321+
);
322+
}
323+
302324
public static class DefaultShardContext implements ShardContext {
303325
private final int index;
304326
private final SearchExecutionContext ctx;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

+10
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,16 @@ public static String[] planOriginalIndices(PhysicalPlan plan) {
157157
return indices.toArray(String[]::new);
158158
}
159159

160+
public static boolean requireTimeSeriesSource(PhysicalPlan plan) {
161+
return plan.anyMatch(p -> {
162+
if (p instanceof FragmentExec f) {
163+
return f.fragment().anyMatch(l -> l instanceof EsRelation s && s.indexMode() == IndexMode.TIME_SERIES);
164+
} else {
165+
return false;
166+
}
167+
});
168+
}
169+
160170
private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
161171
plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
162172
if (r.indexMode() != IndexMode.LOOKUP) {

0 commit comments

Comments
 (0)