Skip to content

Commit ca8b212

Browse files
committed
Merge remote-tracking branch 'origin/main' into cj/scaletest-trafficgen
2 parents 655d95a + a172e07 commit ca8b212

File tree

131 files changed

+2272
-1166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

131 files changed

+2272
-1166
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ cli/testdata/.gen-golden: $(wildcard cli/testdata/*.golden) $(wildcard cli/*.tpl
526526
go test ./cli -run="Test(CommandHelp|ServerYAML)" -update
527527
touch "$@"
528528

529-
helm/tests/testdata/.gen-golden: $(wildcard helm/tests/testdata/*.golden) $(GO_SRC_FILES)
529+
helm/tests/testdata/.gen-golden: $(wildcard helm/tests/testdata/*.yaml) $(wildcard helm/tests/testdata/*.golden) $(GO_SRC_FILES)
530530
go test ./helm/tests -run=TestUpdateGoldenFiles -update
531531
touch "$@"
532532

cli/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,7 @@ func newProvisionerDaemon(
11851185
return nil, xerrors.Errorf("mkdir %q: %w", cacheDir, err)
11861186
}
11871187

1188+
tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
11881189
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
11891190
wg.Add(1)
11901191
go func() {
@@ -1204,6 +1205,7 @@ func newProvisionerDaemon(
12041205
},
12051206
CachePath: cacheDir,
12061207
Logger: logger,
1208+
Tracer: tracer,
12071209
})
12081210
if err != nil && !xerrors.Is(err, context.Canceled) {
12091211
select {

coderd/apidoc/docs.go

Lines changed: 7 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func New(options *Options) *API {
236236
if options.SSHConfig.HostnamePrefix == "" {
237237
options.SSHConfig.HostnamePrefix = "coder."
238238
}
239+
if options.TracerProvider == nil {
240+
options.TracerProvider = trace.NewNoopTracerProvider()
241+
}
239242
if options.SetUserGroups == nil {
240243
options.SetUserGroups = func(ctx context.Context, _ database.Store, id uuid.UUID, groups []string) error {
241244
options.Logger.Warn(ctx, "attempted to assign OIDC groups without enterprise license",
@@ -446,7 +449,7 @@ func New(options *Options) *API {
446449
r.Route(fmt.Sprintf("/%s", gitAuthConfig.ID), func(r chi.Router) {
447450
r.Use(
448451
httpmw.ExtractOAuth2(gitAuthConfig, options.HTTPClient, nil),
449-
apiKeyMiddleware,
452+
apiKeyMiddlewareRedirect,
450453
)
451454
r.Get("/callback", api.gitAuthCallback(gitAuthConfig))
452455
})
@@ -898,6 +901,7 @@ func compressHandler(h http.Handler) http.Handler {
898901
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
899902
// Useful when starting coderd and provisionerd in the same process.
900903
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
904+
tracer := api.TracerProvider.Tracer(tracing.TracerName)
901905
clientSession, serverSession := provisionersdk.MemTransportPipe()
902906
defer func() {
903907
if err != nil {
@@ -937,6 +941,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
937941
Provisioners: daemon.Provisioners,
938942
GitAuthConfigs: api.GitAuthConfigs,
939943
Telemetry: api.Telemetry,
944+
Tracer: tracer,
940945
Tags: tags,
941946
QuotaCommitter: &api.QuotaCommitter,
942947
Auditor: &api.Auditor,
@@ -947,14 +952,16 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
947952
if err != nil {
948953
return nil, err
949954
}
950-
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
951-
Log: func(err error) {
952-
if xerrors.Is(err, io.EOF) {
953-
return
954-
}
955-
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
955+
server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
956+
drpcserver.Options{
957+
Log: func(err error) {
958+
if xerrors.Is(err, io.EOF) {
959+
return
960+
}
961+
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
962+
},
956963
},
957-
})
964+
)
958965
go func() {
959966
err := server.Serve(ctx, serverSession)
960967
if err != nil && !xerrors.Is(err, io.EOF) {

coderd/database/dbauthz/dbauthz.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ var (
144144
},
145145
}),
146146
Scope: rbac.ScopeAll,
147-
}
147+
}.WithCachedASTValue()
148+
148149
subjectAutostart = rbac.Subject{
149150
ID: uuid.Nil.String(),
150151
Roles: rbac.Roles([]rbac.Role{
@@ -161,7 +162,8 @@ var (
161162
},
162163
}),
163164
Scope: rbac.ScopeAll,
164-
}
165+
}.WithCachedASTValue()
166+
165167
subjectSystemRestricted = rbac.Subject{
166168
ID: uuid.Nil.String(),
167169
Roles: rbac.Roles([]rbac.Role{
@@ -188,7 +190,7 @@ var (
188190
},
189191
}),
190192
Scope: rbac.ScopeAll,
191-
}
193+
}.WithCachedASTValue()
192194
)
193195

194196
// AsProvisionerd returns a context with an actor that has permissions required

coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE provisioner_jobs DROP COLUMN trace_metadata;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE provisioner_jobs ADD COLUMN trace_metadata jsonb;

coderd/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/pubsub.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@ type Pubsub interface {
2525

2626
// Pubsub implementation using PostgreSQL.
2727
type pgPubsub struct {
28+
ctx context.Context
2829
pgListener *pq.Listener
2930
db *sql.DB
3031
mut sync.Mutex
31-
listeners map[string]map[uuid.UUID]Listener
32+
listeners map[string]map[uuid.UUID]chan<- []byte
3233
}
3334

35+
// messageBufferSize is the maximum number of unhandled messages we will buffer
36+
// for a subscriber before dropping messages.
37+
const messageBufferSize = 2048
38+
3439
// Subscribe calls the listener when an event matching the name is received.
3540
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
3641
p.mut.Lock()
@@ -45,25 +50,22 @@ func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), er
4550
return nil, xerrors.Errorf("listen: %w", err)
4651
}
4752

48-
var eventListeners map[uuid.UUID]Listener
53+
var eventListeners map[uuid.UUID]chan<- []byte
4954
var ok bool
5055
if eventListeners, ok = p.listeners[event]; !ok {
51-
eventListeners = map[uuid.UUID]Listener{}
56+
eventListeners = make(map[uuid.UUID]chan<- []byte)
5257
p.listeners[event] = eventListeners
5358
}
5459

55-
var id uuid.UUID
56-
for {
57-
id = uuid.New()
58-
if _, ok = eventListeners[id]; !ok {
59-
break
60-
}
61-
}
62-
63-
eventListeners[id] = listener
60+
ctx, cancelCallbacks := context.WithCancel(p.ctx)
61+
messages := make(chan []byte, messageBufferSize)
62+
go messagesToListener(ctx, messages, listener)
63+
id := uuid.New()
64+
eventListeners[id] = messages
6465
return func() {
6566
p.mut.Lock()
6667
defer p.mut.Unlock()
68+
cancelCallbacks()
6769
listeners := p.listeners[event]
6870
delete(listeners, id)
6971

@@ -109,11 +111,11 @@ func (p *pgPubsub) listen(ctx context.Context) {
109111
if notif == nil {
110112
continue
111113
}
112-
p.listenReceive(ctx, notif)
114+
p.listenReceive(notif)
113115
}
114116
}
115117

116-
func (p *pgPubsub) listenReceive(ctx context.Context, notif *pq.Notification) {
118+
func (p *pgPubsub) listenReceive(notif *pq.Notification) {
117119
p.mut.Lock()
118120
defer p.mut.Unlock()
119121
listeners, ok := p.listeners[notif.Channel]
@@ -122,7 +124,14 @@ func (p *pgPubsub) listenReceive(ctx context.Context, notif *pq.Notification) {
122124
}
123125
extra := []byte(notif.Extra)
124126
for _, listener := range listeners {
125-
go listener(ctx, extra)
127+
select {
128+
case listener <- extra:
129+
// ok!
130+
default:
131+
// bad news, we dropped the event because the listener isn't
132+
// keeping up
133+
// TODO (spike): figure out a way to communicate this to the Listener
134+
}
126135
}
127136
}
128137

@@ -150,11 +159,23 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
150159
return nil, ctx.Err()
151160
}
152161
pgPubsub := &pgPubsub{
162+
ctx: ctx,
153163
db: database,
154164
pgListener: listener,
155-
listeners: make(map[string]map[uuid.UUID]Listener),
165+
listeners: make(map[string]map[uuid.UUID]chan<- []byte),
156166
}
157167
go pgPubsub.listen(ctx)
158168

159169
return pgPubsub, nil
160170
}
171+
172+
func messagesToListener(ctx context.Context, messages <-chan []byte, listener Listener) {
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return
177+
case m := <-messages:
178+
listener(ctx, m)
179+
}
180+
}
181+
}

coderd/database/pubsub_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ package database_test
55
import (
66
"context"
77
"database/sql"
8+
"fmt"
9+
"math/rand"
810
"testing"
11+
"time"
12+
13+
"github.com/coder/coder/testutil"
914

1015
"github.com/stretchr/testify/assert"
1116
"github.com/stretchr/testify/require"
@@ -67,3 +72,44 @@ func TestPubsub(t *testing.T) {
6772
cancelFunc()
6873
})
6974
}
75+
76+
func TestPubsub_ordering(t *testing.T) {
77+
t.Parallel()
78+
79+
ctx, cancelFunc := context.WithCancel(context.Background())
80+
defer cancelFunc()
81+
82+
connectionURL, closePg, err := postgres.Open()
83+
require.NoError(t, err)
84+
defer closePg()
85+
db, err := sql.Open("postgres", connectionURL)
86+
require.NoError(t, err)
87+
defer db.Close()
88+
pubsub, err := database.NewPubsub(ctx, db, connectionURL)
89+
require.NoError(t, err)
90+
defer pubsub.Close()
91+
event := "test"
92+
messageChannel := make(chan []byte, 100)
93+
cancelFunc, err = pubsub.Subscribe(event, func(ctx context.Context, message []byte) {
94+
// sleep a random amount of time to simulate handlers taking different amount of time
95+
// to process, depending on the message
96+
// nolint: gosec
97+
n := rand.Intn(100)
98+
time.Sleep(time.Duration(n) * time.Millisecond)
99+
messageChannel <- message
100+
})
101+
require.NoError(t, err)
102+
defer cancelFunc()
103+
for i := 0; i < 100; i++ {
104+
err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", i)))
105+
assert.NoError(t, err)
106+
}
107+
for i := 0; i < 100; i++ {
108+
select {
109+
case <-time.After(testutil.WaitShort):
110+
t.Fatalf("timed out waiting for message %d", i)
111+
case message := <-messageChannel:
112+
assert.Equal(t, fmt.Sprintf("%d", i), string(message))
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)