Skip to content

Commit c38f6ff

Browse files
bergquisttorkelo
authored andcommitted
Make alerting notifcations sync (grafana#6158)
* tech(routines): move the async logic from notification to alerting notifier * tech(notification): reduce code dupe * fix(notification): dont touch the response unless its an error * feat(alerting): make alerting exeuction async but flow sync * tech(alerting): remove commented code * tech(alerting): remove unused code * tech(alerting): fix typo * tech(alerting): implement Context on EvalContext * tech(alerting): wait for all alerts to return * feat(alerting): dont allow alert responses to cancel * Revert "feat(alerting): dont allow alert responses to cancel" This reverts commit 324b006. * feat(alerting): give alerts some time to finish before closing down
1 parent 36f0bf0 commit c38f6ff

30 files changed

+356
-273
lines changed

pkg/api/metrics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package api
22

33
import (
4+
"context"
45
"encoding/json"
56
"net/http"
67

@@ -31,7 +32,7 @@ func QueryMetrics(c *middleware.Context, reqDto dtos.MetricRequest) Response {
3132
})
3233
}
3334

34-
resp, err := tsdb.HandleRequest(request)
35+
resp, err := tsdb.HandleRequest(context.TODO(), request)
3536
if err != nil {
3637
return ApiError(500, "Metric request error", err)
3738
}

pkg/bus/bus.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package bus
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67
)
78

89
type HandlerFunc interface{}
10+
type CtxHandlerFunc func()
911
type Msg interface{}
1012

1113
type Bus interface {
1214
Dispatch(msg Msg) error
15+
DispatchCtx(ctx context.Context, msg Msg) error
1316
Publish(msg Msg) error
1417

1518
AddHandler(handler HandlerFunc)
19+
AddCtxHandler(handler HandlerFunc)
1620
AddEventListener(handler HandlerFunc)
1721
AddWildcardListener(handler HandlerFunc)
1822
}
@@ -34,6 +38,27 @@ func New() Bus {
3438
return bus
3539
}
3640

41+
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
42+
var msgName = reflect.TypeOf(msg).Elem().Name()
43+
44+
var handler = b.handlers[msgName]
45+
if handler == nil {
46+
return fmt.Errorf("handler not found for %s", msgName)
47+
}
48+
49+
var params = make([]reflect.Value, 2)
50+
params[0] = reflect.ValueOf(ctx)
51+
params[1] = reflect.ValueOf(msg)
52+
53+
ret := reflect.ValueOf(handler).Call(params)
54+
err := ret[0].Interface()
55+
if err == nil {
56+
return nil
57+
} else {
58+
return err.(error)
59+
}
60+
}
61+
3762
func (b *InProcBus) Dispatch(msg Msg) error {
3863
var msgName = reflect.TypeOf(msg).Elem().Name()
3964

@@ -90,6 +115,12 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) {
90115
b.handlers[queryTypeName] = handler
91116
}
92117

118+
func (b *InProcBus) AddCtxHandler(handler HandlerFunc) {
119+
handlerType := reflect.TypeOf(handler)
120+
queryTypeName := handlerType.In(1).Elem().Name()
121+
b.handlers[queryTypeName] = handler
122+
}
123+
93124
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
94125
handlerType := reflect.TypeOf(handler)
95126
eventName := handlerType.In(0).Elem().Name()
@@ -105,6 +136,11 @@ func AddHandler(implName string, handler HandlerFunc) {
105136
globalBus.AddHandler(handler)
106137
}
107138

139+
// Package level functions
140+
func AddCtxHandler(implName string, handler HandlerFunc) {
141+
globalBus.AddCtxHandler(handler)
142+
}
143+
108144
// Package level functions
109145
func AddEventListener(handler HandlerFunc) {
110146
globalBus.AddEventListener(handler)
@@ -118,6 +154,10 @@ func Dispatch(msg Msg) error {
118154
return globalBus.Dispatch(msg)
119155
}
120156

157+
func DispatchCtx(ctx context.Context, msg Msg) error {
158+
return globalBus.DispatchCtx(ctx, msg)
159+
}
160+
121161
func Publish(msg Msg) error {
122162
return globalBus.Publish(msg)
123163
}

pkg/models/notifications.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,24 @@ type SendEmailCommand struct {
1212
Info string
1313
}
1414

15+
type SendEmailCommandSync struct {
16+
SendEmailCommand
17+
}
18+
1519
type SendWebhook struct {
1620
Url string
1721
User string
1822
Password string
1923
Body string
2024
}
2125

26+
type SendWebhookSync struct {
27+
Url string
28+
User string
29+
Password string
30+
Body string
31+
}
32+
2233
type SendResetPasswordEmailCommand struct {
2334
User *User
2435
}

pkg/services/alerting/conditions/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *
8282
req := c.getRequestForAlertRule(getDsInfo.Result, timeRange)
8383
result := make(tsdb.TimeSeriesSlice, 0)
8484

85-
resp, err := c.HandleRequest(req)
85+
resp, err := c.HandleRequest(context.Context, req)
8686
if err != nil {
8787
return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
8888
}

pkg/services/alerting/conditions/query_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package conditions
22

33
import (
4+
"context"
45
"testing"
56

67
null "gopkg.in/guregu/null.v3"
@@ -137,7 +138,7 @@ func (ctx *queryConditionTestContext) exec() {
137138

138139
ctx.condition = condition
139140

140-
condition.HandleRequest = func(req *tsdb.Request) (*tsdb.Response, error) {
141+
condition.HandleRequest = func(context context.Context, req *tsdb.Request) (*tsdb.Response, error) {
141142
return &tsdb.Response{
142143
Results: map[string]*tsdb.QueryResult{
143144
"A": {Series: ctx.series},

pkg/services/alerting/engine.go

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
type Engine struct {
1313
execQueue chan *Job
14-
resultQueue chan *EvalContext
1514
clock clock.Clock
1615
ticker *Ticker
1716
scheduler Scheduler
@@ -25,7 +24,6 @@ func NewEngine() *Engine {
2524
e := &Engine{
2625
ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
2726
execQueue: make(chan *Job, 1000),
28-
resultQueue: make(chan *EvalContext, 1000),
2927
scheduler: NewScheduler(),
3028
evalHandler: NewEvalHandler(),
3129
ruleReader: NewRuleReader(),
@@ -39,23 +37,17 @@ func NewEngine() *Engine {
3937
func (e *Engine) Run(ctx context.Context) error {
4038
e.log.Info("Initializing Alerting")
4139

42-
g, ctx := errgroup.WithContext(ctx)
40+
alertGroup, ctx := errgroup.WithContext(ctx)
4341

44-
g.Go(func() error { return e.alertingTicker(ctx) })
45-
g.Go(func() error { return e.execDispatcher(ctx) })
46-
g.Go(func() error { return e.resultDispatcher(ctx) })
42+
alertGroup.Go(func() error { return e.alertingTicker(ctx) })
43+
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })
4744

48-
err := g.Wait()
45+
err := alertGroup.Wait()
4946

5047
e.log.Info("Stopped Alerting", "reason", err)
5148
return err
5249
}
5350

54-
func (e *Engine) Stop() {
55-
close(e.execQueue)
56-
close(e.resultQueue)
57-
}
58-
5951
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
6052
defer func() {
6153
if err := recover(); err != nil {
@@ -81,69 +73,58 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
8173
}
8274
}
8375

84-
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
76+
func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
77+
dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx)
78+
8579
for {
8680
select {
8781
case <-grafanaCtx.Done():
88-
close(e.resultQueue)
89-
return grafanaCtx.Err()
82+
return dispatcherGroup.Wait()
9083
case job := <-e.execQueue:
91-
go e.executeJob(grafanaCtx, job)
84+
dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
9285
}
9386
}
9487
}
9588

96-
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
89+
var (
90+
unfinishedWorkTimeout time.Duration = time.Second * 5
91+
alertTimeout time.Duration = time.Second * 30
92+
)
93+
94+
func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
9795
defer func() {
9896
if err := recover(); err != nil {
99-
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
97+
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
10098
}
10199
}()
102100

103-
done := make(chan *EvalContext, 1)
101+
alertCtx, cancelFn := context.WithTimeout(context.TODO(), alertTimeout)
102+
103+
job.Running = true
104+
evalContext := NewEvalContext(alertCtx, job.Rule)
105+
106+
done := make(chan struct{})
107+
104108
go func() {
105-
job.Running = true
106-
context := NewEvalContext(job.Rule)
107-
e.evalHandler.Eval(context)
108-
job.Running = false
109-
done <- context
109+
e.evalHandler.Eval(evalContext)
110+
e.resultHandler.Handle(evalContext)
110111
close(done)
111112
}()
112113

114+
var err error = nil
113115
select {
114-
115116
case <-grafanaCtx.Done():
116-
return grafanaCtx.Err()
117-
case evalContext := <-done:
118-
e.resultQueue <- evalContext
119-
}
120-
121-
return nil
122-
}
123-
124-
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
125-
for {
126117
select {
127-
case <-grafanaCtx.Done():
128-
//handle all responses before shutting down.
129-
for result := range e.resultQueue {
130-
e.handleResponse(result)
131-
}
132-
133-
return grafanaCtx.Err()
134-
case result := <-e.resultQueue:
135-
e.handleResponse(result)
118+
case <-time.After(unfinishedWorkTimeout):
119+
cancelFn()
120+
err = grafanaCtx.Err()
121+
case <-done:
136122
}
123+
case <-done:
137124
}
138-
}
139125

140-
func (e *Engine) handleResponse(result *EvalContext) {
141-
defer func() {
142-
if err := recover(); err != nil {
143-
e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1))
144-
}
145-
}()
146-
147-
e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing)
148-
e.resultHandler.Handle(result)
126+
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
127+
job.Running = false
128+
cancelFn()
129+
return err
149130
}

pkg/services/alerting/eval_context.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package alerting
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -20,14 +21,30 @@ type EvalContext struct {
2021
StartTime time.Time
2122
EndTime time.Time
2223
Rule *Rule
23-
DoneChan chan bool
24-
CancelChan chan bool
2524
log log.Logger
2625
dashboardSlug string
2726
ImagePublicUrl string
2827
ImageOnDiskPath string
2928
NoDataFound bool
3029
RetryCount int
30+
31+
Context context.Context
32+
}
33+
34+
func (evalContext *EvalContext) Deadline() (deadline time.Time, ok bool) {
35+
return evalContext.Deadline()
36+
}
37+
38+
func (evalContext *EvalContext) Done() <-chan struct{} {
39+
return evalContext.Context.Done()
40+
}
41+
42+
func (evalContext *EvalContext) Err() error {
43+
return evalContext.Context.Err()
44+
}
45+
46+
func (evalContext *EvalContext) Value(key interface{}) interface{} {
47+
return evalContext.Context.Value(key)
3148
}
3249

3350
type StateDescription struct {
@@ -94,14 +111,13 @@ func (c *EvalContext) GetRuleUrl() (string, error) {
94111
}
95112
}
96113

97-
func NewEvalContext(rule *Rule) *EvalContext {
114+
func NewEvalContext(alertCtx context.Context, rule *Rule) *EvalContext {
98115
return &EvalContext{
116+
Context: alertCtx,
99117
StartTime: time.Now(),
100118
Rule: rule,
101119
Logs: make([]*ResultLogEntry, 0),
102120
EvalMatches: make([]*EvalMatch, 0),
103-
DoneChan: make(chan bool, 1),
104-
CancelChan: make(chan bool, 1),
105121
log: log.New("alerting.evalContext"),
106122
RetryCount: 0,
107123
}

0 commit comments

Comments
 (0)