Skip to content

Commit 83c422e

Browse files
committed
feat(alerting): implement transform objects
1 parent c180173 commit 83c422e

File tree

6 files changed

+128
-86
lines changed

6 files changed

+128
-86
lines changed

pkg/services/alerting/dashboard_parser.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ func ConvetAlertModelToAlertRule(ruleDef *m.AlertRuleModel) (*AlertRule, error)
108108
model.Transform = ruleDef.Expression.Get("transform").Get("type").MustString()
109109
model.TransformParams = *ruleDef.Expression.Get("transform")
110110

111+
if model.Transform == "aggregation" {
112+
model.Transformer = &AggregationTransformer{
113+
Method: ruleDef.Expression.Get("transform").Get("method").MustString(),
114+
}
115+
}
116+
111117
query := ruleDef.Expression.Get("query")
112118
model.Query = AlertQuery{
113119
Query: query.Get("query").MustString(),

pkg/services/alerting/evaluator.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package alerting
2+
3+
type compareFn func(float64, float64) bool
4+
5+
func evalCondition(level Level, result float64) bool {
6+
return operators[level.Operator](result, level.Level)
7+
}
8+
9+
var operators = map[string]compareFn{
10+
">": func(num1, num2 float64) bool { return num1 > num2 },
11+
">=": func(num1, num2 float64) bool { return num1 >= num2 },
12+
"<": func(num1, num2 float64) bool { return num1 < num2 },
13+
"<=": func(num1, num2 float64) bool { return num1 <= num2 },
14+
"": func(num1, num2 float64) bool { return false },
15+
}

pkg/services/alerting/executor.go

Lines changed: 9 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package alerting
33
import (
44
"fmt"
55

6-
"math"
7-
86
"github.com/grafana/grafana/pkg/bus"
97
"github.com/grafana/grafana/pkg/log"
108
m "github.com/grafana/grafana/pkg/models"
@@ -26,63 +24,6 @@ func NewExecutor() *ExecutorImpl {
2624
}
2725
}
2826

29-
type compareFn func(float64, float64) bool
30-
type aggregationFn func(*tsdb.TimeSeries) float64
31-
32-
var operators = map[string]compareFn{
33-
">": func(num1, num2 float64) bool { return num1 > num2 },
34-
">=": func(num1, num2 float64) bool { return num1 >= num2 },
35-
"<": func(num1, num2 float64) bool { return num1 < num2 },
36-
"<=": func(num1, num2 float64) bool { return num1 <= num2 },
37-
"": func(num1, num2 float64) bool { return false },
38-
}
39-
var aggregator = map[string]aggregationFn{
40-
"avg": func(series *tsdb.TimeSeries) float64 {
41-
sum := float64(0)
42-
43-
for _, v := range series.Points {
44-
sum += v[0]
45-
}
46-
47-
return sum / float64(len(series.Points))
48-
},
49-
"sum": func(series *tsdb.TimeSeries) float64 {
50-
sum := float64(0)
51-
52-
for _, v := range series.Points {
53-
sum += v[0]
54-
}
55-
56-
return sum
57-
},
58-
"min": func(series *tsdb.TimeSeries) float64 {
59-
min := series.Points[0][0]
60-
61-
for _, v := range series.Points {
62-
if v[0] < min {
63-
min = v[0]
64-
}
65-
}
66-
67-
return min
68-
},
69-
"max": func(series *tsdb.TimeSeries) float64 {
70-
max := series.Points[0][0]
71-
72-
for _, v := range series.Points {
73-
if v[0] > max {
74-
max = v[0]
75-
}
76-
}
77-
78-
return max
79-
},
80-
"mean": func(series *tsdb.TimeSeries) float64 {
81-
midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
82-
return series.Points[midPosition][0]
83-
},
84-
}
85-
8627
func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
8728
timeSeries, err := e.executeQuery(job)
8829
if err != nil {
@@ -156,32 +97,25 @@ func (e *ExecutorImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice
15697

15798
for _, serie := range series {
15899
e.log.Debug("Evaluating series", "series", serie.Name)
100+
transformedValue, _ := rule.Transformer.Transform(serie)
159101

160-
if aggregator["avg"] == nil {
161-
continue
162-
}
163-
164-
var aggValue = aggregator["avg"](serie)
165-
var critOperartor = operators[rule.Critical.Operator]
166-
var critResult = critOperartor(aggValue, rule.Critical.Level)
167-
168-
e.log.Debug("Alert execution Crit", "name", serie.Name, "aggValue", aggValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult)
102+
critResult := evalCondition(rule.Critical, transformedValue)
103+
e.log.Debug("Alert execution Crit", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult)
169104
if critResult {
170105
return &AlertResult{
171106
State: alertstates.Critical,
172-
ActualValue: aggValue,
173-
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
107+
ActualValue: transformedValue,
108+
Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name),
174109
}
175110
}
176111

177-
var warnOperartor = operators[rule.Warning.Operator]
178-
var warnResult = warnOperartor(aggValue, rule.Warning.Level)
179-
e.log.Debug("Alert execution Warn", "name", serie.Name, "aggValue", aggValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult)
112+
warnResult := evalCondition(rule.Warning, transformedValue)
113+
e.log.Debug("Alert execution Warn", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult)
180114
if warnResult {
181115
return &AlertResult{
182116
State: alertstates.Warn,
183-
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
184-
ActualValue: aggValue,
117+
Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name),
118+
ActualValue: transformedValue,
185119
}
186120
}
187121
}

pkg/services/alerting/executor_test.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ func TestAlertingExecutor(t *testing.T) {
1414

1515
Convey("single time serie", func() {
1616
Convey("Show return ok since avg is above 2", func() {
17-
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
17+
rule := &AlertRule{
18+
Critical: Level{Level: 10, Operator: ">"},
19+
Transformer: &AggregationTransformer{Method: "avg"},
20+
}
1821

1922
timeSeries := []*tsdb.TimeSeries{
2023
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
@@ -25,7 +28,10 @@ func TestAlertingExecutor(t *testing.T) {
2528
})
2629

2730
Convey("Show return critical since below 2", func() {
28-
rule := &AlertRule{Critical: Level{Level: 10, Operator: "<"}}
31+
rule := &AlertRule{
32+
Critical: Level{Level: 10, Operator: "<"},
33+
Transformer: &AggregationTransformer{Method: "avg"},
34+
}
2935

3036
timeSeries := []*tsdb.TimeSeries{
3137
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
@@ -49,7 +55,10 @@ func TestAlertingExecutor(t *testing.T) {
4955
*/
5056

5157
Convey("Show return ok since avg is below 10", func() {
52-
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
58+
rule := &AlertRule{
59+
Critical: Level{Level: 10, Operator: ">"},
60+
Transformer: &AggregationTransformer{Method: "avg"},
61+
}
5362

5463
timeSeries := []*tsdb.TimeSeries{
5564
tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
@@ -60,7 +69,10 @@ func TestAlertingExecutor(t *testing.T) {
6069
})
6170

6271
Convey("Show return ok since min is below 10", func() {
63-
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
72+
rule := &AlertRule{
73+
Critical: Level{Level: 10, Operator: ">"},
74+
Transformer: &AggregationTransformer{Method: "avg"},
75+
}
6476

6577
timeSeries := []*tsdb.TimeSeries{
6678
tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
@@ -85,7 +97,10 @@ func TestAlertingExecutor(t *testing.T) {
8597

8698
Convey("muliple time series", func() {
8799
Convey("both are ok", func() {
88-
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
100+
rule := &AlertRule{
101+
Critical: Level{Level: 10, Operator: ">"},
102+
Transformer: &AggregationTransformer{Method: "avg"},
103+
}
89104

90105
timeSeries := []*tsdb.TimeSeries{
91106
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
@@ -97,7 +112,10 @@ func TestAlertingExecutor(t *testing.T) {
97112
})
98113

99114
Convey("first serie is good, second is critical", func() {
100-
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
115+
rule := &AlertRule{
116+
Critical: Level{Level: 10, Operator: ">"},
117+
Transformer: &AggregationTransformer{Method: "avg"},
118+
}
101119

102120
timeSeries := []*tsdb.TimeSeries{
103121
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),

pkg/services/alerting/models.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package alerting
22

33
import (
44
"github.com/grafana/grafana/pkg/components/simplejson"
5-
"github.com/grafana/grafana/pkg/tsdb"
65
)
76

87
type AlertJob struct {
@@ -36,10 +35,7 @@ type AlertRule struct {
3635
Query AlertQuery
3736
Transform string
3837
TransformParams simplejson.Json
39-
}
40-
41-
type Transformer interface {
42-
Transform(tsdb tsdb.TimeSeriesSlice) float64
38+
Transformer Transformer
4339
}
4440

4541
type Level struct {

pkg/services/alerting/transformer.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package alerting
2+
3+
import (
4+
"fmt"
5+
"math"
6+
7+
"github.com/grafana/grafana/pkg/tsdb"
8+
)
9+
10+
type Transformer interface {
11+
Transform(timeserie *tsdb.TimeSeries) (float64, error)
12+
}
13+
14+
type AggregationTransformer struct {
15+
Method string
16+
}
17+
18+
func (at *AggregationTransformer) Transform(timeserie *tsdb.TimeSeries) (float64, error) {
19+
20+
if at.Method == "avg" {
21+
sum := float64(0)
22+
for _, point := range timeserie.Points {
23+
sum += point[0]
24+
}
25+
26+
return sum / float64(len(timeserie.Points)), nil
27+
}
28+
29+
//"sum": func(series *tsdb.TimeSeries) float64 {
30+
if at.Method == "sum" {
31+
sum := float64(0)
32+
33+
for _, v := range timeserie.Points {
34+
sum += v[0]
35+
}
36+
37+
return sum, nil
38+
}
39+
40+
//"min": func(series *tsdb.TimeSeries) float64 {
41+
if at.Method == "min" {
42+
min := timeserie.Points[0][0]
43+
44+
for _, v := range timeserie.Points {
45+
if v[0] < min {
46+
min = v[0]
47+
}
48+
}
49+
50+
return min, nil
51+
}
52+
53+
//"max": func(series *tsdb.TimeSeries) float64 {
54+
if at.Method == "max" {
55+
max := timeserie.Points[0][0]
56+
57+
for _, v := range timeserie.Points {
58+
if v[0] > max {
59+
max = v[0]
60+
}
61+
}
62+
63+
return max, nil
64+
}
65+
66+
//"mean": func(series *tsdb.TimeSeries) float64 {
67+
if at.Method == "mean" {
68+
midPosition := int64(math.Floor(float64(len(timeserie.Points)) / float64(2)))
69+
return timeserie.Points[midPosition][0], nil
70+
}
71+
72+
return float64(0), fmt.Errorf("Missing method")
73+
}

0 commit comments

Comments
 (0)