Skip to content

Commit 34b31ae

Browse files
committed
tech(goroutines): sync state between different go routines
1 parent 8e89173 commit 34b31ae

File tree

9 files changed

+221
-41
lines changed

9 files changed

+221
-41
lines changed

pkg/cmd/grafana-server/main.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"io/ioutil"
@@ -12,6 +13,8 @@ import (
1213
"syscall"
1314
"time"
1415

16+
"golang.org/x/sync/errgroup"
17+
1518
"github.com/grafana/grafana/pkg/log"
1619
"github.com/grafana/grafana/pkg/login"
1720
"github.com/grafana/grafana/pkg/metrics"
@@ -57,26 +60,33 @@ func main() {
5760
setting.BuildCommit = commit
5861
setting.BuildStamp = buildstampInt64
5962

60-
go listenToSystemSignals()
63+
appContext, cancelFn := context.WithCancel(context.Background())
64+
grafanaGroup, _ := errgroup.WithContext(appContext)
65+
66+
go listenToSystemSignals(cancelFn, grafanaGroup)
6167

6268
flag.Parse()
6369
writePIDFile()
6470
initRuntime()
71+
initSql()
6572
metrics.Init()
6673
search.Init()
6774
login.Init()
6875
social.NewOAuthService()
6976
eventpublisher.Init()
7077
plugins.Init()
71-
alertingInit.Init()
72-
backgroundtasks.Init()
78+
79+
grafanaGroup.Go(func() error { return alertingInit.Init(appContext) })
80+
grafanaGroup.Go(func() error { return backgroundtasks.Init(appContext) })
7381

7482
if err := notifications.Init(); err != nil {
7583
log.Fatal(3, "Notification service failed to initialize", err)
7684
}
7785

78-
StartServer()
79-
exitChan <- 0
86+
exitCode := StartServer()
87+
88+
grafanaGroup.Wait()
89+
exitChan <- exitCode
8090
}
8191

8292
func initRuntime() {
@@ -94,7 +104,9 @@ func initRuntime() {
94104
logger.Info("Starting Grafana", "version", version, "commit", commit, "compiled", time.Unix(setting.BuildStamp, 0))
95105

96106
setting.LogConfigurationInfo()
107+
}
97108

109+
func initSql() {
98110
sqlstore.NewEngine()
99111
sqlstore.EnsureAdminUser()
100112
}
@@ -117,15 +129,15 @@ func writePIDFile() {
117129
}
118130
}
119131

120-
func listenToSystemSignals() {
132+
func listenToSystemSignals(cancel context.CancelFunc, grafanaGroup *errgroup.Group) {
121133
signalChan := make(chan os.Signal, 1)
122134
code := 0
123135

124136
signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGTERM)
125137

126138
select {
127139
case sig := <-signalChan:
128-
log.Info("Received signal %s. shutting down", sig)
140+
log.Info2("Received system signal. Shutting down", "signal", sig)
129141
case code = <-exitChan:
130142
switch code {
131143
case 0:
@@ -135,6 +147,8 @@ func listenToSystemSignals() {
135147
}
136148
}
137149

150+
cancel()
151+
grafanaGroup.Wait()
138152
log.Close()
139153
os.Exit(code)
140154
}

pkg/cmd/grafana-server/web.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package main
66
import (
77
"fmt"
88
"net/http"
9-
"os"
109
"path"
1110

1211
"gopkg.in/macaron.v1"
@@ -79,7 +78,7 @@ func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) {
7978
))
8079
}
8180

82-
func StartServer() {
81+
func StartServer() int {
8382
logger = log.New("server")
8483

8584
var err error
@@ -95,11 +94,13 @@ func StartServer() {
9594
err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
9695
default:
9796
logger.Error("Invalid protocol", "protocol", setting.Protocol)
98-
os.Exit(1)
97+
return 1
9998
}
10099

101100
if err != nil {
102101
logger.Error("Fail to start server", "error", err)
103-
os.Exit(1)
102+
return 1
104103
}
104+
105+
return 0
105106
}

pkg/services/alerting/engine.go

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package alerting
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/benbjohnson/clock"
78
"github.com/grafana/grafana/pkg/log"
9+
"golang.org/x/sync/errgroup"
810
)
911

1012
type Engine struct {
@@ -34,20 +36,24 @@ func NewEngine() *Engine {
3436
return e
3537
}
3638

37-
func (e *Engine) Start() {
39+
func (e *Engine) Start(grafanaCtx context.Context) error {
3840
e.log.Info("Starting Alerting Engine")
3941

40-
go e.alertingTicker()
41-
go e.execDispatcher()
42-
go e.resultDispatcher()
42+
g, _ := errgroup.WithContext(grafanaCtx)
43+
44+
g.Go(func() error { return e.alertingTicker(grafanaCtx) })
45+
g.Go(func() error { return e.execDispatcher(grafanaCtx) })
46+
g.Go(func() error { return e.resultDispatcher(grafanaCtx) })
47+
48+
return g.Wait()
4349
}
4450

4551
func (e *Engine) Stop() {
4652
close(e.execQueue)
4753
close(e.resultQueue)
4854
}
4955

50-
func (e *Engine) alertingTicker() {
56+
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
5157
defer func() {
5258
if err := recover(); err != nil {
5359
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
@@ -58,6 +64,8 @@ func (e *Engine) alertingTicker() {
5864

5965
for {
6066
select {
67+
case <-grafanaCtx.Done():
68+
return grafanaCtx.Err()
6169
case tick := <-e.ticker.C:
6270
// TEMP SOLUTION update rules ever tenth tick
6371
if tickIndex%10 == 0 {
@@ -70,31 +78,56 @@ func (e *Engine) alertingTicker() {
7078
}
7179
}
7280

73-
func (e *Engine) execDispatcher() {
74-
for job := range e.execQueue {
75-
e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id)
76-
go e.executeJob(job)
81+
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
82+
for {
83+
select {
84+
case <-grafanaCtx.Done():
85+
close(e.resultQueue)
86+
return grafanaCtx.Err()
87+
case job := <-e.execQueue:
88+
go e.executeJob(grafanaCtx, job)
89+
}
7790
}
7891
}
7992

80-
func (e *Engine) executeJob(job *Job) {
93+
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) {
8194
defer func() {
8295
if err := recover(); err != nil {
8396
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
8497
}
8598
}()
8699

87-
job.Running = true
88-
context := NewEvalContext(job.Rule)
89-
e.evalHandler.Eval(context)
90-
job.Running = false
100+
done := make(chan *EvalContext, 1)
101+
go func() {
102+
job.Running = true
103+
context := NewEvalContext(job.Rule)
104+
e.evalHandler.Eval(context)
105+
job.Running = false
106+
done <- context
107+
close(done)
108+
}()
109+
110+
select {
111+
case evalContext := <-done:
112+
e.resultQueue <- evalContext
113+
case <-grafanaCtx.Done():
91114

92-
e.resultQueue <- context
115+
}
93116
}
94117

95-
func (e *Engine) resultDispatcher() {
96-
for result := range e.resultQueue {
97-
go e.handleResponse(result)
118+
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
119+
for {
120+
select {
121+
case <-grafanaCtx.Done():
122+
//handle all responses before shutting down.
123+
for result := range e.resultQueue {
124+
e.handleResponse(result)
125+
}
126+
127+
return grafanaCtx.Err()
128+
case result := <-e.resultQueue:
129+
e.handleResponse(result)
130+
}
98131
}
99132
}
100133

pkg/services/alerting/init/init.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package init
22

33
import (
4+
"context"
5+
46
"github.com/grafana/grafana/pkg/services/alerting"
57
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
68
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
@@ -11,11 +13,11 @@ import (
1113

1214
var engine *alerting.Engine
1315

14-
func Init() {
16+
func Init(ctx context.Context) error {
1517
if !setting.AlertingEnabled {
16-
return
18+
return nil
1719
}
1820

1921
engine = alerting.NewEngine()
20-
engine.Start()
22+
return engine.Start(ctx)
2123
}

pkg/services/backgroundtasks/background_tasks.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package backgroundtasks
55

66
import (
7+
"context"
78
"time"
89

10+
"golang.org/x/sync/errgroup"
11+
912
"github.com/grafana/grafana/pkg/bus"
1013
"github.com/grafana/grafana/pkg/log"
1114
"github.com/grafana/grafana/pkg/models"
@@ -15,18 +18,23 @@ var (
1518
tlog log.Logger = log.New("ticker")
1619
)
1720

18-
func Init() {
19-
go start()
21+
func Init(ctx context.Context) error {
22+
g, _ := errgroup.WithContext(ctx)
23+
g.Go(func() error { return start(ctx) })
24+
25+
return g.Wait()
2026
}
2127

22-
func start() {
28+
func start(ctx context.Context) error {
2329
go cleanup(time.Now())
2430

2531
ticker := time.NewTicker(time.Hour * 1)
2632
for {
2733
select {
2834
case tick := <-ticker.C:
2935
go cleanup(tick)
36+
case <-ctx.Done():
37+
return ctx.Err()
3038
}
3139
}
3240
}

vendor/golang.org/x/sync/LICENSE

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/PATENTS

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)