Skip to content

chore: tailnet debug logging #7260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add logging to coordinator
Signed-off-by: Spike Curtis <spike@coder.com>
  • Loading branch information
spikecurtis committed Apr 26, 2023
commit 1b9f3285f429128672de815d4c57eb5b7028394e
36 changes: 20 additions & 16 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func TestAgent_UnixRemoteForwarding(t *testing.T) {
var err error
conn, err = net.Dial("unix", remoteSocketPath)
return err == nil
}, testutil.WaitLong, testutil.IntervalFast)
}, testutil.WaitShort, testutil.IntervalFast)
defer conn.Close()
_, err = conn.Write([]byte("test"))
require.NoError(t, err)
Expand Down Expand Up @@ -879,6 +879,7 @@ func TestAgent_StartupScript(t *testing.T) {
}
t.Run("Success", func(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
client := &client{
t: t,
agentID: uuid.New(),
Expand All @@ -887,12 +888,12 @@ func TestAgent_StartupScript(t *testing.T) {
DERPMap: &tailcfg.DERPMap{},
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(),
coordinator: tailnet.NewCoordinator(logger),
}
closer := agent.New(agent.Options{
Client: client,
Filesystem: afero.NewMemMapFs(),
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
Logger: logger.Named("agent"),
ReconnectingPTYTimeout: 0,
})
t.Cleanup(func() {
Expand All @@ -910,6 +911,7 @@ func TestAgent_StartupScript(t *testing.T) {
// script has written too many lines it will still succeed!
t.Run("OverflowsAndSkips", func(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
client := &client{
t: t,
agentID: uuid.New(),
Expand All @@ -927,12 +929,12 @@ func TestAgent_StartupScript(t *testing.T) {
return codersdk.ReadBodyAsError(res)
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(),
coordinator: tailnet.NewCoordinator(logger),
}
closer := agent.New(agent.Options{
Client: client,
Filesystem: afero.NewMemMapFs(),
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
Logger: logger.Named("agent"),
ReconnectingPTYTimeout: 0,
})
t.Cleanup(func() {
Expand Down Expand Up @@ -1282,7 +1284,7 @@ func TestAgent_Lifecycle(t *testing.T) {

t.Run("ShutdownScriptOnce", func(t *testing.T) {
t.Parallel()

logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
expected := "this-is-shutdown"
client := &client{
t: t,
Expand All @@ -1293,13 +1295,13 @@ func TestAgent_Lifecycle(t *testing.T) {
ShutdownScript: "echo " + expected,
},
statsChan: make(chan *agentsdk.Stats),
coordinator: tailnet.NewCoordinator(),
coordinator: tailnet.NewCoordinator(logger),
}

fs := afero.NewMemMapFs()
agent := agent.New(agent.Options{
Client: client,
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
Logger: logger.Named("agent"),
Filesystem: fs,
})

Expand Down Expand Up @@ -1548,9 +1550,10 @@ func TestAgent_Speedtest(t *testing.T) {

func TestAgent_Reconnect(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
// After the agent is disconnected from a coordinator, it's supposed
// to reconnect!
coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewCoordinator(logger)
defer coordinator.Close()

agentID := uuid.New()
Expand All @@ -1572,7 +1575,7 @@ func TestAgent_Reconnect(t *testing.T) {
return "", nil
},
Client: client,
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
Logger: logger.Named("agent"),
})
defer closer.Close()

Expand All @@ -1587,8 +1590,8 @@ func TestAgent_Reconnect(t *testing.T) {

func TestAgent_WriteVSCodeConfigs(t *testing.T) {
t.Parallel()

coordinator := tailnet.NewCoordinator()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinator(logger)
defer coordinator.Close()

client := &client{
Expand All @@ -1607,7 +1610,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
return "", nil
},
Client: client,
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
Logger: logger.Named("agent"),
Filesystem: filesystem,
})
defer closer.Close()
Expand Down Expand Up @@ -1698,10 +1701,11 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
afero.Fs,
io.Closer,
) {
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
if metadata.DERPMap == nil {
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
}
coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewCoordinator(logger)
t.Cleanup(func() {
_ = coordinator.Close()
})
Expand All @@ -1718,7 +1722,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
closer := agent.New(agent.Options{
Client: c,
Filesystem: fs,
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
Logger: logger.Named("agent"),
ReconnectingPTYTimeout: ptyTimeout,
})
t.Cleanup(func() {
Expand All @@ -1727,7 +1731,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
DERPMap: metadata.DERPMap,
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
Logger: logger.Named("client"),
})
require.NoError(t, err)
clientConn, serverConn := net.Pipe()
Expand Down
2 changes: 1 addition & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func New(options *Options) *API {
options.PrometheusRegistry = prometheus.NewRegistry()
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator()
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
}
if options.DERPServer == nil {
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
Expand Down
3 changes: 2 additions & 1 deletion coderd/prometheusmetrics/prometheusmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"

"github.com/coder/coder/coderd/coderdtest"
Expand Down Expand Up @@ -298,7 +299,7 @@ func TestAgents(t *testing.T) {
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)

// given
coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
coordinatorPtr.Store(&coordinator)
derpMap := tailnettest.RunDERPAndSTUN(t)
Expand Down
6 changes: 3 additions & 3 deletions coderd/wsconncache/wsconncache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ func TestCache(t *testing.T) {

func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Duration) *codersdk.WorkspaceAgentConn {
t.Helper()

logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)

coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewCoordinator(logger)
t.Cleanup(func() {
_ = coordinator.Close()
})
Expand All @@ -171,7 +171,7 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
manifest: manifest,
coordinator: coordinator,
},
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelInfo),
Logger: logger.Named("agent"),
ReconnectingPTYTimeout: ptyTimeout,
})
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (api *API) updateEntitlements(ctx context.Context) error {
}

if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
coordinator := agpltailnet.NewCoordinator()
coordinator := agpltailnet.NewCoordinator(api.Logger)
if enabled {
haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
if err != nil {
Expand Down
32 changes: 29 additions & 3 deletions tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync/atomic"
"time"

"cdr.dev/slog"

"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
}, errChan
}

// LoggerName is the name NewCoordinator applies (via logger.Named) to the
// logger it is given, so the in-memory coordinator's log lines are tagged
// with it.
const LoggerName = "coord"

// NewCoordinator constructs a new in-memory connection coordinator. This
// coordinator is incompatible with multiple Coder replicas as all node data is
// in-memory.
func NewCoordinator() Coordinator {
func NewCoordinator(logger slog.Logger) Coordinator {
nameCache, err := lru.New[uuid.UUID, string](512)
if err != nil {
panic("make lru cache: " + err.Error())
}

return &coordinator{
logger: logger.Named(LoggerName),
closed: false,
nodes: map[uuid.UUID]*Node{},
agentSockets: map[uuid.UUID]*TrackedConn{},
Expand All @@ -137,6 +142,7 @@ func NewCoordinator() Coordinator {
// This coordinator is incompatible with multiple Coder
// replicas as all node data is in-memory.
type coordinator struct {
logger slog.Logger
mutex sync.RWMutex
closed bool

Expand Down Expand Up @@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int {
// ServeClient accepts a WebSocket connection that wants to connect to an agent
// with the specified ID.
func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
logger.Debug(context.TODO(), "coordinating client")
c.mutex.Lock()
if c.closed {
c.mutex.Unlock()
Expand All @@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
return xerrors.Errorf("marshal node: %w", err)
}
_, err = conn.Write(data)
logger.Debug(context.TODO(), "wrote initial node")
if err != nil {
return xerrors.Errorf("write nodes: %w", err)
}
Expand All @@ -230,7 +239,9 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
LastWrite: now,
}
c.mutex.Unlock()
logger.Debug(context.TODO(), "added tracked connection")
defer func() {
logger.Debug(context.TODO(), "deleting tracked connection")
c.mutex.Lock()
defer c.mutex.Unlock()
// Clean all traces of this connection from the map.
Expand Down Expand Up @@ -259,11 +270,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
}

func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
var node Node
err := decoder.Decode(&node)
if err != nil {
return xerrors.Errorf("read json: %w", err)
}
logger.Debug(context.TODO(), "got client node update", slog.F("node", node))

c.mutex.Lock()
// Update the node of this client in our in-memory map. If an agent entirely
Expand All @@ -274,6 +287,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
agentSocket, ok := c.agentSockets[agent]
if !ok {
c.mutex.Unlock()
logger.Debug(context.TODO(), "no agent socket")
return nil
}
c.mutex.Unlock()
Expand All @@ -291,13 +305,16 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
}
return xerrors.Errorf("write json: %w", err)
}
logger.Debug(context.TODO(), "sent client node to agent")

return nil
}

// ServeAgent accepts a WebSocket connection to an agent that
// listens to incoming connections and publishes node updates.
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
logger := c.logger.With(slog.F("agent_id", id))
logger.Debug(context.TODO(), "coordinating agent")
c.mutex.Lock()
if c.closed {
c.mutex.Unlock()
Expand All @@ -324,6 +341,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
return xerrors.Errorf("marshal json: %w", err)
}
_, err = conn.Write(data)
logger.Debug(context.TODO(), "wrote initial client(s) to agent", slog.F("nodes", nodes))
if err != nil {
return xerrors.Errorf("write nodes: %w", err)
}
Expand Down Expand Up @@ -356,6 +374,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
}

c.mutex.Unlock()
logger.Debug(context.TODO(), "added agent socket")
defer func() {
c.mutex.Lock()
defer c.mutex.Unlock()
Expand All @@ -365,6 +384,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
delete(c.agentSockets, id)
delete(c.nodes, id)
logger.Debug(context.TODO(), "deleted agent socket")
}
}()

Expand All @@ -381,17 +401,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
}

func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
logger := c.logger.With(slog.F("agent_id", id))
var node Node
err := decoder.Decode(&node)
if err != nil {
return xerrors.Errorf("read json: %w", err)
}
logger.Debug(context.TODO(), "decoded agent node", slog.F("node", node))

c.mutex.Lock()
c.nodes[id] = &node
connectionSockets, ok := c.agentToConnectionSockets[id]
if !ok {
c.mutex.Unlock()
logger.Debug(context.TODO(), "no client sockets")
return nil
}
data, err := json.Marshal([]*Node{&node})
Expand All @@ -403,11 +426,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
// Publish the new node to every listening socket.
var wg sync.WaitGroup
wg.Add(len(connectionSockets))
for _, connectionSocket := range connectionSockets {
for clientID, connectionSocket := range connectionSockets {
clientID := clientID
connectionSocket := connectionSocket
go func() {
_ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second))
_, _ = connectionSocket.Write(data)
_, err := connectionSocket.Write(data)
logger.Debug(context.TODO(), "sent agent node to client",
slog.F("client_id", clientID), slog.Error(err))
wg.Done()
}()
}
Expand Down
Loading