Skip to content

fix: use explicit api versions for agent and tailnet #15508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: use explicit api versions for agent and tailnet
  • Loading branch information
spikecurtis committed Nov 15, 2024
commit 46718e60243c44d738b2bc06b4c394744118ebaf
6 changes: 6 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# These APIs are versioned, so any changes need to be carefully reviewed for whether
# to bump API major or minor versions.
agent/proto/ @spikecurtis @johnstcn
tailnet/proto/ @spikecurtis @johnstcn
vpn/vpn.proto @spikecurtis @johnstcn
vpn/version.go @spikecurtis @johnstcn
141 changes: 91 additions & 50 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"storj.io/drpc"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg"
"tailscale.com/types/netlogtype"
Expand Down Expand Up @@ -94,7 +93,9 @@ type Options struct {
}

type Client interface {
ConnectRPC(ctx context.Context) (drpc.Conn, error)
ConnectRPC23(ctx context.Context) (
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
)
RewriteDERPMap(derpMap *tailcfg.DERPMap)
}

Expand Down Expand Up @@ -410,7 +411,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
fn()
}

func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
tickerDone := make(chan struct{})
collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -572,7 +573,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
reportTimeout = 30 * time.Second
reportError = make(chan error, 1)
reportInFlight = false
aAPI = proto.NewDRPCAgentClient(conn)
)

for {
Expand Down Expand Up @@ -627,8 +627,7 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {

// reportLifecycle reports the current lifecycle state once. All state
// changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
for {
select {
case <-a.lifecycleUpdate:
Expand Down Expand Up @@ -710,8 +709,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
// fetchServiceBannerLoop fetches the service banner on an interval. It will
// not be fetched immediately; the expectation is that it is primed elsewhere
// (and must be done before the session actually starts).
func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
defer ticker.Stop()
for {
Expand All @@ -737,7 +735,7 @@ func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) erro
}

func (a *agent) run() (retErr error) {
// This allows the agent to refresh it's token if necessary.
// This allows the agent to refresh its token if necessary.
// For instance identity this is required, since the instance
// may not have re-provisioned, but a new agent ID was created.
sessionToken, err := a.exchangeToken(a.hardCtx)
Expand All @@ -747,12 +745,12 @@ func (a *agent) run() (retErr error) {
a.sessionToken.Store(&sessionToken)

// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
conn, err := a.client.ConnectRPC(a.hardCtx)
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx)
if err != nil {
return err
}
defer func() {
cErr := conn.Close()
cErr := aAPI.DRPCConn().Close()
if cErr != nil {
a.logger.Debug(a.hardCtx, "error closing drpc connection", slog.Error(err))
}
Expand All @@ -761,11 +759,10 @@ func (a *agent) run() (retErr error) {
// A lot of routines need the agent API / tailnet API connection. We run them in their own
// goroutines in parallel, but errors in any routine will cause them all to exit so we can
// redial the coder server and retry.
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, conn)
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)

connMan.start("init notification banners", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
if err != nil {
return xerrors.Errorf("fetch service banner: %w", err)
Expand All @@ -781,9 +778,9 @@ func (a *agent) run() (retErr error) {

// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
// shutdown scripts.
connMan.start("send logs", gracefulShutdownBehaviorRemain,
func(ctx context.Context, conn drpc.Conn) error {
err := a.logSender.SendLoop(ctx, proto.NewDRPCAgentClient(conn))
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
err := a.logSender.SendLoop(ctx, aAPI)
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
// we don't want this error to tear down the API connection and propagate to the
// other routines that use the API. The LogSender has already dropped a warning
Expand All @@ -795,10 +792,10 @@ func (a *agent) run() (retErr error) {

// part of graceful shut down is reporting the final lifecycle states, e.g "ShuttingDown" so the
// lifecycle reporting has to be via gracefulShutdownBehaviorRemain
connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)
connMan.startAgentAPI("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)

// metadata reporting can cease as soon as we start gracefully shutting down
connMan.start("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)

// channels to sync goroutines below
// handle manifest
Expand All @@ -819,55 +816,55 @@ func (a *agent) run() (retErr error) {
networkOK := newCheckpoint(a.logger)
manifestOK := newCheckpoint(a.logger)

connMan.start("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))

connMan.start("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
manifest := a.manifest.Load()
NewWorkspaceAppHealthReporter(
a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
a.logger, manifest.Apps, agentsdk.AppHealthPoster(aAPI),
)(ctx)
return nil
})

connMan.start("create or update network", gracefulShutdownBehaviorStop,
connMan.startAgentAPI("create or update network", gracefulShutdownBehaviorStop,
a.createOrUpdateNetwork(manifestOK, networkOK))

connMan.start("coordination", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
return a.runCoordinator(ctx, conn, a.network)
return a.runCoordinator(ctx, tAPI, a.network)
},
)

connMan.start("derp map subscriber", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
return a.runDERPMapSubscriber(ctx, conn, a.network)
return a.runDERPMapSubscriber(ctx, tAPI, a.network)
})

connMan.start("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)

connMan.start("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, conn drpc.Conn) error {
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
return a.statsReporter.reportLoop(ctx, proto.NewDRPCAgentClient(conn))
return a.statsReporter.reportLoop(ctx, aAPI)
})

return connMan.wait()
}

// handleManifest returns a function that fetches and processes the manifest
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, conn drpc.Conn) error {
return func(ctx context.Context, conn drpc.Conn) error {
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
var (
sentResult = false
err error
Expand All @@ -877,7 +874,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
manifestOK.complete(err)
}
}()
aAPI := proto.NewDRPCAgentClient(conn)
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
if err != nil {
return xerrors.Errorf("fetch metadata: %w", err)
Expand Down Expand Up @@ -977,8 +973,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,

// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
// the tailnet using the information in the manifest
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, drpc.Conn) error {
return func(ctx context.Context, _ drpc.Conn) (retErr error) {
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error {
return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
Expand Down Expand Up @@ -1325,9 +1321,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t

// runCoordinator runs a coordinator and returns whether a reconnect
// should occur.
func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from coordination RPC")
tClient := tailnetproto.NewDRPCTailnetClient(conn)
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
// gracefully shut down.
coordinate, err := tClient.Coordinate(a.hardCtx)
Expand Down Expand Up @@ -1373,11 +1368,10 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
}

// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from derp map RPC")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tClient := tailnetproto.NewDRPCTailnetClient(conn)
stream, err := tClient.StreamDERPMaps(ctx, &tailnetproto.StreamDERPMapsRequest{})
if err != nil {
return xerrors.Errorf("stream DERP Maps: %w", err)
Expand Down Expand Up @@ -1981,13 +1975,17 @@ const (

type apiConnRoutineManager struct {
logger slog.Logger
conn drpc.Conn
aAPI proto.DRPCAgentClient23
tAPI tailnetproto.DRPCTailnetClient23
eg *errgroup.Group
stopCtx context.Context
remainCtx context.Context
}

func newAPIConnRoutineManager(gracefulCtx, hardCtx context.Context, logger slog.Logger, conn drpc.Conn) *apiConnRoutineManager {
func newAPIConnRoutineManager(
gracefulCtx, hardCtx context.Context, logger slog.Logger,
aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23,
) *apiConnRoutineManager {
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
// exit if the errgroup hits an error, which usually means a problem with the conn.
eg, remainCtx := errgroup.WithContext(hardCtx)
Expand All @@ -2007,17 +2005,60 @@ func newAPIConnRoutineManager(gracefulCtx, hardCtx context.Context, logger slog.
stopCtx := eitherContext(remainCtx, gracefulCtx)
return &apiConnRoutineManager{
logger: logger,
conn: conn,
aAPI: aAPI,
tAPI: tAPI,
eg: eg,
stopCtx: stopCtx,
remainCtx: remainCtx,
}
}

func (a *apiConnRoutineManager) start(name string, b gracefulShutdownBehavior, f func(context.Context, drpc.Conn) error) {
// startAgentAPI starts a routine that uses the Agent API. c.f. startTailnetAPI which is the same
// but for Tailnet.
func (a *apiConnRoutineManager) startAgentAPI(
name string, behavior gracefulShutdownBehavior,
f func(context.Context, proto.DRPCAgentClient23) error,
) {
logger := a.logger.With(slog.F("name", name))
var ctx context.Context
switch behavior {
case gracefulShutdownBehaviorStop:
ctx = a.stopCtx
case gracefulShutdownBehaviorRemain:
ctx = a.remainCtx
default:
panic("unknown behavior")
}
a.eg.Go(func() error {
logger.Debug(ctx, "starting agent routine")
err := f(ctx, a.aAPI)
if xerrors.Is(err, context.Canceled) && ctx.Err() != nil {
logger.Debug(ctx, "swallowing context canceled")
// Don't propagate context canceled errors to the error group, because we don't want the
// graceful context being canceled to halt the work of routines with
// gracefulShutdownBehaviorRemain. Note that we check both that the error is
// context.Canceled and that *our* context is currently canceled, because when Coderd
// unilaterally closes the API connection (for example if the build is outdated), it can
// sometimes show up as context.Canceled in our RPC calls.
return nil
}
logger.Debug(ctx, "routine exited", slog.Error(err))
if err != nil {
return xerrors.Errorf("error in routine %s: %w", name, err)
}
return nil
})
}

// startTailnetAPI starts a routine that uses the Tailnet API. c.f. startAgentAPI which is the same
// but for the Agent API.
func (a *apiConnRoutineManager) startTailnetAPI(
name string, behavior gracefulShutdownBehavior,
f func(context.Context, tailnetproto.DRPCTailnetClient23) error,
) {
logger := a.logger.With(slog.F("name", name))
var ctx context.Context
switch b {
switch behavior {
case gracefulShutdownBehaviorStop:
ctx = a.stopCtx
case gracefulShutdownBehaviorRemain:
Expand All @@ -2026,8 +2067,8 @@ func (a *apiConnRoutineManager) start(name string, b gracefulShutdownBehavior, f
panic("unknown behavior")
}
a.eg.Go(func() error {
logger.Debug(ctx, "starting routine")
err := f(ctx, a.conn)
logger.Debug(ctx, "starting tailnet routine")
err := f(ctx, a.tAPI)
if xerrors.Is(err, context.Canceled) && ctx.Err() != nil {
logger.Debug(ctx, "swallowing context canceled")
// Don't propagate context canceled errors to the error group, because we don't want the
Expand Down
7 changes: 4 additions & 3 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"storj.io/drpc"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"tailscale.com/tailcfg"
Expand Down Expand Up @@ -97,7 +96,9 @@ func (c *Client) Close() {
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
}

func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
func (c *Client) ConnectRPC23(ctx context.Context) (
agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error,
) {
conn, lis := drpcsdk.MemTransportPipe()
c.LastWorkspaceAgent = func() {
_ = conn.Close()
Expand All @@ -115,7 +116,7 @@ func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
go func() {
_ = c.server.Serve(serveCtx, lis)
}()
return conn, nil
return agentproto.NewDRPCAgentClient(conn), proto.NewDRPCTailnetClient(conn), nil
}

func (c *Client) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
Expand Down
24 changes: 14 additions & 10 deletions agent/proto/agent_drpc_old.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ type DRPCAgentClient20 interface {
// DRPCAgentClient21 is the Agent API at v2.1. It is useful if you want to be maximally compatible
// with Coderd Release Versions from 2.12+
type DRPCAgentClient21 interface {
DRPCConn() drpc.Conn

GetManifest(ctx context.Context, in *GetManifestRequest) (*Manifest, error)
GetServiceBanner(ctx context.Context, in *GetServiceBannerRequest) (*ServiceBanner, error)
UpdateStats(ctx context.Context, in *UpdateStatsRequest) (*UpdateStatsResponse, error)
UpdateLifecycle(ctx context.Context, in *UpdateLifecycleRequest) (*Lifecycle, error)
BatchUpdateAppHealths(ctx context.Context, in *BatchUpdateAppHealthRequest) (*BatchUpdateAppHealthResponse, error)
UpdateStartup(ctx context.Context, in *UpdateStartupRequest) (*Startup, error)
BatchUpdateMetadata(ctx context.Context, in *BatchUpdateMetadataRequest) (*BatchUpdateMetadataResponse, error)
BatchCreateLogs(ctx context.Context, in *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error)
DRPCAgentClient20
GetAnnouncementBanners(ctx context.Context, in *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error)
}

// DRPCAgentClient22 is the Agent API at v2.2. It is identical to 2.1, since the change was made on
// the Tailnet API, which uses the same version number. Compatible with Coder v2.13+
type DRPCAgentClient22 interface {
DRPCAgentClient21
}

// DRPCAgentClient23 is the Agent API at v2.3. It adds the ScriptCompleted RPC. Compatible with
// Coder v2.18+
type DRPCAgentClient23 interface {
DRPCAgentClient22
ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
}
Loading
Loading