Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tests
  • Loading branch information
deansheather authored and ethanndickson committed Jun 28, 2024
commit 479df1fd1255ffb63a1ed70c4e50e5dc119e76ee
2 changes: 2 additions & 0 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/clock"
_ "github.com/coder/coder/v2/coderd/apidoc" // Used for swagger docs.
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/audit"
Expand Down Expand Up @@ -548,6 +549,7 @@ func New(options *Options) *API {
options.PrometheusRegistry.MustRegister(stn)
}
api.NetworkTelemetryBatcher = tailnet.NewNetworkTelemetryBatcher(
clock.NewReal(),
api.Options.NetworkTelemetryBatchFrequency,
api.Options.NetworkTelemetryBatchMaxSize,
api.handleNetworkTelemetry,
Expand Down
9 changes: 6 additions & 3 deletions tailnet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/v2/apiversion"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/tailnet/proto"
)

Expand Down Expand Up @@ -239,19 +240,21 @@ func (c communicator) loopResp() {
}

type NetworkTelemetryBatcher struct {
clock clock.Clock
frequency time.Duration
maxSize int
batchFn func(batch []*proto.TelemetryEvent)

mu sync.Mutex
closed chan struct{}
done chan struct{}
ticker *time.Ticker
ticker *clock.Ticker
pending []*proto.TelemetryEvent
}

func NewNetworkTelemetryBatcher(frequency time.Duration, maxSize int, batchFn func(batch []*proto.TelemetryEvent)) *NetworkTelemetryBatcher {
func NewNetworkTelemetryBatcher(clk clock.Clock, frequency time.Duration, maxSize int, batchFn func(batch []*proto.TelemetryEvent)) *NetworkTelemetryBatcher {
b := &NetworkTelemetryBatcher{
clock: clk,
frequency: frequency,
maxSize: maxSize,
batchFn: batchFn,
Expand Down Expand Up @@ -279,7 +282,7 @@ func (b *NetworkTelemetryBatcher) sendTelemetryBatch() {
func (b *NetworkTelemetryBatcher) start() {
b.mu.Lock()
defer b.mu.Unlock()
ticker := time.NewTicker(b.frequency)
ticker := b.clock.NewTicker(b.frequency)
b.ticker = ticker

go func() {
Expand Down
82 changes: 76 additions & 6 deletions tailnet/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"tailscale.com/tailcfg"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
Expand All @@ -28,12 +30,16 @@ func TestClientService_ServeClient_V2(t *testing.T) {
coordPtr.Store(&coord)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
derpMap := &tailcfg.DERPMap{Regions: map[int]*tailcfg.DERPRegion{999: {RegionCode: "test"}}}

telemetryEvents := make(chan []*proto.TelemetryEvent, 64)
uut, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Millisecond,
DERPMapFn: func() *tailcfg.DERPMap { return derpMap },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {},
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Millisecond,
DERPMapFn: func() *tailcfg.DERPMap { return derpMap },
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {
telemetryEvents <- batch
},
})
require.NoError(t, err)

Expand Down Expand Up @@ -100,7 +106,23 @@ func TestClientService_ServeClient_V2(t *testing.T) {
require.NoError(t, err)

// PostTelemetry
// TODO: write test
telemetryReq := &proto.TelemetryRequest{
Events: []*proto.TelemetryEvent{
{
Id: []byte("hi"),
},
{
Id: []byte("bye"),
},
},
}
res, err := client.PostTelemetry(ctx, telemetryReq)
require.NoError(t, err)
require.NotNil(t, res)
gotEvents := testutil.RequireRecvCtx(ctx, t, telemetryEvents)
require.Len(t, gotEvents, 2)
require.Equal(t, "hi", string(gotEvents[0].Id))
require.Equal(t, "bye", string(gotEvents[1].Id))

// RPCs closed; we need to close the Conn to end the session.
err = c.Close()
Expand Down Expand Up @@ -154,3 +176,51 @@ func TestClientService_ServeClient_V1(t *testing.T) {
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.ErrorIs(t, err, expectedError)
}

func TestNetworkTelemetryBatcher(t *testing.T) {
t.Parallel()

var (
events = make(chan []*proto.TelemetryEvent, 64)
mClock = clock.NewMock(t)
b = tailnet.NewNetworkTelemetryBatcher(mClock, time.Millisecond, 3, func(batch []*proto.TelemetryEvent) {
assert.LessOrEqual(t, len(batch), 3)
events <- batch
})
)

b.Handler([]*proto.TelemetryEvent{
{Id: []byte("1")},
{Id: []byte("2")},
})
b.Handler([]*proto.TelemetryEvent{
{Id: []byte("3")},
{Id: []byte("4")},
})

// Should overflow and send a batch.
ctx := testutil.Context(t, testutil.WaitShort)
batch := testutil.RequireRecvCtx(ctx, t, events)
require.Len(t, batch, 3)
require.Equal(t, "1", string(batch[0].Id))
require.Equal(t, "2", string(batch[1].Id))
require.Equal(t, "3", string(batch[2].Id))

// Should send any pending events when the ticker fires.
mClock.Advance(time.Millisecond)
batch = testutil.RequireRecvCtx(ctx, t, events)
require.Len(t, batch, 1)
require.Equal(t, "4", string(batch[0].Id))

// Should send any pending events when closed.
b.Handler([]*proto.TelemetryEvent{
{Id: []byte("5")},
{Id: []byte("6")},
})
err := b.Close()
require.NoError(t, err)
batch = testutil.RequireRecvCtx(ctx, t, events)
require.Len(t, batch, 2)
require.Equal(t, "5", string(batch[0].Id))
require.Equal(t, "6", string(batch[1].Id))
}