Skip to content

Commit c28d004

Browse files
committed
Merge branch 'go_routines'
2 parents 2750c8a + 71e2c6f commit c28d004

File tree

15 files changed

+318
-139
lines changed

15 files changed

+318
-139
lines changed

pkg/cmd/grafana-server/main.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"io/ioutil"
@@ -12,18 +13,26 @@ import (
1213
"syscall"
1314
"time"
1415

16+
"golang.org/x/sync/errgroup"
17+
1518
"github.com/grafana/grafana/pkg/log"
1619
"github.com/grafana/grafana/pkg/login"
1720
"github.com/grafana/grafana/pkg/metrics"
1821
"github.com/grafana/grafana/pkg/plugins"
19-
alertingInit "github.com/grafana/grafana/pkg/services/alerting/init"
20-
"github.com/grafana/grafana/pkg/services/backgroundtasks"
22+
"github.com/grafana/grafana/pkg/services/cleanup"
2123
"github.com/grafana/grafana/pkg/services/eventpublisher"
2224
"github.com/grafana/grafana/pkg/services/notifications"
2325
"github.com/grafana/grafana/pkg/services/search"
2426
"github.com/grafana/grafana/pkg/services/sqlstore"
2527
"github.com/grafana/grafana/pkg/setting"
2628
"github.com/grafana/grafana/pkg/social"
29+
30+
"github.com/grafana/grafana/pkg/services/alerting"
31+
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
32+
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
33+
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
34+
_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
35+
_ "github.com/grafana/grafana/pkg/tsdb/testdata"
2736
)
2837

2938
var version = "3.1.0"
@@ -57,26 +66,41 @@ func main() {
5766
setting.BuildCommit = commit
5867
setting.BuildStamp = buildstampInt64
5968

60-
go listenToSystemSignals()
69+
appContext, shutdownFn := context.WithCancel(context.Background())
70+
grafanaGroup, appContext := errgroup.WithContext(appContext)
71+
72+
go listenToSystemSignals(shutdownFn, grafanaGroup)
6173

6274
flag.Parse()
6375
writePIDFile()
76+
6477
initRuntime()
78+
initSql()
6579
metrics.Init()
6680
search.Init()
6781
login.Init()
6882
social.NewOAuthService()
6983
eventpublisher.Init()
7084
plugins.Init()
71-
alertingInit.Init()
72-
backgroundtasks.Init()
85+
86+
// init alerting
87+
if setting.AlertingEnabled {
88+
engine := alerting.NewEngine()
89+
grafanaGroup.Go(func() error { return engine.Run(appContext) })
90+
}
91+
92+
// cleanup service
93+
cleanUpService := cleanup.NewCleanUpService()
94+
grafanaGroup.Go(func() error { return cleanUpService.Run(appContext) })
7395

7496
if err := notifications.Init(); err != nil {
7597
log.Fatal(3, "Notification service failed to initialize", err)
7698
}
7799

78-
StartServer()
79-
exitChan <- 0
100+
exitCode := StartServer()
101+
102+
grafanaGroup.Wait()
103+
exitChan <- exitCode
80104
}
81105

82106
func initRuntime() {
@@ -94,7 +118,9 @@ func initRuntime() {
94118
logger.Info("Starting Grafana", "version", version, "commit", commit, "compiled", time.Unix(setting.BuildStamp, 0))
95119

96120
setting.LogConfigurationInfo()
121+
}
97122

123+
func initSql() {
98124
sqlstore.NewEngine()
99125
sqlstore.EnsureAdminUser()
100126
}
@@ -117,15 +143,15 @@ func writePIDFile() {
117143
}
118144
}
119145

120-
func listenToSystemSignals() {
146+
func listenToSystemSignals(cancel context.CancelFunc, grafanaGroup *errgroup.Group) {
121147
signalChan := make(chan os.Signal, 1)
122148
code := 0
123149

124150
signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGTERM)
125151

126152
select {
127153
case sig := <-signalChan:
128-
log.Info("Received signal %s. shutting down", sig)
154+
log.Info2("Received system signal. Shutting down", "signal", sig)
129155
case code = <-exitChan:
130156
switch code {
131157
case 0:
@@ -135,6 +161,8 @@ func listenToSystemSignals() {
135161
}
136162
}
137163

164+
cancel()
165+
grafanaGroup.Wait()
138166
log.Close()
139167
os.Exit(code)
140168
}

pkg/cmd/grafana-server/web.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package main
66
import (
77
"fmt"
88
"net/http"
9-
"os"
109
"path"
1110

1211
"gopkg.in/macaron.v1"
@@ -79,7 +78,7 @@ func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) {
7978
))
8079
}
8180

82-
func StartServer() {
81+
func StartServer() int {
8382
logger = log.New("server")
8483

8584
var err error
@@ -95,11 +94,13 @@ func StartServer() {
9594
err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
9695
default:
9796
logger.Error("Invalid protocol", "protocol", setting.Protocol)
98-
os.Exit(1)
97+
return 1
9998
}
10099

101100
if err != nil {
102101
logger.Error("Fail to start server", "error", err)
103-
os.Exit(1)
102+
return 1
104103
}
104+
105+
return 0
105106
}

pkg/models/dashboard_snapshot.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ type DeleteDashboardSnapshotCommand struct {
6363
DeleteKey string `json:"-"`
6464
}
6565

66+
type DeleteExpiredSnapshotsCommand struct {
67+
}
68+
6669
type GetDashboardSnapshotQuery struct {
6770
Key string
6871

pkg/models/timer.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

pkg/services/alerting/engine.go

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package alerting
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/benbjohnson/clock"
78
"github.com/grafana/grafana/pkg/log"
9+
"golang.org/x/sync/errgroup"
810
)
911

1012
type Engine struct {
@@ -34,20 +36,27 @@ func NewEngine() *Engine {
3436
return e
3537
}
3638

37-
func (e *Engine) Start() {
38-
e.log.Info("Starting Alerting Engine")
39+
func (e *Engine) Run(ctx context.Context) error {
40+
e.log.Info("Initializing Alerting")
3941

40-
go e.alertingTicker()
41-
go e.execDispatcher()
42-
go e.resultDispatcher()
42+
g, ctx := errgroup.WithContext(ctx)
43+
44+
g.Go(func() error { return e.alertingTicker(ctx) })
45+
g.Go(func() error { return e.execDispatcher(ctx) })
46+
g.Go(func() error { return e.resultDispatcher(ctx) })
47+
48+
err := g.Wait()
49+
50+
e.log.Info("Stopped Alerting", "reason", err)
51+
return err
4352
}
4453

4554
func (e *Engine) Stop() {
4655
close(e.execQueue)
4756
close(e.resultQueue)
4857
}
4958

50-
func (e *Engine) alertingTicker() {
59+
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
5160
defer func() {
5261
if err := recover(); err != nil {
5362
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
@@ -58,6 +67,8 @@ func (e *Engine) alertingTicker() {
5867

5968
for {
6069
select {
70+
case <-grafanaCtx.Done():
71+
return grafanaCtx.Err()
6172
case tick := <-e.ticker.C:
6273
// TEMP SOLUTION update rules ever tenth tick
6374
if tickIndex%10 == 0 {
@@ -70,31 +81,59 @@ func (e *Engine) alertingTicker() {
7081
}
7182
}
7283

73-
func (e *Engine) execDispatcher() {
74-
for job := range e.execQueue {
75-
e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id)
76-
go e.executeJob(job)
84+
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
85+
for {
86+
select {
87+
case <-grafanaCtx.Done():
88+
close(e.resultQueue)
89+
return grafanaCtx.Err()
90+
case job := <-e.execQueue:
91+
go e.executeJob(grafanaCtx, job)
92+
}
7793
}
7894
}
7995

80-
func (e *Engine) executeJob(job *Job) {
96+
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
8197
defer func() {
8298
if err := recover(); err != nil {
8399
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
84100
}
85101
}()
86102

87-
job.Running = true
88-
context := NewEvalContext(job.Rule)
89-
e.evalHandler.Eval(context)
90-
job.Running = false
103+
done := make(chan *EvalContext, 1)
104+
go func() {
105+
job.Running = true
106+
context := NewEvalContext(job.Rule)
107+
e.evalHandler.Eval(context)
108+
job.Running = false
109+
done <- context
110+
close(done)
111+
}()
112+
113+
select {
91114

92-
e.resultQueue <- context
115+
case <-grafanaCtx.Done():
116+
return grafanaCtx.Err()
117+
case evalContext := <-done:
118+
e.resultQueue <- evalContext
119+
}
120+
121+
return nil
93122
}
94123

95-
func (e *Engine) resultDispatcher() {
96-
for result := range e.resultQueue {
97-
go e.handleResponse(result)
124+
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
125+
for {
126+
select {
127+
case <-grafanaCtx.Done():
128+
//handle all responses before shutting down.
129+
for result := range e.resultQueue {
130+
e.handleResponse(result)
131+
}
132+
133+
return grafanaCtx.Err()
134+
case result := <-e.resultQueue:
135+
e.handleResponse(result)
136+
}
98137
}
99138
}
100139

pkg/services/alerting/init/init.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package init
22

33
import (
4+
"context"
5+
46
"github.com/grafana/grafana/pkg/services/alerting"
57
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
68
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
@@ -10,13 +12,11 @@ import (
1012
_ "github.com/grafana/grafana/pkg/tsdb/testdata"
1113
)
1214

13-
var engine *alerting.Engine
14-
15-
func Init() {
15+
func Init(ctx context.Context) error {
1616
if !setting.AlertingEnabled {
17-
return
17+
return nil
1818
}
1919

2020
engine = alerting.NewEngine()
21-
engine.Start()
21+
return engine.Start(ctx)
2222
}

pkg/services/backgroundtasks/background_tasks.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)