Skip to content

Commit 690d6ce

Browse files
authored
add topk/bottomk that aggregates others (Netflix#1248)
Fixes Netflix#1224. Adds variants of topk/bottomk that in addition to computing the highest priority values will also return an aggregate time series that includes all of the other time series that were not high enough priority. This can be useful to see what proportion of the overall volume is represented by the highest priority time series. The operators have the same signature as `:topk` and `:bottomk`, but the aggregation to use is specified in the operator name. There are four aggregates supported: - min - max - sum - avg Count was not included as an aggregate because it could be ambiguous and does not seem useful. As part of this work there was some refactoring of the existing aggregation operations to make the count case more consistent.
1 parent 6078f58 commit 690d6ce

File tree

19 files changed

+359
-122
lines changed

19 files changed

+359
-122
lines changed

atlas-core/src/main/scala/com/netflix/atlas/core/model/DataExpr.scala

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -126,29 +126,28 @@ object DataExpr {
126126
}
127127
}
128128

129-
sealed trait AggregateFunction extends DataExpr with BinaryOp {
129+
sealed trait AggregateFunction extends DataExpr {
130130

131131
def labelString: String
132132

133133
def withConsolidation(f: ConsolidationFunction): AggregateFunction
134134

135+
def aggregator(start: Long, end: Long): TimeSeries.Aggregator
136+
135137
override def eval(context: EvalContext, data: List[TimeSeries]): ResultSet = {
136138
val filtered = data.filter(t => query.matches(t.tags))
137139
val aggr =
138140
if (filtered.isEmpty) TimeSeries.noData(query, context.step)
139141
else {
140142
val tags = commonTags(filtered.head.tags)
141-
val t = TimeSeries.aggregate(filtered.iterator, context.start, context.end, this)
143+
val aggr = aggregator(context.start, context.end)
144+
filtered.foreach(aggr.update)
145+
val t = aggr.result()
142146
TimeSeries(tags, TimeSeries.toLabel(tags), t.data)
143147
}
144148
val rs = consolidate(context.step, List(aggr))
145149
ResultSet(this, rs, context.state)
146150
}
147-
148-
// Make sure we get toString locally rather than from BinaryOp
149-
override def toString: String = {
150-
if (offset.isZero) exprString else s"$exprString,$offset,:offset"
151-
}
152151
}
153152

154153
case class Sum(
@@ -164,13 +163,15 @@ object DataExpr {
164163

165164
override def withOffset(d: Duration): Sum = copy(offset = d)
166165

167-
def labelString: String = s"sum(${query.labelString})"
166+
override def labelString: String = s"sum(${query.labelString})"
168167

169168
override def exprString: String = {
170169
if (cf == ConsolidationFunction.Avg) s"$query,:sum" else s"$query,:sum,$cf"
171170
}
172171

173-
def apply(v1: Double, v2: Double): Double = Math.addNaN(v1, v2)
172+
override def aggregator(start: Long, end: Long): TimeSeries.Aggregator = {
173+
new TimeSeries.SimpleAggregator(start, end, Math.addNaN)
174+
}
174175
}
175176

176177
case class Count(
@@ -179,35 +180,22 @@ object DataExpr {
179180
offset: Duration = Duration.ZERO
180181
) extends AggregateFunction {
181182

182-
override def eval(context: EvalContext, data: List[TimeSeries]): ResultSet = {
183-
val filtered = data.filter(t => query.matches(t.tags)).map { t =>
184-
TimeSeries(t.tags, t.label, t.data.mapValues(v => if (v.isNaN) Double.NaN else 1.0))
185-
}
186-
val aggr =
187-
if (filtered.isEmpty) TimeSeries.noData(query, context.step)
188-
else {
189-
val tags = commonTags(filtered.head.tags)
190-
val t = TimeSeries.aggregate(filtered.iterator, context.start, context.end, this)
191-
TimeSeries(tags, TimeSeries.toLabel(tags), t.data)
192-
}
193-
val rs = consolidate(context.step, List(aggr))
194-
ResultSet(this, rs, context.state)
195-
}
196-
197183
override def withConsolidation(f: ConsolidationFunction): AggregateFunction = f match {
198184
case v: SumOrAvgCf => copy(cf = v)
199185
case v => Consolidation(copy(query = query, offset = offset), v)
200186
}
201187

202188
override def withOffset(d: Duration): Count = copy(offset = d)
203189

204-
def labelString: String = s"count(${query.labelString})"
190+
override def labelString: String = s"count(${query.labelString})"
205191

206192
override def exprString: String = {
207193
if (cf == ConsolidationFunction.Avg) s"$query,:count" else s"$query,:count,$cf"
208194
}
209195

210-
def apply(v1: Double, v2: Double): Double = Math.addNaN(v1, v2)
196+
override def aggregator(start: Long, end: Long): TimeSeries.Aggregator = {
197+
new TimeSeries.CountAggregator(start, end)
198+
}
211199
}
212200

213201
case class Min(query: Query, offset: Duration = Duration.ZERO) extends AggregateFunction {
@@ -220,11 +208,13 @@ object DataExpr {
220208

221209
override def withOffset(d: Duration): Min = copy(offset = d)
222210

223-
def labelString: String = s"min(${query.labelString})"
211+
override def labelString: String = s"min(${query.labelString})"
224212

225213
override def exprString: String = s"$query,:min"
226214

227-
def apply(v1: Double, v2: Double): Double = Math.minNaN(v1, v2)
215+
override def aggregator(start: Long, end: Long): TimeSeries.Aggregator = {
216+
new TimeSeries.SimpleAggregator(start, end, Math.minNaN)
217+
}
228218
}
229219

230220
case class Max(query: Query, offset: Duration = Duration.ZERO) extends AggregateFunction {
@@ -237,11 +227,13 @@ object DataExpr {
237227

238228
override def withOffset(d: Duration): Max = copy(offset = d)
239229

240-
def labelString: String = s"max(${query.labelString})"
230+
override def labelString: String = s"max(${query.labelString})"
241231

242232
override def exprString: String = s"$query,:max"
243233

244-
def apply(v1: Double, v2: Double): Double = Math.maxNaN(v1, v2)
234+
override def aggregator(start: Long, end: Long): TimeSeries.Aggregator = {
235+
new TimeSeries.SimpleAggregator(start, end, Math.maxNaN)
236+
}
245237
}
246238

247239
case class Consolidation(af: AggregateFunction, cf: ConsolidationFunction)
@@ -263,11 +255,13 @@ object DataExpr {
263255
Consolidation(af.withOffset(d).asInstanceOf[AggregateFunction], cf)
264256
}
265257

266-
def labelString: String = af.labelString
258+
override def labelString: String = af.labelString
267259

268260
override def exprString: String = s"$af,$cf"
269261

270-
def apply(v1: Double, v2: Double): Double = af(v1, v2)
262+
override def aggregator(start: Long, end: Long): TimeSeries.Aggregator = {
263+
af.aggregator(start, end)
264+
}
271265
}
272266

273267
case class GroupBy(af: AggregateFunction, keys: List[String]) extends DataExpr {

atlas-core/src/main/scala/com/netflix/atlas/core/model/FilterExpr.scala

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ object FilterExpr {
149149
/** Comparator that determines the priority order. */
150150
def comparator: Comparator[TimeSeriesSummary]
151151

152+
/** Aggregation to use for time series that are not one of the K highest priority. */
153+
def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
154+
TimeSeries.NoopAggregator
155+
}
156+
152157
/** Grouped expression to select the input from. */
153158
def expr: TimeSeriesExpr
154159

@@ -172,12 +177,23 @@ object FilterExpr {
172177

173178
def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
174179
val buffer = new BoundedPriorityBuffer[TimeSeriesSummary](k, comparator)
180+
val aggregator = othersAggregator(context.start, context.end)
175181
val rs = expr.eval(context, data)
176182
rs.data.foreach { t =>
177183
val v = SummaryStats(t, context.start, context.end).get(stat)
178-
buffer.add(TimeSeriesSummary(t, v))
184+
val other = buffer.add(TimeSeriesSummary(t, v))
185+
if (other != null) {
186+
aggregator.update(other.timeSeries)
187+
}
188+
}
189+
val newData = aggregator match {
190+
case aggr if aggr.isEmpty =>
191+
buffer.toList.map(_.timeSeries)
192+
case _ =>
193+
val others = aggregator.result()
194+
val otherTags = others.tags ++ finalGrouping.map(k => k -> "--others--").toMap
195+
others.withTags(otherTags) :: buffer.toList.map(_.timeSeries)
179196
}
180-
val newData = buffer.toList.map(_.timeSeries)
181197
ResultSet(this, newData, rs.state)
182198
}
183199
}
@@ -188,12 +204,96 @@ object FilterExpr {
188204
override def comparator: Comparator[TimeSeriesSummary] = StatComparator.reversed()
189205
}
190206

207+
case class TopKOthersMin(expr: TimeSeriesExpr, stat: String, k: Int) extends PriorityFilterExpr {
208+
override def opName: String = "topk-others-min"
209+
210+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator.reversed()
211+
212+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
213+
new TimeSeries.SimpleAggregator(start, end, Math.minNaN)
214+
}
215+
}
216+
217+
case class TopKOthersMax(expr: TimeSeriesExpr, stat: String, k: Int) extends PriorityFilterExpr {
218+
override def opName: String = "topk-others-max"
219+
220+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator.reversed()
221+
222+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
223+
new TimeSeries.SimpleAggregator(start, end, Math.maxNaN)
224+
}
225+
}
226+
227+
case class TopKOthersSum(expr: TimeSeriesExpr, stat: String, k: Int) extends PriorityFilterExpr {
228+
override def opName: String = "topk-others-sum"
229+
230+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator.reversed()
231+
232+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
233+
new TimeSeries.SimpleAggregator(start, end, Math.addNaN)
234+
}
235+
}
236+
237+
case class TopKOthersAvg(expr: TimeSeriesExpr, stat: String, k: Int) extends PriorityFilterExpr {
238+
override def opName: String = "topk-others-avg"
239+
240+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator.reversed()
241+
242+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
243+
new TimeSeries.AvgAggregator(start, end)
244+
}
245+
}
246+
191247
case class BottomK(expr: TimeSeriesExpr, stat: String, k: Int) extends PriorityFilterExpr {
192248
override def opName: String = "bottomk"
193249

194250
override def comparator: Comparator[TimeSeriesSummary] = StatComparator
195251
}
196252

253+
case class BottomKOthersMin(expr: TimeSeriesExpr, stat: String, k: Int)
254+
extends PriorityFilterExpr {
255+
override def opName: String = "bottomk-others-min"
256+
257+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator
258+
259+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
260+
new TimeSeries.SimpleAggregator(start, end, Math.minNaN)
261+
}
262+
}
263+
264+
case class BottomKOthersMax(expr: TimeSeriesExpr, stat: String, k: Int)
265+
extends PriorityFilterExpr {
266+
override def opName: String = "bottomk-others-max"
267+
268+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator
269+
270+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
271+
new TimeSeries.SimpleAggregator(start, end, Math.maxNaN)
272+
}
273+
}
274+
275+
case class BottomKOthersSum(expr: TimeSeriesExpr, stat: String, k: Int)
276+
extends PriorityFilterExpr {
277+
override def opName: String = "bottomk-others-sum"
278+
279+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator
280+
281+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
282+
new TimeSeries.SimpleAggregator(start, end, Math.addNaN)
283+
}
284+
}
285+
286+
case class BottomKOthersAvg(expr: TimeSeriesExpr, stat: String, k: Int)
287+
extends PriorityFilterExpr {
288+
override def opName: String = "bottomk-others-avg"
289+
290+
override def comparator: Comparator[TimeSeriesSummary] = StatComparator
291+
292+
override def othersAggregator(start: Long, end: Long): TimeSeries.Aggregator = {
293+
new TimeSeries.AvgAggregator(start, end)
294+
}
295+
}
296+
197297
/**
198298
* Caches the statistic value associated with a time series. Used for priority filters to
199299
* avoid recomputing the summary statistics on each comparison operation.

atlas-core/src/main/scala/com/netflix/atlas/core/model/FilterVocabulary.scala

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,16 @@ object FilterVocabulary extends Vocabulary {
4242
Macro("stat-max-mf", List("max", ":stat"), List("42")),
4343
Macro("stat-avg-mf", List("avg", ":stat"), List("42")),
4444
// Priority operators: https://github.com/Netflix/atlas/issues/1224
45-
BottomK,
46-
TopK
45+
PriorityK("bottomk", FilterExpr.BottomK.apply),
46+
PriorityK("bottomk-others-min", FilterExpr.BottomKOthersMin.apply),
47+
PriorityK("bottomk-others-max", FilterExpr.BottomKOthersMax.apply),
48+
PriorityK("bottomk-others-sum", FilterExpr.BottomKOthersSum.apply),
49+
PriorityK("bottomk-others-avg", FilterExpr.BottomKOthersAvg.apply),
50+
PriorityK("topk", FilterExpr.TopK.apply),
51+
PriorityK("topk-others-min", FilterExpr.TopKOthersMin.apply),
52+
PriorityK("topk-others-max", FilterExpr.TopKOthersMax.apply),
53+
PriorityK("topk-others-sum", FilterExpr.TopKOthersSum.apply),
54+
PriorityK("topk-others-avg", FilterExpr.TopKOthersAvg.apply)
4755
)
4856

4957
case object Stat extends SimpleWord {
@@ -228,37 +236,8 @@ object FilterVocabulary extends Vocabulary {
228236
List("name,sps,:eq,:sum,(,nf.cluster,),:by,:stat-max,30e3,:gt")
229237
}
230238

231-
case object TopK extends SimpleWord {
232-
233-
override def name: String = "topk"
234-
235-
protected def matcher: PartialFunction[List[Any], Boolean] = {
236-
case IntType(_) :: (_: String) :: TimeSeriesType(_) :: _ => true
237-
case IntType(_) :: (_: String) :: (_: StyleExpr) :: _ => true
238-
}
239-
240-
protected def executor: PartialFunction[List[Any], List[Any]] = {
241-
case IntType(k) :: (s: String) :: TimeSeriesType(t) :: stack =>
242-
FilterExpr.TopK(t, s, k) :: stack
243-
case IntType(k) :: (s: String) :: (t: StyleExpr) :: stack =>
244-
t.copy(expr = FilterExpr.TopK(t.expr, s, k)) :: stack
245-
}
246-
247-
override def signature: String = "TimeSeriesExpr stat:String k:Int -- FilterExpr"
248-
249-
override def summary: String =
250-
"""
251-
|Limit the output to the `K` time series with the highest values for the specified
252-
|summary statistic.
253-
""".stripMargin.trim
254-
255-
override def examples: List[String] =
256-
List("name,sps,:eq,:sum,(,nf.cluster,),:by,max,5")
257-
}
258-
259-
case object BottomK extends SimpleWord {
260-
261-
override def name: String = "bottomk"
239+
case class PriorityK(name: String, op: (TimeSeriesExpr, String, Int) => FilterExpr)
240+
extends SimpleWord {
262241

263242
protected def matcher: PartialFunction[List[Any], Boolean] = {
264243
case IntType(_) :: (_: String) :: TimeSeriesType(_) :: _ => true
@@ -267,17 +246,17 @@ object FilterVocabulary extends Vocabulary {
267246

268247
protected def executor: PartialFunction[List[Any], List[Any]] = {
269248
case IntType(k) :: (s: String) :: TimeSeriesType(t) :: stack =>
270-
FilterExpr.BottomK(t, s, k) :: stack
249+
op(t, s, k) :: stack
271250
case IntType(k) :: (s: String) :: (t: StyleExpr) :: stack =>
272-
t.copy(expr = FilterExpr.BottomK(t.expr, s, k)) :: stack
251+
t.copy(expr = op(t.expr, s, k)) :: stack
273252
}
274253

275254
override def signature: String = "TimeSeriesExpr stat:String k:Int -- FilterExpr"
276255

277256
override def summary: String =
278257
"""
279-
|Limit the output to the `K` time series with the lowest values for the specified
280-
|summary statistic.
258+
|Limit the output to the `K` time series with the highest priority values for the
259+
|specified summary statistic.
281260
""".stripMargin.trim
282261

283262
override def examples: List[String] =

0 commit comments

Comments
 (0)