Skip to content

Commit 59c864e

Browse files
committed
fix: use explicit api versions for agent and tailnet
1 parent 22526ba commit 59c864e

File tree

14 files changed

+292
-84
lines changed

14 files changed

+292
-84
lines changed

CODEOWNERS

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# These APIs are versioned, so any changes need to be carefully reviewed for whether
2+
# to bump API major or minor versions.
3+
agent/proto/ @spikecurtis
4+
tailnet/proto/ @spikecurtis
5+
vpn/vpn.proto @spikecurtis
6+
vpn/version.go @spikecurtis

agent/agent.go

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"golang.org/x/exp/slices"
3232
"golang.org/x/sync/errgroup"
3333
"golang.org/x/xerrors"
34-
"storj.io/drpc"
3534
"tailscale.com/net/speedtest"
3635
"tailscale.com/tailcfg"
3736
"tailscale.com/types/netlogtype"
@@ -94,7 +93,9 @@ type Options struct {
9493
}
9594

9695
type Client interface {
97-
ConnectRPC(ctx context.Context) (drpc.Conn, error)
96+
ConnectRPC23(ctx context.Context) (
97+
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
98+
)
9899
RewriteDERPMap(derpMap *tailcfg.DERPMap)
99100
}
100101

@@ -410,7 +411,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
410411
fn()
411412
}
412413

413-
func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
414+
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
414415
tickerDone := make(chan struct{})
415416
collectDone := make(chan struct{})
416417
ctx, cancel := context.WithCancel(ctx)
@@ -572,7 +573,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
572573
reportTimeout = 30 * time.Second
573574
reportError = make(chan error, 1)
574575
reportInFlight = false
575-
aAPI = proto.NewDRPCAgentClient(conn)
576576
)
577577

578578
for {
@@ -627,8 +627,7 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
627627

628628
// reportLifecycle reports the current lifecycle state once. All state
629629
// changes are reported in order.
630-
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
631-
aAPI := proto.NewDRPCAgentClient(conn)
630+
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
632631
for {
633632
select {
634633
case <-a.lifecycleUpdate:
@@ -710,8 +709,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
710709
// fetchServiceBannerLoop fetches the service banner on an interval. It will
711710
// not be fetched immediately; the expectation is that it is primed elsewhere
712711
// (and must be done before the session actually starts).
713-
func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) error {
714-
aAPI := proto.NewDRPCAgentClient(conn)
712+
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
715713
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
716714
defer ticker.Stop()
717715
for {
@@ -737,7 +735,7 @@ func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) erro
737735
}
738736

739737
func (a *agent) run() (retErr error) {
740-
// This allows the agent to refresh it's token if necessary.
738+
// This allows the agent to refresh its token if necessary.
741739
// For instance identity this is required, since the instance
742740
// may not have re-provisioned, but a new agent ID was created.
743741
sessionToken, err := a.exchangeToken(a.hardCtx)
@@ -747,12 +745,12 @@ func (a *agent) run() (retErr error) {
747745
a.sessionToken.Store(&sessionToken)
748746

749747
// ConnectRPC23 returns the Agent API and Tailnet API clients, both at API version 2.3
750-
conn, err := a.client.ConnectRPC(a.hardCtx)
748+
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx)
751749
if err != nil {
752750
return err
753751
}
754752
defer func() {
755-
cErr := conn.Close()
753+
cErr := aAPI.DRPCConn().Close()
756754
if cErr != nil {
757755
a.logger.Debug(a.hardCtx, "error closing drpc connection", slog.Error(err))
758756
}
@@ -761,11 +759,10 @@ func (a *agent) run() (retErr error) {
761759
// A lot of routines need the agent API / tailnet API connection. We run them in their own
762760
// goroutines in parallel, but errors in any routine will cause them all to exit so we can
763761
// redial the coder server and retry.
764-
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, conn)
762+
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
765763

766-
connMan.start("init notification banners", gracefulShutdownBehaviorStop,
767-
func(ctx context.Context, conn drpc.Conn) error {
768-
aAPI := proto.NewDRPCAgentClient(conn)
764+
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
765+
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
769766
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
770767
if err != nil {
771768
return xerrors.Errorf("fetch service banner: %w", err)
@@ -781,9 +778,9 @@ func (a *agent) run() (retErr error) {
781778

782779
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
783780
// shutdown scripts.
784-
connMan.start("send logs", gracefulShutdownBehaviorRemain,
785-
func(ctx context.Context, conn drpc.Conn) error {
786-
err := a.logSender.SendLoop(ctx, proto.NewDRPCAgentClient(conn))
781+
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
782+
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
783+
err := a.logSender.SendLoop(ctx, aAPI)
787784
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
788785
// we don't want this error to tear down the API connection and propagate to the
789786
// other routines that use the API. The LogSender has already dropped a warning
@@ -795,10 +792,10 @@ func (a *agent) run() (retErr error) {
795792

796793
// part of graceful shut down is reporting the final lifecycle states, e.g. "ShuttingDown", so the
797794
// lifecycle reporting has to be via gracefulShutdownBehaviorRemain
798-
connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)
795+
connMan.startAgentAPI("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)
799796

800797
// metadata reporting can cease as soon as we start gracefully shutting down
801-
connMan.start("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
798+
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
802799

803800
// channels to sync goroutines below
804801
// handle manifest
@@ -819,55 +816,55 @@ func (a *agent) run() (retErr error) {
819816
networkOK := newCheckpoint(a.logger)
820817
manifestOK := newCheckpoint(a.logger)
821818

822-
connMan.start("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
819+
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
823820

824-
connMan.start("app health reporter", gracefulShutdownBehaviorStop,
825-
func(ctx context.Context, conn drpc.Conn) error {
821+
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
822+
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
826823
if err := manifestOK.wait(ctx); err != nil {
827824
return xerrors.Errorf("no manifest: %w", err)
828825
}
829826
manifest := a.manifest.Load()
830827
NewWorkspaceAppHealthReporter(
831-
a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
828+
a.logger, manifest.Apps, agentsdk.AppHealthPoster(aAPI),
832829
)(ctx)
833830
return nil
834831
})
835832

836-
connMan.start("create or update network", gracefulShutdownBehaviorStop,
833+
connMan.startAgentAPI("create or update network", gracefulShutdownBehaviorStop,
837834
a.createOrUpdateNetwork(manifestOK, networkOK))
838835

839-
connMan.start("coordination", gracefulShutdownBehaviorStop,
840-
func(ctx context.Context, conn drpc.Conn) error {
836+
connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop,
837+
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
841838
if err := networkOK.wait(ctx); err != nil {
842839
return xerrors.Errorf("no network: %w", err)
843840
}
844-
return a.runCoordinator(ctx, conn, a.network)
841+
return a.runCoordinator(ctx, tAPI, a.network)
845842
},
846843
)
847844

848-
connMan.start("derp map subscriber", gracefulShutdownBehaviorStop,
849-
func(ctx context.Context, conn drpc.Conn) error {
845+
connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop,
846+
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
850847
if err := networkOK.wait(ctx); err != nil {
851848
return xerrors.Errorf("no network: %w", err)
852849
}
853-
return a.runDERPMapSubscriber(ctx, conn, a.network)
850+
return a.runDERPMapSubscriber(ctx, tAPI, a.network)
854851
})
855852

856-
connMan.start("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
853+
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
857854

858-
connMan.start("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, conn drpc.Conn) error {
855+
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
859856
if err := networkOK.wait(ctx); err != nil {
860857
return xerrors.Errorf("no network: %w", err)
861858
}
862-
return a.statsReporter.reportLoop(ctx, proto.NewDRPCAgentClient(conn))
859+
return a.statsReporter.reportLoop(ctx, aAPI)
863860
})
864861

865862
return connMan.wait()
866863
}
867864

868865
// handleManifest returns a function that fetches and processes the manifest
869-
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, conn drpc.Conn) error {
870-
return func(ctx context.Context, conn drpc.Conn) error {
866+
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
867+
return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
871868
var (
872869
sentResult = false
873870
err error
@@ -877,7 +874,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
877874
manifestOK.complete(err)
878875
}
879876
}()
880-
aAPI := proto.NewDRPCAgentClient(conn)
881877
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
882878
if err != nil {
883879
return xerrors.Errorf("fetch metadata: %w", err)
@@ -977,8 +973,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
977973

978974
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
979975
// the tailnet using the information in the manifest
980-
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, drpc.Conn) error {
981-
return func(ctx context.Context, _ drpc.Conn) (retErr error) {
976+
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error {
977+
return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) {
982978
if err := manifestOK.wait(ctx); err != nil {
983979
return xerrors.Errorf("no manifest: %w", err)
984980
}
@@ -1325,9 +1321,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
13251321

13261322
// runCoordinator runs a coordinator and returns whether a reconnect
13271323
// should occur.
1328-
func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1324+
func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
13291325
defer a.logger.Debug(ctx, "disconnected from coordination RPC")
1330-
tClient := tailnetproto.NewDRPCTailnetClient(conn)
13311326
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
13321327
// gracefully shut down.
13331328
coordinate, err := tClient.Coordinate(a.hardCtx)
@@ -1373,11 +1368,10 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13731368
}
13741369

13751370
// runDERPMapSubscriber streams DERP map updates from the tailnet API and returns if a reconnect should occur.
1376-
func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1371+
func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
13771372
defer a.logger.Debug(ctx, "disconnected from derp map RPC")
13781373
ctx, cancel := context.WithCancel(ctx)
13791374
defer cancel()
1380-
tClient := tailnetproto.NewDRPCTailnetClient(conn)
13811375
stream, err := tClient.StreamDERPMaps(ctx, &tailnetproto.StreamDERPMapsRequest{})
13821376
if err != nil {
13831377
return xerrors.Errorf("stream DERP Maps: %w", err)
@@ -1981,13 +1975,17 @@ const (
19811975

19821976
type apiConnRoutineManager struct {
19831977
logger slog.Logger
1984-
conn drpc.Conn
1978+
aAPI proto.DRPCAgentClient23
1979+
tAPI tailnetproto.DRPCTailnetClient23
19851980
eg *errgroup.Group
19861981
stopCtx context.Context
19871982
remainCtx context.Context
19881983
}
19891984

1990-
func newAPIConnRoutineManager(gracefulCtx, hardCtx context.Context, logger slog.Logger, conn drpc.Conn) *apiConnRoutineManager {
1985+
func newAPIConnRoutineManager(
1986+
gracefulCtx, hardCtx context.Context, logger slog.Logger,
1987+
aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23,
1988+
) *apiConnRoutineManager {
19911989
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
19921990
// exit if the errgroup hits an error, which usually means a problem with the conn.
19931991
eg, remainCtx := errgroup.WithContext(hardCtx)
@@ -2007,14 +2005,46 @@ func newAPIConnRoutineManager(gracefulCtx, hardCtx context.Context, logger slog.
20072005
stopCtx := eitherContext(remainCtx, gracefulCtx)
20082006
return &apiConnRoutineManager{
20092007
logger: logger,
2010-
conn: conn,
2008+
aAPI: aAPI,
2009+
tAPI: tAPI,
20112010
eg: eg,
20122011
stopCtx: stopCtx,
20132012
remainCtx: remainCtx,
20142013
}
20152014
}
20162015

2017-
func (a *apiConnRoutineManager) start(name string, b gracefulShutdownBehavior, f func(context.Context, drpc.Conn) error) {
2016+
func (a *apiConnRoutineManager) startAgentAPI(name string, b gracefulShutdownBehavior, f func(context.Context, proto.DRPCAgentClient23) error) {
2017+
logger := a.logger.With(slog.F("name", name))
2018+
var ctx context.Context
2019+
switch b {
2020+
case gracefulShutdownBehaviorStop:
2021+
ctx = a.stopCtx
2022+
case gracefulShutdownBehaviorRemain:
2023+
ctx = a.remainCtx
2024+
default:
2025+
panic("unknown behavior")
2026+
}
2027+
a.eg.Go(func() error {
2028+
logger.Debug(ctx, "starting agent routine")
2029+
err := f(ctx, a.aAPI)
2030+
if xerrors.Is(err, context.Canceled) && ctx.Err() != nil {
2031+
logger.Debug(ctx, "swallowing context canceled")
2032+
// Don't propagate context canceled errors to the error group, because we don't want the
2033+
// graceful context being canceled to halt the work of routines with
2034+
// gracefulShutdownBehaviorRemain. Note that we check both that the error is
2035+
// context.Canceled and that *our* context is currently canceled, because when Coderd
2036+
// unilaterally closes the API connection (for example if the build is outdated), it can
2037+
// sometimes show up as context.Canceled in our RPC calls.
2038+
return nil
2039+
}
2040+
logger.Debug(ctx, "routine exited", slog.Error(err))
2041+
if err != nil {
2042+
return xerrors.Errorf("error in routine %s: %w", name, err)
2043+
}
2044+
return nil
2045+
})
2046+
}
2047+
func (a *apiConnRoutineManager) startTailnetAPI(name string, b gracefulShutdownBehavior, f func(context.Context, tailnetproto.DRPCTailnetClient23) error) {
20182048
logger := a.logger.With(slog.F("name", name))
20192049
var ctx context.Context
20202050
switch b {
@@ -2026,8 +2056,8 @@ func (a *apiConnRoutineManager) start(name string, b gracefulShutdownBehavior, f
20262056
panic("unknown behavior")
20272057
}
20282058
a.eg.Go(func() error {
2029-
logger.Debug(ctx, "starting routine")
2030-
err := f(ctx, a.conn)
2059+
logger.Debug(ctx, "starting tailnet routine")
2060+
err := f(ctx, a.tAPI)
20312061
if xerrors.Is(err, context.Canceled) && ctx.Err() != nil {
20322062
logger.Debug(ctx, "swallowing context canceled")
20332063
// Don't propagate context canceled errors to the error group, because we don't want the

agent/agenttest/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"golang.org/x/exp/slices"
1616
"golang.org/x/xerrors"
1717
"google.golang.org/protobuf/types/known/durationpb"
18-
"storj.io/drpc"
1918
"storj.io/drpc/drpcmux"
2019
"storj.io/drpc/drpcserver"
2120
"tailscale.com/tailcfg"
@@ -97,7 +96,9 @@ func (c *Client) Close() {
9796
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
9897
}
9998

100-
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
99+
func (c *Client) ConnectRPC23(ctx context.Context) (
100+
agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error,
101+
) {
101102
conn, lis := drpcsdk.MemTransportPipe()
102103
c.LastWorkspaceAgent = func() {
103104
_ = conn.Close()
@@ -115,7 +116,7 @@ func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
115116
go func() {
116117
_ = c.server.Serve(serveCtx, lis)
117118
}()
118-
return conn, nil
119+
return agentproto.NewDRPCAgentClient(conn), proto.NewDRPCTailnetClient(conn), nil
119120
}
120121

121122
func (c *Client) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {

agent/proto/agent_drpc_old.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@ type DRPCAgentClient20 interface {
2424
// DRPCAgentClient21 is the Agent API at v2.1. It is useful if you want to be maximally compatible
2525
// with Coderd Release Versions from 2.12+
2626
type DRPCAgentClient21 interface {
27-
DRPCConn() drpc.Conn
28-
29-
GetManifest(ctx context.Context, in *GetManifestRequest) (*Manifest, error)
30-
GetServiceBanner(ctx context.Context, in *GetServiceBannerRequest) (*ServiceBanner, error)
31-
UpdateStats(ctx context.Context, in *UpdateStatsRequest) (*UpdateStatsResponse, error)
32-
UpdateLifecycle(ctx context.Context, in *UpdateLifecycleRequest) (*Lifecycle, error)
33-
BatchUpdateAppHealths(ctx context.Context, in *BatchUpdateAppHealthRequest) (*BatchUpdateAppHealthResponse, error)
34-
UpdateStartup(ctx context.Context, in *UpdateStartupRequest) (*Startup, error)
35-
BatchUpdateMetadata(ctx context.Context, in *BatchUpdateMetadataRequest) (*BatchUpdateMetadataResponse, error)
36-
BatchCreateLogs(ctx context.Context, in *BatchCreateLogsRequest) (*BatchCreateLogsResponse, error)
27+
DRPCAgentClient20
3728
GetAnnouncementBanners(ctx context.Context, in *GetAnnouncementBannersRequest) (*GetAnnouncementBannersResponse, error)
3829
}
30+
31+
// DRPCAgentClient22 is the Agent API at v2.2. It is identical to 2.1, since the change was made on
32+
// the Tailnet API, which uses the same version number. Compatible with Coder v2.13+
33+
type DRPCAgentClient22 interface {
34+
DRPCAgentClient21
35+
}
36+
37+
// DRPCAgentClient23 is the Agent API at v2.3. It adds the ScriptCompleted RPC. Compatible with
38+
// Coder v2.18+
39+
type DRPCAgentClient23 interface {
40+
DRPCAgentClient22
41+
ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
42+
}

coderd/workspaceagents_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,13 +2041,12 @@ func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCA
20412041
}
20422042

20432043
func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup *agentproto.Startup) error {
2044-
conn, err := client.ConnectRPC(ctx)
2044+
aAPI, _, err := client.ConnectRPC23(ctx)
20452045
require.NoError(t, err)
20462046
defer func() {
2047-
cErr := conn.Close()
2047+
cErr := aAPI.DRPCConn().Close()
20482048
require.NoError(t, cErr)
20492049
}()
2050-
aAPI := agentproto.NewDRPCAgentClient(conn)
20512050
_, err = aAPI.UpdateStartup(ctx, &agentproto.UpdateStartupRequest{Startup: startup})
20522051
return err
20532052
}

0 commit comments

Comments
 (0)