Skip to content

Commit 7a7a865

Browse files
committed
feat: add resume support to coordinator connections
Inspired by other real time apps, the coordinator RPC API now has a "RefreshResumeToken" RPC that issues a JWT that can be used on reconnect to persist the same client peer ID.
1 parent ba4186d commit 7a7a865

30 files changed

+1428
-432
lines changed

cli/server.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -791,18 +791,52 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
791791
}
792792
}
793793

794-
keyBytes, err := hex.DecodeString(oauthSigningKeyStr)
794+
oauthKeyBytes, err := hex.DecodeString(oauthSigningKeyStr)
795795
if err != nil {
796796
return xerrors.Errorf("decode oauth signing key from database: %w", err)
797797
}
798-
if len(keyBytes) != len(options.OAuthSigningKey) {
799-
return xerrors.Errorf("oauth signing key in database is not the correct length, expect %d got %d", len(options.OAuthSigningKey), len(keyBytes))
798+
if len(oauthKeyBytes) != len(options.OAuthSigningKey) {
799+
return xerrors.Errorf("oauth signing key in database is not the correct length, expect %d got %d", len(options.OAuthSigningKey), len(oauthKeyBytes))
800800
}
801-
copy(options.OAuthSigningKey[:], keyBytes)
801+
copy(options.OAuthSigningKey[:], oauthKeyBytes)
802802
if options.OAuthSigningKey == [32]byte{} {
803803
return xerrors.Errorf("oauth signing key in database is empty")
804804
}
805805

806+
// Read the coordinator resume token signing key from the
807+
// database.
808+
resumeTokenKey := [64]byte{}
809+
resumeTokenKeyStr, err := tx.GetCoordinatorResumeTokenSigningKey(ctx)
810+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
811+
return xerrors.Errorf("get coordinator resume token key: %w", err)
812+
}
813+
if decoded, err := hex.DecodeString(resumeTokenKeyStr); err != nil || len(decoded) != len(resumeTokenKey) {
814+
b := make([]byte, len(resumeTokenKey))
815+
_, err := rand.Read(b)
816+
if err != nil {
817+
return xerrors.Errorf("generate fresh coordinator resume token key: %w", err)
818+
}
819+
820+
resumeTokenKeyStr = hex.EncodeToString(b)
821+
err = tx.UpsertCoordinatorResumeTokenSigningKey(ctx, resumeTokenKeyStr)
822+
if err != nil {
823+
return xerrors.Errorf("insert freshly generated coordinator resume token key to database: %w", err)
824+
}
825+
}
826+
827+
resumeTokenKeyBytes, err := hex.DecodeString(resumeTokenKeyStr)
828+
if err != nil {
829+
return xerrors.Errorf("decode coordinator resume token key from database: %w", err)
830+
}
831+
if len(resumeTokenKeyBytes) != len(resumeTokenKey) {
832+
return xerrors.Errorf("coordinator resume token key in database is not the correct length, expect %d got %d", len(resumeTokenKey), len(resumeTokenKeyBytes))
833+
}
834+
copy(resumeTokenKey[:], resumeTokenKeyBytes)
835+
if resumeTokenKey == [64]byte{} {
836+
return xerrors.Errorf("coordinator resume token key in database is empty")
837+
}
838+
options.CoordinatorResumeTokenProvider = tailnet.NewResumeTokenKeyProvider(resumeTokenKey, tailnet.DefaultResumeTokenExpiry)
839+
806840
return nil
807841
}, nil)
808842
if err != nil {

coderd/coderd.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ type Options struct {
182182
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
183183
// workspace applications. It consists of both a signing and encryption key.
184184
AppSecurityKey workspaceapps.SecurityKey
185+
// CoordinatorResumeTokenProvider is used to provide and validate resume
186+
// tokens issued by and passed to the coordinator DRPC API.
187+
CoordinatorResumeTokenProvider tailnet.ResumeTokenProvider
185188

186189
HealthcheckFunc func(ctx context.Context, apiKey string) *healthsdk.HealthcheckReport
187190
HealthcheckTimeout time.Duration
@@ -583,12 +586,16 @@ func New(options *Options) *API {
583586
api.Options.NetworkTelemetryBatchMaxSize,
584587
api.handleNetworkTelemetry,
585588
)
589+
if options.CoordinatorResumeTokenProvider == nil {
590+
panic("CoordinatorResumeTokenProvider is nil")
591+
}
586592
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
587593
Logger: api.Logger.Named("tailnetclient"),
588594
CoordPtr: &api.TailnetCoordinator,
589595
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
590596
DERPMapFn: api.DERPMap,
591597
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
598+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
592599
})
593600
if err != nil {
594601
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
@@ -613,6 +620,9 @@ func New(options *Options) *API {
613620
options.WorkspaceAppsStatsCollectorOptions.Reporter = api.statsReporter
614621
}
615622

623+
if options.AppSecurityKey.IsZero() {
624+
api.Logger.Fatal(api.ctx, "app security key cannot be zero")
625+
}
616626
api.workspaceAppServer = &workspaceapps.Server{
617627
Logger: workspaceAppsLogger,
618628

coderd/coderdtest/coderdtest.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
492492
TailnetCoordinator: options.Coordinator,
493493
BaseDERPMap: derpMap,
494494
DERPMapUpdateFrequency: 150 * time.Millisecond,
495+
CoordinatorResumeTokenProvider: tailnet.InsecureTestResumeTokenProvider,
495496
MetricsCacheRefreshInterval: options.MetricsCacheRefreshInterval,
496497
AgentStatsRefreshInterval: options.AgentStatsRefreshInterval,
497498
DeploymentValues: options.DeploymentValues,

coderd/database/dbauthz/dbauthz.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,7 +1252,9 @@ func (q *querier) GetAnnouncementBanners(ctx context.Context) (string, error) {
12521252
}
12531253

12541254
func (q *querier) GetAppSecurityKey(ctx context.Context) (string, error) {
1255-
// No authz checks
1255+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
1256+
return "", err
1257+
}
12561258
return q.db.GetAppSecurityKey(ctx)
12571259
}
12581260

@@ -1284,6 +1286,13 @@ func (q *querier) GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUI
12841286
return q.db.GetAuthorizationUserRoles(ctx, userID)
12851287
}
12861288

1289+
func (q *querier) GetCoordinatorResumeTokenSigningKey(ctx context.Context) (string, error) {
1290+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
1291+
return "", err
1292+
}
1293+
return q.db.GetCoordinatorResumeTokenSigningKey(ctx)
1294+
}
1295+
12871296
func (q *querier) GetDBCryptKeys(ctx context.Context) ([]database.DBCryptKey, error) {
12881297
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
12891298
return nil, err
@@ -3645,7 +3654,9 @@ func (q *querier) UpsertAnnouncementBanners(ctx context.Context, value string) e
36453654
}
36463655

36473656
func (q *querier) UpsertAppSecurityKey(ctx context.Context, data string) error {
3648-
// No authz checks as this is done during startup
3657+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
3658+
return err
3659+
}
36493660
return q.db.UpsertAppSecurityKey(ctx, data)
36503661
}
36513662

@@ -3656,6 +3667,13 @@ func (q *querier) UpsertApplicationName(ctx context.Context, value string) error
36563667
return q.db.UpsertApplicationName(ctx, value)
36573668
}
36583669

3670+
func (q *querier) UpsertCoordinatorResumeTokenSigningKey(ctx context.Context, value string) error {
3671+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
3672+
return err
3673+
}
3674+
return q.db.UpsertCoordinatorResumeTokenSigningKey(ctx, value)
3675+
}
3676+
36593677
// UpsertCustomRole does a series of authz checks to protect custom roles.
36603678
// - Check custom roles are valid for their resource types + actions
36613679
// - Check the actor can create the custom role

coderd/database/dbauthz/dbauthz_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2445,10 +2445,10 @@ func (s *MethodTestSuite) TestSystemFunctions() {
24452445
check.Args(int32(0)).Asserts(rbac.ResourceSystem, policy.ActionRead)
24462446
}))
24472447
s.Run("GetAppSecurityKey", s.Subtest(func(db database.Store, check *expects) {
2448-
check.Args().Asserts()
2448+
check.Args().Asserts(rbac.ResourceSystem, policy.ActionRead)
24492449
}))
24502450
s.Run("UpsertAppSecurityKey", s.Subtest(func(db database.Store, check *expects) {
2451-
check.Args("").Asserts()
2451+
check.Args("foo").Asserts(rbac.ResourceSystem, policy.ActionUpdate)
24522452
}))
24532453
s.Run("GetApplicationName", s.Subtest(func(db database.Store, check *expects) {
24542454
db.UpsertApplicationName(context.Background(), "foo")
@@ -2488,6 +2488,13 @@ func (s *MethodTestSuite) TestSystemFunctions() {
24882488
db.UpsertOAuthSigningKey(context.Background(), "foo")
24892489
check.Args().Asserts(rbac.ResourceSystem, policy.ActionUpdate)
24902490
}))
2491+
s.Run("UpsertCoordinatorResumeTokenSigningKey", s.Subtest(func(db database.Store, check *expects) {
2492+
check.Args("foo").Asserts(rbac.ResourceSystem, policy.ActionUpdate)
2493+
}))
2494+
s.Run("GetCoordinatorResumeTokenSigningKey", s.Subtest(func(db database.Store, check *expects) {
2495+
db.UpsertCoordinatorResumeTokenSigningKey(context.Background(), "foo")
2496+
check.Args().Asserts(rbac.ResourceSystem, policy.ActionUpdate)
2497+
}))
24912498
s.Run("InsertMissingGroups", s.Subtest(func(db database.Store, check *expects) {
24922499
check.Args(database.InsertMissingGroupsParams{}).Asserts(rbac.ResourceSystem, policy.ActionCreate).Errors(errMatchAny)
24932500
}))

coderd/database/dbmem/dbmem.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,20 +196,21 @@ type data struct {
196196
customRoles []database.CustomRole
197197
// Locks is a map of lock names. Any keys within the map are currently
198198
// locked.
199-
locks map[int64]struct{}
200-
deploymentID string
201-
derpMeshKey string
202-
lastUpdateCheck []byte
203-
announcementBanners []byte
204-
healthSettings []byte
205-
notificationsSettings []byte
206-
applicationName string
207-
logoURL string
208-
appSecurityKey string
209-
oauthSigningKey string
210-
lastLicenseID int32
211-
defaultProxyDisplayName string
212-
defaultProxyIconURL string
199+
locks map[int64]struct{}
200+
deploymentID string
201+
derpMeshKey string
202+
lastUpdateCheck []byte
203+
announcementBanners []byte
204+
healthSettings []byte
205+
notificationsSettings []byte
206+
applicationName string
207+
logoURL string
208+
appSecurityKey string
209+
oauthSigningKey string
210+
coordinatorResumeTokenSigningKey string
211+
lastLicenseID int32
212+
defaultProxyDisplayName string
213+
defaultProxyIconURL string
213214
}
214215

215216
func validateDatabaseTypeWithValid(v reflect.Value) (handled bool, err error) {
@@ -2172,6 +2173,15 @@ func (q *FakeQuerier) GetAuthorizationUserRoles(_ context.Context, userID uuid.U
21722173
}, nil
21732174
}
21742175

2176+
func (q *FakeQuerier) GetCoordinatorResumeTokenSigningKey(_ context.Context) (string, error) {
2177+
q.mutex.RLock()
2178+
defer q.mutex.RUnlock()
2179+
if q.coordinatorResumeTokenSigningKey == "" {
2180+
return "", sql.ErrNoRows
2181+
}
2182+
return q.coordinatorResumeTokenSigningKey, nil
2183+
}
2184+
21752185
func (q *FakeQuerier) GetDBCryptKeys(_ context.Context) ([]database.DBCryptKey, error) {
21762186
q.mutex.RLock()
21772187
defer q.mutex.RUnlock()
@@ -8800,6 +8810,14 @@ func (q *FakeQuerier) UpsertApplicationName(_ context.Context, data string) erro
88008810
return nil
88018811
}
88028812

8813+
func (q *FakeQuerier) UpsertCoordinatorResumeTokenSigningKey(_ context.Context, value string) error {
8814+
q.mutex.Lock()
8815+
defer q.mutex.Unlock()
8816+
8817+
q.coordinatorResumeTokenSigningKey = value
8818+
return nil
8819+
}
8820+
88038821
func (q *FakeQuerier) UpsertCustomRole(_ context.Context, arg database.UpsertCustomRoleParams) (database.CustomRole, error) {
88048822
err := validateDatabaseType(arg)
88058823
if err != nil {

coderd/database/dbmetrics/dbmetrics.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/siteconfig.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ SELECT value FROM site_configs WHERE key = 'oauth_signing_key';
7171
INSERT INTO site_configs (key, value) VALUES ('oauth_signing_key', $1)
7272
ON CONFLICT (key) DO UPDATE set value = $1 WHERE site_configs.key = 'oauth_signing_key';
7373

74+
-- name: GetCoordinatorResumeTokenSigningKey :one
75+
SELECT value FROM site_configs WHERE key = 'coordinator_resume_token_signing_key';
76+
77+
-- name: UpsertCoordinatorResumeTokenSigningKey :exec
78+
INSERT INTO site_configs (key, value) VALUES ('coordinator_resume_token_signing_key', $1)
79+
ON CONFLICT (key) DO UPDATE set value = $1 WHERE site_configs.key = 'coordinator_resume_token_signing_key';
80+
7481
-- name: GetHealthSettings :one
7582
SELECT
7683
COALESCE((SELECT value FROM site_configs WHERE key = 'health_settings'), '{}') :: text AS health_settings

0 commit comments

Comments
 (0)