Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: move agentapi to new package
  • Loading branch information
deansheather committed Nov 27, 2023
commit 49a231465ba43c52d22f61fbbcc6db1443f1240b
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.

if vals.Prometheus.Enable {
// Agent metrics require reference to the tailnet coordinator, so must be initiated after Coder API.
closeAgentsFunc, err := prometheusmetrics.Agents(ctx, logger, options.PrometheusRegistry, coderAPI.Database, &coderAPI.TailnetCoordinator, coderAPI.DERPMap, coderAPI.Options.AgentInactiveDisconnectTimeout, 0)
closeAgentsFunc, err := prometheusmetrics.Agents(ctx, logger, options.PrometheusRegistry, coderAPI.Database, coderAPI.TailnetCoordinator, coderAPI.DERPMap, coderAPI.Options.AgentInactiveDisconnectTimeout, 0)
if err != nil {
return xerrors.Errorf("register agents prometheus metric: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions coderd/activitybump.go → coderd/agentapi/activitybump.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package coderd
package agentapi

import (
"context"
Expand All @@ -11,7 +11,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
)

// activityBumpWorkspace automatically bumps the workspace's auto-off timer
// ActivityBumpWorkspace automatically bumps the workspace's auto-off timer
// if it is set to expire soon. The deadline will be bumped by 1 hour*.
// If the bump crosses over an autostart time, the workspace will be
// bumped by the workspace ttl instead.
Expand All @@ -36,7 +36,7 @@ import (
// A way to avoid this is to configure the max deadline to something that will not
// span more than 1 day. This will force the workspace to restart and reset the deadline
// each morning when it autostarts.
func activityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Store, workspaceID uuid.UUID, nextAutostart time.Time) {
func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Store, workspaceID uuid.UUID, nextAutostart time.Time) {
// We set a short timeout so if the app is under load, these
// low priority operations fail first.
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package coderd
package agentapi_test

import (
"database/sql"
Expand All @@ -8,6 +8,7 @@ import (
"github.com/google/uuid"

"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
Expand Down Expand Up @@ -236,7 +237,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {

// Bump duration is measured from the time of the bump, so we measure from here.
start := dbtime.Now()
activityBumpWorkspace(ctx, log, db, bld.WorkspaceID, tt.nextAutostart)
agentapi.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, tt.nextAutostart)
end := dbtime.Now()

// Validate our state after bump
Expand Down
233 changes: 233 additions & 0 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package agentapi

import (
"context"
"io"
"net"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"tailscale.com/tailcfg"

"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
)

const AgentAPIVersionDRPC = "2.0"

// API implements the DRPC agent API interface from agent/proto. This struct is
// instantiated once per agent connection and kept alive for the duration of the
// session.
type API struct {
opts Options
*ManifestAPI
*ServiceBannerAPI
*StatsAPI
*LifecycleAPI
*AppsAPI
*MetadataAPI
*LogsAPI
*TailnetAPI

mu sync.Mutex
cachedWorkspaceID uuid.UUID
}

var _ agentproto.DRPCAgentServer = &API{}

type Options struct {
AgentID uuid.UUID

Ctx context.Context
Log slog.Logger
Database database.Store
Pubsub pubsub.Pubsub
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher *batchstats.Batcher
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)

AccessURL *url.URL
AppHostname string
AgentInactiveDisconnectTimeout time.Duration
AgentFallbackTroubleshootingURL string
AgentStatsRefreshInterval time.Duration
DisableDirectConnections bool
DerpForceWebSockets bool
DerpMapUpdateFrequency time.Duration
ExternalAuthConfigs []*externalauth.Config

// Optional:
// WorkspaceID avoids a future lookup to find the workspace ID by setting
// the cache in advance.
WorkspaceID uuid.UUID
UpdateAgentMetricsFn func(ctx context.Context, username, workspaceName, agentName string, metrics []*agentproto.Stats_Metric)
}

func New(opts Options) *API {
api := &API{
opts: opts,
mu: sync.Mutex{},
cachedWorkspaceID: opts.WorkspaceID,
}

api.ManifestAPI = &ManifestAPI{
AccessURL: opts.AccessURL,
AppHostname: opts.AppHostname,
AgentInactiveDisconnectTimeout: opts.AgentInactiveDisconnectTimeout,
AgentFallbackTroubleshootingURL: opts.AgentFallbackTroubleshootingURL,
ExternalAuthConfigs: opts.ExternalAuthConfigs,
DisableDirectConnections: opts.DisableDirectConnections,
DerpForceWebSockets: opts.DerpForceWebSockets,
AgentFn: api.agent,
Database: opts.Database,
DerpMapFn: opts.DerpMapFn,
TailnetCoordinator: opts.TailnetCoordinator,
}

api.ServiceBannerAPI = &ServiceBannerAPI{
Database: opts.Database,
}

api.StatsAPI = &StatsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
StatsBatcher: opts.StatsBatcher,
TemplateScheduleStore: opts.TemplateScheduleStore,
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
UpdateAgentMetricsFn: opts.UpdateAgentMetricsFn,
}

api.LifecycleAPI = &LifecycleAPI{
AgentFn: api.agent,
WorkspaceIDFn: api.workspaceID,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
}

api.AppsAPI = &AppsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
}

api.MetadataAPI = &MetadataAPI{
AgentFn: api.agent,
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
}

api.LogsAPI = &LogsAPI{
AgentFn: api.agent,
Database: opts.Database,
Log: opts.Log,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: opts.PublishWorkspaceAgentLogsUpdateFn,
}

api.TailnetAPI = &TailnetAPI{
Ctx: opts.Ctx,
DerpMapFn: opts.DerpMapFn,
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
}

return api
}

func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
mux := drpcmux.New()
err := agentproto.DRPCRegisterAgent(mux, a)
if err != nil {
return nil, xerrors.Errorf("register agent API protocol in DRPC mux: %w", err)
}

return drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
a.opts.Log.Debug(ctx, "drpc server error", slog.Error(err))
},
},
), nil
}

func (a *API) Serve(ctx context.Context, l net.Listener) error {
server, err := a.Server(ctx)
if err != nil {
return xerrors.Errorf("create agent API server: %w", err)
}

return server.Serve(ctx, l)
}

func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
agent, err := a.opts.Database.GetWorkspaceAgentByID(ctx, a.opts.AgentID)
if err != nil {
return database.WorkspaceAgent{}, xerrors.Errorf("get workspace agent by id %q: %w", a.opts.AgentID, err)
}
return agent, nil
}

func (a *API) workspaceID(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
a.mu.Lock()
if a.cachedWorkspaceID != uuid.Nil {
id := a.cachedWorkspaceID
a.mu.Unlock()
return id, nil
}

if agent == nil {
agnt, err := a.agent(ctx)
if err != nil {
return uuid.Nil, err
}
agent = &agnt
}

resource, err := a.opts.Database.GetWorkspaceResourceByID(ctx, agent.ResourceID)
if err != nil {
return uuid.Nil, xerrors.Errorf("get workspace agent resource by id %q: %w", agent.ResourceID, err)
}

build, err := a.opts.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
if err != nil {
return uuid.Nil, xerrors.Errorf("get workspace build by job id %q: %w", resource.JobID, err)
}

a.mu.Lock()
a.cachedWorkspaceID = build.WorkspaceID
a.mu.Unlock()
return build.WorkspaceID, nil
}

func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent) error {
workspaceID, err := a.workspaceID(ctx, agent)
if err != nil {
return err
}

a.opts.PublishWorkspaceUpdateFn(ctx, workspaceID)
return nil
}
98 changes: 98 additions & 0 deletions coderd/agentapi/apps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package agentapi

import (
"context"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
)

type AppsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Log slog.Logger
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
}

func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
workspaceAgent, err := a.AgentFn(ctx)
if err != nil {
return nil, err
}

if len(req.Updates) == 0 {
return &agentproto.BatchUpdateAppHealthResponse{}, nil
}

apps, err := a.Database.GetWorkspaceAppsByAgentID(ctx, workspaceAgent.ID)
if err != nil {
return nil, xerrors.Errorf("get workspace apps by agent ID %q: %w", workspaceAgent.ID, err)
}

var newApps []database.WorkspaceApp
for _, update := range req.Updates {
updateID, err := uuid.FromBytes(update.Id)
if err != nil {
return nil, xerrors.Errorf("parse workspace app ID %q: %w", update.Id, err)
}

old := func() *database.WorkspaceApp {
for _, app := range apps {
if app.ID == updateID {
return &app
}
}

return nil
}()
if old == nil {
return nil, xerrors.Errorf("workspace app ID %q not found", updateID)
}

if old.HealthcheckUrl == "" {
return nil, xerrors.Errorf("workspace app %q (%q) does not have healthchecks enabled", updateID, old.Slug)
}

var newHealth database.WorkspaceAppHealth
switch update.Health {
case agentproto.AppHealth_DISABLED:
newHealth = database.WorkspaceAppHealthDisabled
case agentproto.AppHealth_INITIALIZING:
newHealth = database.WorkspaceAppHealthInitializing
case agentproto.AppHealth_HEALTHY:
newHealth = database.WorkspaceAppHealthHealthy
case agentproto.AppHealth_UNHEALTHY:
newHealth = database.WorkspaceAppHealthUnhealthy
default:
return nil, xerrors.Errorf("unknown health status %q for app %q (%q)", update.Health, updateID, old.Slug)
}

// Don't bother updating if the value hasn't changed.
if old.Health == newHealth {
continue
}
old.Health = newHealth

newApps = append(newApps, *old)
}

for _, app := range newApps {
err = a.Database.UpdateWorkspaceAppHealthByID(ctx, database.UpdateWorkspaceAppHealthByIDParams{
ID: app.ID,
Health: app.Health,
})
if err != nil {
return nil, xerrors.Errorf("update workspace app health for app %q (%q): %w", err, app.ID, app.Slug)
}
}

err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
if err != nil {
return nil, xerrors.Errorf("publish workspace update: %w", err)
}
return &agentproto.BatchUpdateAppHealthResponse{}, nil
}
Loading