Skip to content

Add allocation write load stats to write thread pool #130373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

DiannaHohensee
Copy link
Contributor

@DiannaHohensee DiannaHohensee commented Jul 1, 2025

Instrument the WRITE thread pool to collect:

  1. pool utilization EWMA
  2. queue latency EWMA

Relates ES-12233


See the ES ticket for details. I would like to collect some agreement on the approach. For example, taking an EWMA of the queue latencies; and taking the EWMA of the thread utilization metric, which is sampled periodically by the APM system. This approach is notably attempting to be 'good enough' for a first implementation of the write load balancing.

I still need to work out the thread pool utilization EWMA's alpha seed. I believe it must be tied to the APM period in order to be sensible. Unfortunately the APM collection frequency is configurable as well. Unless I make a separate and parallel utilization sampler, we're stuck with a dependency on APM's frequency. Anyway, still working on that bit.

@DiannaHohensee DiannaHohensee self-assigned this Jul 1, 2025
@DiannaHohensee DiannaHohensee added :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) Team:Distributed Coordination Meta label for Distributed Coordination team and removed v9.2.0 labels Jul 1, 2025
Instrument the WRITE thread pool to collect:
1) pool utilization EWMA
2) queue latency EWMA

Relates ES-12233
@DiannaHohensee DiannaHohensee force-pushed the 2025/06/30/add-node-level-write-load-stats branch from 4a7987e to 4037ccf Compare July 1, 2025 04:43
Comment on lines 212 to 214
if (queueLatencyMillis > 0) {
queueLatencyMillisEWMA.addValue(queueLatencyMillis);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 0 is meaningful for queue time and should be included?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I had an-if else statement here previously. Yes, definitely should add zeros.

Comment on lines 182 to 186
if (trackUtilizationEWMA) {
percentPoolUtilizationEWMA.addValue(utilizationSinceLastPoll);
// Test only tracking.
assert setUtilizationSinceLastPoll(utilizationSinceLastPoll);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels odd to tie utilization polling to APM. I think they can be separate. For example, we can have a separate set of lastTotalExecutionTime and lastPollTime which should allow us compute the utlization in a different frequency than the APM?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the decision originally was because it avoided the need for an extra thread etc. given that we're now extending it's use, I think it makes sense to rethink all of that.

it sounds like we want a more sophisticated average utilisation number.

If we're willing to wear the cost of a thread to do the polling, I think it makes sense for that thread to maintain THE average (which will become an EWMA) and publish that as the es.thread_pool.{name}.threads.utilization.current value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great, I shall go ahead with that then, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to note though, if we poll more often than once a minute the utilisation > 100% issues will get worse.

We could do what write load does and take into account running tasks, but it's all adding to the cost, and restricting this calculation to thread pools that have trackOngoingTasks=true. We should consider that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're willing to wear the cost of a thread to do the polling

We don't really need a "new" thread for it, right? It seems the options so far is either polling from AverageWriteLoadSampler orInternalClusterInfoService, both have existing thread schedule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AverageWriteLoadSampler is part of the auto-scaling reporting from data node to master, and runs every 1 second. The InternalClusterInfoService runs every 30 seconds (INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING).

An EWMA based on samples every 30 seconds would likely be unusable.

I started working on a Runnable that gets rescheduled on the GENERIC thread pool every 3 seconds, say.

One thing to note though, if we poll more often than once a minute the utilisation > 100% issues will get worse.

We could cap the utilization at 100%?

Copy link
Contributor

@nicktindall nicktindall Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could cap the utilization at 100%?

I think that would just leave us with wildly inaccurate utilisation numbers. I think 60s is a reasonable interval for polling this stuff. I don't think the > 100% utilisation is really that significant when you look at the thread utilisation dashboards. It seems to render utilisation fairly accurately overall. We could experiment with 30s and see how that looks.

Could we just not use the ewma and instead use the average for the last 60s?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to see a histogram of the run time for tasks. It's hard to know what a reasonable interval is without seeing that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An EWMA based on samples every 30 seconds would likely be unusable.

I think we should start out simple by letting InternalClusterInfoService to poll on 30s interval without adding extra thread scheduling. We can add that if it proves to be a problem. Personally I think every 30s sounds fine for balancing purpose.

Copy link
Contributor

@nicktindall nicktindall Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also why not try just using the average we have? I think ewma is adding potentially unnecessary additional logic. It seems more useful when you are smoothing finer grained samples, if you're already looking at a 30s window I don't think it's necessary.

Comment on lines 150 to 162
public double getPercentPoolUtilizationEWMA() {
if (trackUtilizationEWMA == false) {
return 0;
}
return this.percentPoolUtilizationEWMA.getAverage();
}

public double getQueuedTaskLatencyMillisEWMA() {
if (trackQueueLatencyEWMA == false) {
return 0;
}
return queueLatencyMillisEWMA.getAverage();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some service needs to call these methods. Is it going to be AverageWriteLoadSampler or is it still in discussion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still being discussed -- in slack coordination channel and latest sync meeting.

Latest idea was a new Transport action to have the master pull the data -- there's a node stats action that the ClusterInfoService uses, but that's a public API and we likely want the leniency to adjust the node-write load estimates as we develop the feature. Though I have some concerns about missing queue latency EWMA spikes if we collect that EWMA every 10 or 30 seconds via ClusterInfoService. EWMA can change pretty rapidly when sample based (a sample per write thread pool task) rather than time based. I need to think on that a little more. UPDATE: though if we periodically poll the front task of the queue, as Nick mentioned, maybe we avoid the EWMA problem here.

Comment on lines +220 to +228
/**
* If the queue latency reaches a high value (e.g. 10-30 seconds), then this thread pool is overwhelmed. It may be temporary, but that
* spike warrants the allocation balancer adjusting some number of shards, if possible. Therefore, it is alright to react quickly.
*
* As an example, suppose the EWMA is 10_000ms, i.e. 10 seconds.
* A single task in the queue that takes 30_000ms, i.e. 30 seconds, would result in a new EWMA of ~12_000ms
* 0.1 x 30_000ms + 0.9 x 10_000 = 3_000ms + 9_000ms = 12_000ms
*/
public static final double DEFAULT_WRITE_THREAD_POOL_QUEUE_LATENCY_EWMA_ALPHA = 0.1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intuitively, I feel this value could use some relationship with the one for executionEWMA since they are updated with the same fequency before and after the task execution, respectively. If the node has N write threads and we observe N large execution times in a row, it feels roughly a similar effect as seeing a single large queue latency, i.e. all threads were busy executing tasks that took long to complete. This makes me think whether this value should be N * executionEWMA_alpha. I could be be wrong on this. Just an idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling a bit with the ewma used in this way. We get a value weighted towards the last N values, those could have arrived in the last 0.5ms or the last 30seconds, so it seems difficult to reason about. I wonder if it'd be easier to reason about if the queue were polled at a fixed interval (by the same regular thread that we introduce to poll the ulitlization) and ask the task at the front of the queue how long it'd been waiting?

That way I think the utilisation and latency would reflect observations over the same fixed time period and we could tweak the alpha values to find a period that was meaningful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think it means the average doesn't move if there's no tasks moving through the queue? I know its unlikely, but it seems wrong that the average would just linger at whatever it was most recently until a task was executed. The latency should return to zero when there's nothing happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it'd be easier to reason about if the queue were polled at a fixed interval (by the same regular thread that we introduce to poll the ulitlization) and ask the task at the front of the queue how long it'd been waiting?

Our ThreadPools use a LinkedTransferQueue, which has peek(), and the task is TimedRunnable that has a creationTimeNanos private field. So I could grab a work queue reference in TaskExecutionTimeTrackingEsThreadPoolExecutor and add a TimedRunnable#getCreationTimeNanos method. So that is an option.

I had thought the EWMA would be better than periodically checking, as an average, but it does have some limitations. The argument against periodic poling is that we might see a momentary spike, and not know it, or vice versa miss frequent spikes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pooya was explaining that the executionTimeEWMA is okay to be disconnected from time by the way that auto-scaling uses the value. The idea being that when writes do occur, the executionTimeEWMA reflects the average time needed to serve the requests. Even if there are lulls where no write requests occur, the idea is that when the writes do occur again, the executionTimeEWMA accurately reflects the work requirements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought the EWMA would be better than periodically checking, as an average, but it does have some limitations. The argument against periodic poling is that we might see a momentary spike, and not know it, or vice versa miss frequent spikes.

Though, if we are going to create a polling system for thread pool utilization on the data node, per comment, then maybe we also have the opportunity to create polling for the queue latency with a time stable period EWMA. Then both EWMAs will be relatable to time, instead of the number of tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the node has N write threads and we observe N large execution times in a row, it feels roughly a similar effect as seeing a single large queue latency, i.e. all threads were busy executing tasks that took long to complete. This makes me think whether this value should be N * executionEWMA_alpha.

To make things a little more concrete, N * executionEWMA_alpha would be, say, 8 * .02 = 0.16.

I was hoping to make hot-spot indicators / node-level write load relatable to our shard-level write loads, so that the balancer could see "node A has X too much load, let's move X shard load off of node A". Shard load is relatable to execution time, and queue time, which is a lot more similar to auto-scaling calculations (I expect, haven't verified).

It's possible that we come up with a mathematical relationship between queue latency and needed execution resources do in future, but lacking it right now, the alpha seed simply controls how fast we want the average to respond to new data points. The number of write threads on a node seems relevant if relocating work to nodes with different thread pool sizes, or sizing up like auto-scaling does; not sure otherwise.

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think it means the average doesn't move if there's no tasks moving through the queue? I know its unlikely, but it seems wrong that the average would just linger at whatever it was most recently until a task was executed. The latency should return to zero when there's nothing happening.

A workaround would be for the balancer to only consider the queue duration when the thread pool utilization is greater than the 90% threshold. That would guarantee that both the thread pool is in use AND the queue latency is bad. An active thread pool with no queuing would quickly bring the EWMA back down again.

After the other discussions, I think we have three options for queue latency

  1. the master node polls every 30 seconds to see what the current front of the write queue latency is.
  2. continue taking the EWMA of all queue latencies (including zeros)
  3. create a periodic sampler for queue latency to add to an EWMA.

I'm inclined to do (1) as the simplest. I'd implement what I outline above,

Our ThreadPools use a LinkedTransferQueue, which has peek(), and the task is TimedRunnable that has a creationTimeNanos private field. So I could grab a work queue reference in TaskExecutionTimeTrackingEsThreadPoolExecutor and add a TimedRunnable#getCreationTimeNanos method.

Actually, the simplest would be to use what I already have, which is (2) 🤷‍♀️

Copy link
Contributor

@nicktindall nicktindall Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not a fan of the EWMA for this, because the rate that the value goes down is dependant on the amount of tasks moving through the queue, so if our average was 30s, then we have 5 minutes with no tasks at all (and no queue), it'll still be 30s 5 minutes later despite there being no queue, and until we run enough smaller values through it to bring the average down.

I don't feel strongly enough about it to block the change for it, because I think this scenario is probably unusual, but I have my reservations about it.

If we polled the queue regularly I would be happier with the EWMA as it'll decay when the queue is quiet.

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the feedback! Much appreciated.

I've extended the ThreadPool#pollUtilization method to report either for APM or Allocation. In future, the ClusterInfoService will poll every 30 seconds to collect the average utilization since the last call / over the last 30 seconds.

As for queue latency. I summarized in one of the threads, but these look like the options we've discussed:

  1. the master node polls every 30 seconds to fetch the latency of the current head of the write queue. No averages, just point in time.
  2. continue taking the EWMA of all queue latencies (including zeros) as tasks complete
  3. create a periodic sampler for queue latency to add to an EWMA.

Nick raised thread pool inactivity leaving the queue latency EWMA high. That can be countered by ignoring queue latency when utilization is less than 90% (or whatever threshold value we set): the queue EWMA will go back down when the thread pool is active again.

I don't think (3) is a contender right now. That leaves (1) or (2). (2) is currently implemented. (2) will average out the result over some number of samples, which could help show spiky queue latencies. On the other hand, if the node is decidedly hot-spotting, the queue latency is unlikely to go down.

Let me know what you think, or if I've missed anything.

@DiannaHohensee DiannaHohensee requested a review from ywangd July 2, 2025 21:41
@DiannaHohensee DiannaHohensee marked this pull request as ready for review July 2, 2025 21:54
@DiannaHohensee DiannaHohensee requested a review from a team as a code owner July 2, 2025 21:54
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sub-classed pollUtilization(), and removed *EWMA from interfaces in favor of Average.

false,
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST,
DEFAULT_QUEUE_LATENCY_EWMA_ALPHA_FOR_TEST
);
Copy link
Contributor

@nicktindall nicktindall Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think when you get to three consecutive booleans, it's worth making a builder for readability, e.g.

TaskTrackingConfig config = TaskTrackingConfig.builder()
    .trackingOngoingTasks()
    .trackingQueueLatencyAverage()
    // ... etc
    .build();

A side benefit would be you could default all to false and just specify the ones that were true.

Happy to be challenged on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide this is a good idea, also happy for it to be a separate PR because it'll add a fair bit of noise I imagine

private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
private final boolean trackQueueLatencyAverage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: what about trackAverageQueueLatency (this reads better IMO)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) Team:Distributed Coordination Meta label for Distributed Coordination team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants