Skip to content

Commit 620c148

Browse files
committed
Avoid slim database import
1 parent fe7e682 commit 620c148

File tree

4 files changed

+39
-25
lines changed

4 files changed

+39
-25
lines changed

coderd/agentapi/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/coder/coder/v2/coderd/database/pubsub"
2323
"github.com/coder/coder/v2/coderd/externalauth"
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
25-
"github.com/coder/coder/v2/coderd/telemetry"
2625
"github.com/coder/coder/v2/coderd/tracing"
2726
"github.com/coder/coder/v2/coderd/workspacestats"
2827
"github.com/coder/coder/v2/codersdk"
@@ -65,7 +64,7 @@ type Options struct {
6564
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
6665
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
6766
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
68-
NetworkTelemetryBatchFn func(batch []telemetry.NetworkEvent)
67+
NetworkTelemetryBatchFn func(batch []*tailnetproto.TelemetryEvent)
6968

7069
AccessURL *url.URL
7170
AppHostname string

coderd/agentapi/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ import (
2020
"github.com/coder/coder/v2/coderd/externalauth"
2121
"github.com/coder/coder/v2/coderd/prometheusmetrics"
2222
"github.com/coder/coder/v2/coderd/schedule"
23-
"github.com/coder/coder/v2/coderd/telemetry"
2423
"github.com/coder/coder/v2/coderd/workspacestats"
2524
"github.com/coder/coder/v2/codersdk"
2625
"github.com/coder/coder/v2/tailnet"
26+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
2727
"github.com/coder/coder/v2/tailnet/tailnettest"
2828
"github.com/coder/coder/v2/testutil"
2929
)
@@ -83,7 +83,7 @@ func Test_APIClose(t *testing.T) {
8383
StatsReporter: statsReporter,
8484
AppearanceFetcher: &appearanceFetcherPtr,
8585
PublishWorkspaceUpdateFn: func(_ context.Context, _ uuid.UUID) {},
86-
NetworkTelemetryBatchFn: func(_ []telemetry.NetworkEvent) {},
86+
NetworkTelemetryBatchFn: func(_ []*tailnetproto.TelemetryEvent) {},
8787
AccessURL: &url.URL{
8888
Scheme: "http",
8989
Host: "localhost",

coderd/workspaceagentsrpc.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/coder/coder/v2/coderd/util/ptr"
2929
"github.com/coder/coder/v2/codersdk"
3030
"github.com/coder/coder/v2/tailnet"
31+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
3132
)
3233

3334
// @Summary Workspace agent RPC API
@@ -135,9 +136,19 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
135136
StatsReporter: api.statsReporter,
136137
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
137138
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
138-
NetworkTelemetryBatchFn: func(batch []telemetry.NetworkEvent) {
139+
NetworkTelemetryBatchFn: func(batch []*tailnetproto.TelemetryEvent) {
140+
telemetryEvents := make([]telemetry.NetworkEvent, 0, len(batch))
141+
for _, pEvent := range batch {
142+
tEvent, err := telemetry.NetworkEventFromProto(pEvent)
143+
if err != nil {
144+
// Events that fail to be converted get discarded for now.
145+
continue
146+
}
147+
telemetryEvents = append(telemetryEvents, tEvent)
148+
}
149+
139150
api.Telemetry.Report(&telemetry.Snapshot{
140-
NetworkEvents: batch,
151+
NetworkEvents: telemetryEvents,
141152
})
142153
},
143154

tailnet/service.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,14 @@ import (
1010

1111
"github.com/google/uuid"
1212
"github.com/hashicorp/yamux"
13+
"golang.org/x/xerrors"
1314
"storj.io/drpc/drpcmux"
1415
"storj.io/drpc/drpcserver"
1516
"tailscale.com/tailcfg"
1617

1718
"cdr.dev/slog"
1819
"github.com/coder/coder/v2/apiversion"
19-
"github.com/coder/coder/v2/coderd/telemetry"
2020
"github.com/coder/coder/v2/tailnet/proto"
21-
22-
"golang.org/x/xerrors"
2321
)
2422

2523
type streamIDContextKey struct{}
@@ -125,13 +123,14 @@ type DRPCService struct {
125123
DerpMapFn func() *tailcfg.DERPMap
126124
NetworkTelemetryBatchFrequency time.Duration
127125
NetworkTelemetryBatchMaxSize int
128-
NetworkTelemetryBatchFn func(batch []telemetry.NetworkEvent)
126+
NetworkTelemetryBatchFn func(batch []*proto.TelemetryEvent)
129127

130-
mu sync.Mutex
131-
pendingNetworkEvents []telemetry.NetworkEvent
128+
mu sync.Mutex
129+
networkEventBatchTicker *time.Ticker
130+
pendingNetworkEvents []*proto.TelemetryEvent
132131
}
133132

134-
func (s *DRPCService) writeTelemetryEvents(events []telemetry.NetworkEvent) {
133+
func (s *DRPCService) writeTelemetryEvents(events []*proto.TelemetryEvent) {
135134
if s.NetworkTelemetryBatchFn == nil {
136135
return
137136
}
@@ -142,13 +141,14 @@ func (s *DRPCService) sendTelemetryBatch() {
142141
s.mu.Lock()
143142
defer s.mu.Unlock()
144143
events := s.pendingNetworkEvents
145-
s.pendingNetworkEvents = []telemetry.NetworkEvent{}
144+
s.pendingNetworkEvents = []*proto.TelemetryEvent{}
146145
go s.writeTelemetryEvents(events)
147146
}
148147

149148
// PeriodicTelemetryBatcher starts a goroutine to periodically send telemetry
150149
// events to the telemetry backend. The returned function is a cleanup function
151-
// that should be called when the service is no longer needed.
150+
// that should be called when the service is no longer needed. Calling this more
151+
// than once will panic.
152152
//
153153
// Note: calling the returned function does not unblock any in-flight calls to
154154
// the underlying telemetry backend that come from PostTelemetry due to
@@ -162,9 +162,16 @@ func (s *DRPCService) PeriodicTelemetryBatcher() func() {
162162
return func() {}
163163
}
164164

165+
s.mu.Lock()
166+
defer s.mu.Unlock()
167+
if s.networkEventBatchTicker != nil {
168+
panic("PeriodicTelemetryBatcher called more than once")
169+
}
170+
ticker := time.NewTicker(s.NetworkTelemetryBatchFrequency)
171+
s.networkEventBatchTicker = ticker
172+
165173
go func() {
166174
defer close(done)
167-
ticker := time.NewTicker(s.NetworkTelemetryBatchFrequency)
168175
defer ticker.Stop()
169176

170177
for {
@@ -189,19 +196,16 @@ func (s *DRPCService) PostTelemetry(_ context.Context, req *proto.TelemetryReque
189196
s.mu.Lock()
190197
defer s.mu.Unlock()
191198

192-
for _, pEvent := range req.Events {
193-
tEvent, err := telemetry.NetworkEventFromProto(pEvent)
194-
if err != nil {
195-
// TODO(@deansheather): log? return an error?
196-
continue
197-
}
198-
199-
s.pendingNetworkEvents = append(s.pendingNetworkEvents, tEvent)
199+
for _, event := range req.Events {
200+
s.pendingNetworkEvents = append(s.pendingNetworkEvents, event)
200201

201202
if len(s.pendingNetworkEvents) >= s.NetworkTelemetryBatchMaxSize {
202203
events := s.pendingNetworkEvents
203-
s.pendingNetworkEvents = []telemetry.NetworkEvent{}
204+
s.pendingNetworkEvents = []*proto.TelemetryEvent{}
204205
// Perform the send in a goroutine to avoid blocking the DRPC call.
206+
if s.networkEventBatchTicker != nil {
207+
s.networkEventBatchTicker.Reset(s.NetworkTelemetryBatchFrequency)
208+
}
205209
go s.writeTelemetryEvents(events)
206210
}
207211
}

0 commit comments

Comments
 (0)