Skip to content

Commit 479e44c

Browse files
authored
core: deterministic tagging for aggregated groups (#1749)
Updates aggregation functions applied to the result of a group by to have a deterministic set of tags based on the exact matches. Before if there was a single group, then the group tag would be preserved in the result causing the tag set for the result to vary depending on the data that matched.
1 parent b12dac3 commit 479e44c

File tree

15 files changed

+105
-25
lines changed

15 files changed

+105
-25
lines changed

atlas-core/src/main/scala/com/netflix/atlas/core/model/FilterExpr.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ object FilterExpr {
221221
case _ =>
222222
val others = aggregator.result()
223223
val otherTags = others.tags ++ finalGrouping.map(k => k -> "--others--").toMap
224-
others.withTags(otherTags) :: buffer.toList.map(_.timeSeries)
224+
val otherLabel = TimeSeries.toLabel(otherTags)
225+
others.withTags(otherTags).withLabel(otherLabel) :: buffer.toList.map(_.timeSeries)
225226
}
226227
ResultSet(this, newData, rs.state)
227228
}

atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala

+15-1
Original file line numberDiff line numberDiff line change
@@ -728,16 +728,30 @@ object MathExpr {
728728

729729
def finalGrouping: List[String] = Nil
730730

731+
/**
732+
* Returns the set of tag keys that should be present on time series evaluated by
733+
* this expression. This will be the exact set in the first data expression.
734+
*/
735+
private def filterKeys(tags: Map[String, String]): Map[String, String] = dataExprs match {
736+
case dataExpr :: _ =>
737+
val keys = Query.exactKeys(dataExpr.query)
738+
tags.filter(t => keys.contains(t._1))
739+
case _ =>
740+
tags
741+
}
742+
731743
def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
732744
val rs = expr.eval(context, data)
733745
val ts =
734746
if (rs.data.isEmpty) {
735747
List(TimeSeries.noData(context.step))
736748
} else {
749+
val tags = filterKeys(rs.data.head.tags)
750+
val label = TimeSeries.toLabel(tags)
737751
val aggr = aggregator(context.start, context.end)
738752
rs.data.foreach(aggr.update)
739753
val t = aggr.result()
740-
List(TimeSeries(t.tags, s"$name(${t.label})", t.data))
754+
List(TimeSeries(tags, s"$name($label)", t.data))
741755
}
742756
ResultSet(this, ts, rs.state)
743757
}

atlas-core/src/main/scala/com/netflix/atlas/core/model/TimeSeries.scala

-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ object TimeSeries {
126126
aggrTags = t.tags
127127
} else {
128128
aggrBuffer.update(t.data)(f)
129-
aggrTags = TaggedItem.aggrTags(aggrTags, t.tags)
130129
}
131130
}
132131

@@ -157,7 +156,6 @@ object TimeSeries {
157156
aggrTags = t.tags
158157
} else {
159158
aggrBuffer.update(t.data)(countNaN)
160-
aggrTags = TaggedItem.aggrTags(aggrTags, t.tags)
161159
}
162160
}
163161

atlas-core/src/test/scala/com/netflix/atlas/core/model/MathGroupBySuite.scala

+38-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class MathGroupBySuite extends FunSuite {
3030
val seq = new ArrayTimeSeq(DsType.Gauge, start, step, Array(v.toDouble))
3131
val mode = "mode" -> (if (v % 2 == 0) "even" else "odd")
3232
val value = "value" -> v.toString
33-
TimeSeries(Map("name" -> "test", mode, value), seq)
33+
TimeSeries(Map("name" -> "test", mode, value, "foo" -> "bar"), seq)
3434
}
3535

3636
def groupBy(
@@ -144,6 +144,43 @@ class MathGroupBySuite extends FunSuite {
144144
assertEquals(rs, expected)
145145
}
146146

147+
test("name,test,:eq,(,foo,),:by,:max") {
148+
val input = List(
149+
ts(1),
150+
ts(2),
151+
ts(3)
152+
)
153+
val context = EvalContext(start, start + step * n, step)
154+
val dataBy = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "test")), List("foo"))
155+
val expr = MathExpr.Max(dataBy)
156+
val rs = expr.eval(context, input).data
157+
assertEquals(rs.size, 1)
158+
159+
val expected = List(
160+
ts(6).withTags(Map("name" -> "test")).withLabel("max(name=test)")
161+
)
162+
assertEquals(rs, expected)
163+
}
164+
165+
test("name,test,:eq,(,mode,foo,),:by,:max,(,mode,),:by") {
166+
val input = List(
167+
ts(1),
168+
ts(2),
169+
ts(3)
170+
)
171+
val context = EvalContext(start, start + step * n, step)
172+
val dataBy = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "test")), List("mode", "foo"))
173+
val expr = MathExpr.GroupBy(MathExpr.Max(dataBy), List("mode"))
174+
val rs = expr.eval(context, input).data
175+
assertEquals(rs.size, 2)
176+
177+
val expected = List(
178+
ts(2).withTags(Map("name" -> "test", "mode" -> "even")).withLabel("(mode=even)"),
179+
ts(4).withTags(Map("name" -> "test", "mode" -> "odd")).withLabel("(mode=odd)")
180+
)
181+
assertEquals(rs, expected)
182+
}
183+
147184
test("(,value,mode,),:by,:count,(,mode,),:by") {
148185
val input = List(
149186
ts(1),

atlas-core/src/test/scala/com/netflix/atlas/core/model/TimeSeriesExprSuite.scala

+18-18
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,20 @@ class TimeSeriesExprSuite extends FunSuite {
4040
":true,:sum,:sum" -> const(ts(unknownTag, "name=unknown", 55)),
4141
":true,:max,:sum" -> const(ts(unknownTag, "name=unknown", 10)),
4242
":true,:sum,:max" -> const(ts(unknownTag, "name=unknown", 55)),
43-
":true,:sum,:count" -> const(ts(unknownTag, "count(name=unknown)", 1)),
44-
":true,:avg,:count" -> const(ts(unknownTag, "count(name=unknown)", 1)),
45-
":true,:sum,:avg" -> const(ts(unknownTag, "(name=unknown / count(name=unknown))", 55)),
46-
":true,:avg,:avg" -> const(ts(unknownTag, "(sum(name=unknown) / count(name=unknown))", 5)),
47-
":false,:sum" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
48-
":false,:min" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
49-
":false,:max" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
50-
":false,:count" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
51-
"name,:has" -> const(ts(unknownTag, 55)),
52-
"name,1,:eq" -> const(ts(Map("name" -> "1"), 1)),
53-
"name,1,:re" -> const(ts(unknownTag, 11)),
54-
"name,2,:re" -> const(ts(unknownTag, 2)),
55-
"name,2,:contains" -> const(ts(unknownTag, 2)),
56-
"name,(,1,10,),:in" -> const(ts(unknownTag, 11)),
43+
":true,:sum,:count" -> const(ts(Map.empty[String, String], "count(NO TAGS)", 1)),
44+
":true,:avg,:count" -> const(ts(Map.empty[String, String], "count(NO TAGS)", 1)),
45+
":true,:sum,:avg" -> const(ts(unknownTag, "(name=unknown / count(NO TAGS))", 55)),
46+
":true,:avg,:avg" -> const(ts(Map.empty[String, String], "(sum(NO TAGS) / count(NO TAGS))", 5)),
47+
":false,:sum" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
48+
":false,:min" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
49+
":false,:max" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
50+
":false,:count" -> const(ts(Map("name" -> "NO_DATA"), "NO DATA", Double.NaN)),
51+
"name,:has" -> const(ts(unknownTag, 55)),
52+
"name,1,:eq" -> const(ts(Map("name" -> "1"), 1)),
53+
"name,1,:re" -> const(ts(unknownTag, 11)),
54+
"name,2,:re" -> const(ts(unknownTag, 2)),
55+
"name,2,:contains" -> const(ts(unknownTag, 2)),
56+
"name,(,1,10,),:in" -> const(ts(unknownTag, 11)),
5757
"name,1,:eq,name,10,:eq,:or" -> const(ts(unknownTag, 11)),
5858
":true,:abs" -> const(ts(unknownTag, "abs(name=unknown)", 55.0)),
5959
"10,:abs" -> const(ts(Map("name" -> "10.0"), "abs(10.0)", 10.0)),
@@ -116,7 +116,7 @@ class TimeSeriesExprSuite extends FunSuite {
116116
":true,:sum,(,name,),:by,:sum" -> const(ts(Map.empty[String, String], "sum(NO TAGS)", 55)),
117117
":true,:min,(,name,),:by,:min" -> const(ts(Map.empty[String, String], "min(NO TAGS)", 0)),
118118
":true,:max,(,name,),:by,:max" -> const(ts(Map.empty[String, String], "max(NO TAGS)", 10)),
119-
":true,:max,(,type,),:by,:sum" -> const(ts("type" -> "constant", "sum(type=constant)", 10)),
119+
":true,:max,(,type,),:by,:sum" -> const(ts(Map.empty[String, String], "sum(NO TAGS)", 10)),
120120
":true,(,name,),:by,:avg" -> const(
121121
ts(Map.empty[String, String], "(sum(NO TAGS) / count(NO TAGS))", 5)
122122
),
@@ -190,9 +190,9 @@ class TimeSeriesExprSuite extends FunSuite {
190190
":true,(,name,),:by,:stddev" -> const(
191191
ts(Map.empty[String, String], stddevLegend("NO TAGS"), 3.1622776601683795)
192192
),
193-
":true,:max,:stddev" -> const(ts(Map("name" -> "unknown"), stddevLegend("name=unknown"), 0.0)),
194-
":true,:sum,:stddev" -> const(ts(Map("name" -> "unknown"), stddevLegend("name=unknown"), 0.0)),
195-
":true,:stddev" -> const(ts(Map("name" -> "unknown"), stddevLegend("name=unknown"), 0.0)),
193+
":true,:max,:stddev" -> const(ts(Map.empty[String, String], stddevLegend("NO TAGS"), 0.0)),
194+
":true,:sum,:stddev" -> const(ts(Map.empty[String, String], stddevLegend("NO TAGS"), 0.0)),
195+
":true,:stddev" -> const(ts(Map.empty[String, String], stddevLegend("NO TAGS"), 0.0)),
196196
"42" -> const(ts(42))
197197
)
198198

Loading
Loading
Loading
Loading
Loading
Loading
Loading
Loading

atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -989,12 +989,12 @@ class EvaluatorSuite extends FunSuite {
989989
dsl.size match {
990990
case 1 =>
991991
val msg = DiagnosticMessage.error(s"failed rewrite")
992-
context.dsLogger(dsl(0), msg)
992+
context.dsLogger(dsl.head, msg)
993993
DataSources.empty()
994994
case 2 =>
995995
val msg = DiagnosticMessage.error(s"failed rewrite")
996996
context.dsLogger(dsl(1), msg)
997-
DataSources.of(dsl(0))
997+
DataSources.of(dsl.head)
998998
}
999999
}
10001000
} else {

atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala

+30
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,36 @@ class FinalExprEvalSuite extends FunSuite {
586586
}
587587
}
588588

589+
test("multi-level aggr tagging") {
590+
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "rps")), List("node", "app"))
591+
val baseTags = Map("name" -> "rps", "app" -> "foo")
592+
val input = List(
593+
sources(ds("a", s"http://atlas/graph?q=$expr,:max")),
594+
group(
595+
0,
596+
AggrDatapoint(0, step, expr, "i-1", baseTags + ("node" -> "i-1"), 42.0),
597+
AggrDatapoint(0, step, expr, "i-2", baseTags + ("node" -> "i-2"), 1.0)
598+
),
599+
group(
600+
1,
601+
AggrDatapoint(0, step, expr, "i-1", baseTags + ("node" -> "i-1"), 42.0),
602+
AggrDatapoint(0, step, expr, "i-2", baseTags + ("node" -> "i-2"), 43.0)
603+
)
604+
)
605+
606+
val output = run(input)
607+
608+
val expected = Array(42.0, 43.0)
609+
val timeseries = output.filter(isTimeSeries)
610+
assertEquals(timeseries.size, 2)
611+
timeseries.zip(expected).foreach {
612+
case (envelope, expectedValue) =>
613+
val ts = envelope.message.asInstanceOf[TimeSeriesMessage]
614+
checkValue(ts, expectedValue)
615+
assertEquals(ts.tags, Map("name" -> "rps", "atlas.offset" -> "0w"))
616+
}
617+
}
618+
589619
// https://github.com/Netflix/atlas/issues/762
590620
test(":legend is honored") {
591621
val expr = DataExpr.Sum(Query.Equal("name", "rps"))

0 commit comments

Comments
 (0)