
Shard level search stats - DfsPhase #126380

Open · wants to merge 17 commits into base: main
@@ -567,10 +567,12 @@ public void testSimpleStats() throws Exception {
// make sure that number of requests in progress is 0
assertThat(stats.getIndex("test1").getTotal().getIndexing().getTotal().getIndexCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getIndexing().getTotal().getDeleteCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getDfsCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getFetchCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getQueryCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getIndexing().getTotal().getIndexCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getIndexing().getTotal().getDeleteCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getDfsCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getFetchCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getQueryCurrent(), equalTo(0L));

@@ -68,6 +68,9 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
static final String STANDARD_INDEX_COUNT = "es.indices.standard.total";
static final String STANDARD_BYTES_SIZE = "es.indices.standard.size";
static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total";
static final String STANDARD_DFS_COUNT = "es.indices.standard.dfs.total";
static final String STANDARD_DFS_TIME = "es.indices.standard.dfs.time";
static final String STANDARD_DFS_FAILURE = "es.indices.standard.dfs.failure.total";
static final String STANDARD_QUERY_COUNT = "es.indices.standard.query.total";
static final String STANDARD_QUERY_TIME = "es.indices.standard.query.time";
static final String STANDARD_QUERY_FAILURE = "es.indices.standard.query.failure.total";
@@ -245,6 +248,10 @@ public void testIndicesMetrics() {
telemetry,
1,
Map.of(
STANDARD_DFS_COUNT,
equalTo(search1.getDfsCount()),
STANDARD_DFS_TIME,
equalTo(search1.getDfsTimeInMillis()),
STANDARD_QUERY_COUNT,
equalTo(numStandardIndices),
STANDARD_QUERY_TIME,
@@ -334,6 +341,8 @@ public void testIndicesMetrics() {
telemetry,
4,
Map.of(
STANDARD_DFS_FAILURE,
equalTo(0L),
STANDARD_QUERY_FAILURE,
equalTo(0L),
STANDARD_FETCH_FAILURE,
@@ -130,6 +130,8 @@ public void testSimpleStats() throws Exception {

IndicesStatsResponse indicesStats = indicesAdmin().prepareStats().get();
logger.debug("###### indices search stats: {}", indicesStats.getTotal().getSearch());
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsCount(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsTimeInMillis(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryCount(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryTimeInMillis(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getFetchCount(), greaterThan(0L));
@@ -138,6 +140,8 @@ public void testSimpleStats() throws Exception {

indicesStats = indicesAdmin().prepareStats().setGroups("group1").get();
assertThat(indicesStats.getTotal().getSearch().getGroupStats(), notNullValue());
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getDfsCount(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getDfsTimeInMillis(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getQueryCount(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getQueryTimeInMillis(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchCount(), greaterThan(0L));
@@ -214,6 +214,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);
public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00);
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_0_00);
public static final TransportVersion DFS_STATS = def(9_048_0_00);

/*
* STOP! READ THIS FIRST! No, really,
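As an aside on how a constant like DFS_STATS is consumed: the reader and the writer must gate the new fields on the same onOrAfter(...) check, in the same field order, or a mixed-version cluster would mis-frame the stream. A minimal illustrative pairing, with a hypothetical field name — the concrete DFS usage appears in SearchStats below:

// Illustrative only: both sides branch on the same version constant.
public void writeTo(StreamOutput out) throws IOException {
    if (out.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
        out.writeVLong(newField); // skipped entirely for peers that predate DFS_STATS
    }
}

private MyStats(StreamInput in) throws IOException {
    if (in.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
        newField = in.readVLong(); // mirrors the write order exactly
    }
}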
@@ -30,6 +30,10 @@ public class SearchStats implements Writeable, ToXContentFragment {

public static class Stats implements Writeable, ToXContentFragment {

private long dfsCount;
private long dfsTimeInMillis;
private long dfsCurrent;

private long queryCount;
private long queryTimeInMillis;
private long queryCurrent;
@@ -46,6 +50,7 @@ public static class Stats implements Writeable, ToXContentFragment {
private long suggestTimeInMillis;
private long suggestCurrent;

private long dfsFailure;
private long queryFailure;
private long fetchFailure;

@@ -54,6 +59,10 @@ private Stats() {
}

public Stats(
long dfsCount,
long dfsTimeInMillis,
long dfsCurrent,
long dfsFailure,
long queryCount,
long queryTimeInMillis,
long queryCurrent,
@@ -69,6 +78,11 @@ public Stats(
long suggestTimeInMillis,
long suggestCurrent
) {
this.dfsCount = dfsCount;
this.dfsTimeInMillis = dfsTimeInMillis;
this.dfsCurrent = dfsCurrent;
this.dfsFailure = dfsFailure;

this.queryCount = queryCount;
this.queryTimeInMillis = queryTimeInMillis;
this.queryCurrent = queryCurrent;
@@ -89,6 +103,13 @@ public Stats(
}

private Stats(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
dfsCount = in.readVLong();
dfsTimeInMillis = in.readVLong();
dfsCurrent = in.readVLong();
dfsFailure = in.readVLong();
}

queryCount = in.readVLong();
queryTimeInMillis = in.readVLong();
queryCurrent = in.readVLong();
@@ -113,6 +134,13 @@ private Stats(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
[Review comment · Contributor] Out of curiosity - do we ever clean up these checks once we're far enough into the future to safely assume all nodes are upgraded past this version?

[Reply · Contributor Author] Thank you, Ben, for the review. Yes, there is a cleanup process; please check the following PR.

out.writeVLong(dfsCount);
out.writeVLong(dfsTimeInMillis);
out.writeVLong(dfsCurrent);
out.writeVLong(dfsFailure);
}

out.writeVLong(queryCount);
out.writeVLong(queryTimeInMillis);
out.writeVLong(queryCurrent);
@@ -136,6 +164,11 @@ public void writeTo(StreamOutput out) throws IOException {
}

public void add(Stats stats) {
dfsCount += stats.dfsCount;
dfsTimeInMillis += stats.dfsTimeInMillis;
dfsCurrent += stats.dfsCurrent;
dfsFailure += stats.dfsFailure;

queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;
queryCurrent += stats.queryCurrent;
@@ -156,6 +189,10 @@ public void add(Stats stats) {
}

public void addForClosingShard(Stats stats) {
dfsCount += stats.dfsCount;
dfsTimeInMillis += stats.dfsTimeInMillis;
dfsFailure += stats.dfsFailure;

queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;
queryFailure += stats.queryFailure;
@@ -173,6 +210,26 @@ public void addForClosingShard(Stats stats) {
suggestTimeInMillis += stats.suggestTimeInMillis;
}

public long getDfsCount() {
return dfsCount;
}

public TimeValue getDfsTime() {
return new TimeValue(dfsTimeInMillis);
}

public long getDfsTimeInMillis() {
return dfsTimeInMillis;
}

public long getDfsCurrent() {
return dfsCurrent;
}

public long getDfsFailure() {
return dfsFailure;
}

public long getQueryCount() {
return queryCount;
}
@@ -251,6 +308,11 @@ public static Stats readStats(StreamInput in) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.DFS_TOTAL, dfsCount);
builder.humanReadableField(Fields.DFS_TIME_IN_MILLIS, Fields.DFS_TIME, getDfsTime());
builder.field(Fields.DFS_CURRENT, dfsCurrent);
builder.field(Fields.DFS_FAILURE, dfsFailure);

builder.field(Fields.QUERY_TOTAL, queryCount);
builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime());
builder.field(Fields.QUERY_CURRENT, queryCurrent);
@@ -277,7 +339,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Stats that = (Stats) o;
return queryCount == that.queryCount
return dfsCount == that.dfsCount
&& dfsTimeInMillis == that.dfsTimeInMillis
&& dfsCurrent == that.dfsCurrent
&& dfsFailure == that.dfsFailure
&& queryCount == that.queryCount
&& queryTimeInMillis == that.queryTimeInMillis
&& queryCurrent == that.queryCurrent
&& queryFailure == that.queryFailure
@@ -296,6 +362,10 @@ public int hashCode() {
@Override
public int hashCode() {
return Objects.hash(
dfsCount,
dfsTimeInMillis,
dfsCurrent,
dfsFailure,
queryCount,
queryTimeInMillis,
queryCurrent,
@@ -409,6 +479,11 @@ static final class Fields {
static final String SEARCH = "search";
static final String OPEN_CONTEXTS = "open_contexts";
static final String GROUPS = "groups";
static final String DFS_TOTAL = "dfs_total";
static final String DFS_TIME = "dfs_time";
static final String DFS_TIME_IN_MILLIS = "dfs_time_in_millis";
static final String DFS_CURRENT = "dfs_current";
static final String DFS_FAILURE = "dfs_failure";
static final String QUERY_TOTAL = "query_total";
static final String QUERY_TIME = "query_time";
static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis";
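As a quick orientation, here is a hedged sketch of how the new accessors and REST field names line up, in the same assertion style as this PR's own tests. The assertThat scaffolding and a populated stats object are assumed:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

// "search" is assumed to be a SearchStats.Stats obtained from an
// IndicesStatsResponse, as in the integration tests earlier in this PR.
assertThat(search.getDfsCount(), greaterThanOrEqualTo(0L));                      // rendered as "dfs_total"
assertThat(search.getDfsTime().millis(), equalTo(search.getDfsTimeInMillis())); // "dfs_time" / "dfs_time_in_millis"
assertThat(search.getDfsCurrent(), equalTo(0L));                                 // "dfs_current" when no DFS is in flight
assertThat(search.getDfsFailure(), equalTo(0L));                                 // "dfs_failure"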
@@ -107,6 +107,27 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
});
}

@Override
public void onPreDfsPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> statsHolder.dfsCurrent.inc());
}

@Override
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
statsHolder.dfsMetric.inc(tookInNanos);
statsHolder.dfsCurrent.dec();
});
}

@Override
public void onFailedDfsPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> {
statsHolder.dfsCurrent.dec();
statsHolder.dfsFailure.inc();
});
}

private void computeStats(SearchContext searchContext, Consumer<StatsHolder> consumer) {
consumer.accept(totalStats);
var groupStats = searchContext.groupStats();
@@ -154,6 +175,7 @@ public void onFreeScrollContext(ReaderContext readerContext) {
}

static final class StatsHolder {
final MeanMetric dfsMetric = new MeanMetric();
final MeanMetric queryMetric = new MeanMetric();
final MeanMetric fetchMetric = new MeanMetric();
/* We store scroll statistics in microseconds because with nanoseconds we run the risk of overflowing the total stats if there are
@@ -165,16 +187,22 @@
*/
final MeanMetric scrollMetric = new MeanMetric();
final MeanMetric suggestMetric = new MeanMetric();
final CounterMetric dfsCurrent = new CounterMetric();
final CounterMetric queryCurrent = new CounterMetric();
final CounterMetric fetchCurrent = new CounterMetric();
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();

final CounterMetric dfsFailure = new CounterMetric();
final CounterMetric queryFailure = new CounterMetric();
final CounterMetric fetchFailure = new CounterMetric();

SearchStats.Stats stats() {
return new SearchStats.Stats(
dfsMetric.count(),
TimeUnit.NANOSECONDS.toMillis(dfsMetric.sum()),
dfsCurrent.count(),
dfsFailure.count(),
queryMetric.count(),
TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()),
queryCurrent.count(),
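Condensing the listener methods above into a single hedged lifecycle sketch (runDfsPhase is a hypothetical stand-in; the real inc/dec calls are split across onPreDfsPhase, onDfsPhase, and onFailedDfsPhase, and stats() converts the accumulated nanoseconds to milliseconds):

// Illustrative only: shows how the three metrics pair up per DFS execution.
void runTrackedDfsPhase() throws Exception {
    long startNanos = System.nanoTime();
    dfsCurrent.inc();                  // onPreDfsPhase: one more DFS phase in flight
    try {
        runDfsPhase();                 // hypothetical stand-in for the actual phase
        dfsMetric.inc(System.nanoTime() - startNanos); // onDfsPhase: bumps count and total time
    } catch (Exception e) {
        dfsFailure.inc();              // onFailedDfsPhase
        throw e;
    } finally {
        dfsCurrent.dec();              // decremented on success and failure alike
    }
}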
@@ -65,6 +65,28 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed before the DFS phase is executed
* @param searchContext the current search context
*/
default void onPreDfsPhase(SearchContext searchContext) {}

/**
* Executed after the DFS phase successfully finished.
* Note: this is not invoked if the DFS phase execution failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the DFS execution took
*
* @see #onFailedDfsPhase(SearchContext)
*/
default void onDfsPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed if the DFS phase failed.
* @param searchContext the current search context
*/
default void onFailedDfsPhase(SearchContext searchContext) {}

/**
* Executed when a new reader context was created
* @param readerContext the created context
@@ -182,6 +204,39 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
}
}

@Override
public void onPreDfsPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreDfsPhase(searchContext);
} catch (Exception e) {
logger.warn(() -> "onPreDfsPhase listener [" + listener + "] failed", e);
}
}
}

@Override
public void onFailedDfsPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedDfsPhase(searchContext);
} catch (Exception e) {
logger.warn(() -> "onFailedDfsPhase listener [" + listener + "] failed", e);
}
}
}

@Override
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onDfsPhase(searchContext, tookInNanos);
} catch (Exception e) {
logger.warn(() -> "onDfsPhase listener [" + listener + "] failed", e);
}
}
}

@Override
public void onNewReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
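Finally, a hedged end-to-end sketch of a listener consuming the three new hooks. The interface and method signatures come from this diff; the package locations (org.elasticsearch.index.shard, org.elasticsearch.search.internal) and registration via IndexModule#addSearchOperationListener are assumptions, and the class itself is purely illustrative:

import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;

// Illustrative listener; registration (assumed to be via
// IndexModule#addSearchOperationListener) is not shown.
public class DfsGaugeListener implements SearchOperationListener {

    private final AtomicLong inFlight = new AtomicLong();

    @Override
    public void onPreDfsPhase(SearchContext searchContext) {
        inFlight.incrementAndGet(); // a DFS phase just started on this shard
    }

    @Override
    public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
        inFlight.decrementAndGet(); // finished successfully after tookInNanos
    }

    @Override
    public void onFailedDfsPhase(SearchContext searchContext) {
        inFlight.decrementAndGet(); // failed; still pairs with onPreDfsPhase
    }

    public long inFlightDfsPhases() {
        return inFlight.get();
    }
}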