feat: expose agent metrics via Prometheus endpoint #7011


Merged
merged 16 commits on Apr 7, 2023
Changes from 1 commit
Agents
mtojek committed Apr 5, 2023
commit 8764f8975d75ebb45d9e5996fac9b7509c953e33
33 changes: 17 additions & 16 deletions cli/server.go
@@ -849,16 +849,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
defer options.Telemetry.Close()
}

databaseStoreWithoutAuth := options.Database

// We use a separate coderAPICloser so the Enterprise API
// can have it's own close functions. This is cleaner
// than abstracting the Coder API itself.
coderAPI, coderAPICloser, err := newAPI(ctx, options)
if err != nil {
return xerrors.Errorf("create coder API: %w", err)
}

// This prevents the pprof import from being accidentally deleted.
_ = pprof.Handler
if cfg.Pprof.Enable {
@@ -881,12 +871,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}
defer closeWorkspacesFunc()

closeAgentsFunc, err := prometheusmetrics.Agents(ctx, options.PrometheusRegistry, databaseStoreWithoutAuth, &coderAPI.TailnetCoordinator, options.DERPMap, 0)
if err != nil {
return xerrors.Errorf("register agents prometheus metric: %w", err)
}
defer closeAgentsFunc()

//nolint:revive
defer serveHandler(ctx, logger, promhttp.InstrumentMetricHandler(
options.PrometheusRegistry, promhttp.HandlerFor(options.PrometheusRegistry, promhttp.HandlerOpts{}),
@@ -897,6 +881,23 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
}

// We use a separate coderAPICloser so the Enterprise API
// can have it's own close functions. This is cleaner
// than abstracting the Coder API itself.
coderAPI, coderAPICloser, err := newAPI(ctx, options)
if err != nil {
return xerrors.Errorf("create coder API: %w", err)
}

if cfg.Prometheus.Enable {
// Agent metrics require reference to the tailnet coordinator, so must be initiated after Coder API.
closeAgentsFunc, err := prometheusmetrics.Agents(ctx, logger, options.PrometheusRegistry, coderAPI.Database, &coderAPI.TailnetCoordinator, options.DERPMap, coderAPI.Options.AgentInactiveDisconnectTimeout, 0)
if err != nil {
return xerrors.Errorf("register agents prometheus metric: %w", err)
}
defer closeAgentsFunc()
}

client := codersdk.New(localURL)
if localURL.Scheme == "https" && isLocalhost(localURL.Hostname()) {
// The certificate will likely be self-signed or for a different
133 changes: 75 additions & 58 deletions coderd/prometheusmetrics/prometheusmetrics.go
@@ -3,7 +3,6 @@ package prometheusmetrics
import (
"context"
"fmt"
"log"
"strconv"
"strings"
"sync/atomic"
@@ -13,8 +12,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
"tailscale.com/tailcfg"

"cdr.dev/slog"

"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/tailnet"
)

@@ -115,119 +117,134 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
}

// Agents tracks the total number of workspaces with labels on status.
func Agents(ctx context.Context, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMap *tailcfg.DERPMap, duration time.Duration) (context.CancelFunc, error) {
func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMap *tailcfg.DERPMap, agentInactiveDisconnectTimeout, duration time.Duration) (context.CancelFunc, error) {
Member

Does this data already live on each workspace/agent?

I wonder if there is another way to do this. In v1 we implemented a custom prometheus.Collector to handle agent stats in a non-racy way.

Member Author

My idea was to keep it aligned with the other prometheusmetrics and use a single source of metrics: the database. That way, the information we present over the Coderd API stays consistent with the Prometheus endpoint.

Regarding the prometheus.Collector, I will take a look 👍 (as stated in the other comment).

Member Author

I find the v1 implementation a bit complex for this use case. Since I tried to keep metric collection separate from the agent reporting logic, a collector like the one in v1 would be great if we had metrics coming from different parts of the application.

BTW, it looks like the v1 collector doesn't support vectors, but here we depend mostly on them. Porting the collector would make it more complex.

Member

Yeah, I didn't think we could port the v1 metrics verbatim. How it works, though, is that each agent uses Prometheus to create its own metrics and pushes them to the coderd it is connected to. The aggregator then combines all those metrics, labeling them for each workspace.

The v1 collector does support labels, which is what "vectors" are. Each unique label set is a single "metric" to the aggregator, so coderd_agents_metric{favorite-number="7"} and coderd_agents_metric{favorite-number="1"} are two different prometheus.Metric values. This matches the Prometheus design of labels:

Remember that every unique combination of key-value label pairs represents a new time series


I liked the v1 design as it made it easier to add metrics from the agent; in v2, I think we build our own payloads instead. 🤷‍♂️

I was mostly pointing it out because writing a Collector gives you a lot more freedom in how to manipulate the Gather part of the metrics.
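
To make the "labels are separate time series" point concrete, here is a minimal sketch (the metric and label names are made up for illustration and are not part of this PR) showing that two label values on one GaugeVec are gathered as two distinct series:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// One vector, but every unique label combination becomes its own series.
	favorites := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: "agents",
		Name:      "favorite_number", // hypothetical metric, mirroring the comment above
		Help:      "Example gauge keyed by a label.",
	}, []string{"favorite_number"})
	reg.MustRegister(favorites)

	favorites.WithLabelValues("7").Set(1)
	favorites.WithLabelValues("1").Set(1)

	// Gather returns one MetricFamily containing two Metric entries,
	// one per unique label set.
	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName(), "series:", len(mf.GetMetric())) // prints: ... series: 2
	}
}
```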

Member Author

How it works, though, is that each agent uses Prometheus to create its own metrics and pushes them to the coderd it is connected to.

That is one approach, but unfortunately, we would miss metrics from disconnected agents.

As stated in the PR description, the idea behind this submission is to expose metric data we have already collected and stored in the database. If we already have this data in the database, why not just use it? :) A similar story applies to the "agent stats" that are stored in a dedicated database table.

Let me know your thoughts.

Member

Disconnected agent stats in v1 Prometheus eventually go "stale" and are then removed, since Prometheus doesn't need to be a perfect source of truth (a little lag, e.g. 1 min, is OK IMO).


I agree with you, though, that just exposing what we currently have is the go-to move. My initial hunch was to make a collector that has all the internal counters you are trying to track.

When the "Gather" func is called, the Collector returns the cached counters.

Every "update" period, new counts are created and incremented in an internal loop. When the counts are finished, the Collector is locked, all counters are updated, and it is unlocked again.

So the exposed counters are always "correct".
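
For reference, a minimal sketch of that pattern (the names and the data-loading step are illustrative, not the v1 code): a custom prometheus.Collector serves only the most recently committed snapshot from Collect, while a background loop rebuilds the snapshot and swaps it in under a lock.

```go
package agentmetrics

import (
	"context"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// cachedCollector hands out a pre-built snapshot, so a scrape never observes
// a half-finished collection cycle.
type cachedCollector struct {
	mu      sync.Mutex
	desc    *prometheus.Desc
	metrics []prometheus.Metric
}

func newCachedCollector() *cachedCollector {
	return &cachedCollector{
		desc: prometheus.NewDesc(
			"coderd_agents_up", // illustrative name
			"Number of active agents per workspace.",
			[]string{"username", "workspace_name"}, nil,
		),
	}
}

func (c *cachedCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

// Collect is what the registry's Gather calls; it only reads the cache.
func (c *cachedCollector) Collect(ch chan<- prometheus.Metric) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, m := range c.metrics {
		ch <- m
	}
}

// update rebuilds the snapshot outside the lock, then swaps it in atomically.
func (c *cachedCollector) update(agentsPerWorkspace map[[2]string]float64) {
	next := make([]prometheus.Metric, 0, len(agentsPerWorkspace))
	for labels, count := range agentsPerWorkspace {
		next = append(next, prometheus.MustNewConstMetric(
			c.desc, prometheus.GaugeValue, count, labels[0], labels[1],
		))
	}
	c.mu.Lock()
	c.metrics = next
	c.mu.Unlock()
}

// run would be driven by the same kind of ticker as this PR's Agents function,
// with the database/coordinator queries filling in the real counts.
func (c *cachedCollector) run(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.update(map[[2]string]float64{{"alice", "workspace1"}: 1})
		}
	}
}
```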

Member Author

My initial hunch was to make a collector that has all the internal counters you are trying to track.

The idea is good for sure, it's just a matter of the capacity we have 👍

Every "update" period, new counts are created and incremented in an internal loop. When the counts are finished, the Collector is locked, all counters are updated, and it is unlocked again.

Yup, this is more or less what I've implemented at the gauge-vector level: CachedGaugeVec. I just found it simpler than the full collector concept.
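
Roughly, the shape of that idea (a hypothetical sketch of the buffering concept, not the actual CachedGaugeVec API from this PR): updates are queued during a collection cycle and only become visible to scrapes once the cycle is committed.

```go
package agentmetrics

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// cachedGaugeVec buffers gauge updates and exposes them only after a full
// collection cycle has been committed.
type cachedGaugeVec struct {
	mu        sync.Mutex
	gauge     *prometheus.GaugeVec
	pending   []func(*prometheus.GaugeVec)
	committed []func(*prometheus.GaugeVec)
}

var _ prometheus.Collector = (*cachedGaugeVec)(nil)

func (v *cachedGaugeVec) Describe(ch chan<- *prometheus.Desc) { v.gauge.Describe(ch) }

// Collect applies the last committed cycle before handing the vector to the
// scraper, so partially collected data is never exposed.
func (v *cachedGaugeVec) Collect(ch chan<- prometheus.Metric) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.committed != nil {
		v.gauge.Reset()
		for _, apply := range v.committed {
			apply(v.gauge)
		}
		v.committed = nil
	}
	v.gauge.Collect(ch)
}

// Set queues an update; nothing becomes visible until Commit is called.
func (v *cachedGaugeVec) Set(value float64, labelValues ...string) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.pending = append(v.pending, func(g *prometheus.GaugeVec) {
		g.WithLabelValues(labelValues...).Set(value)
	})
}

// Commit marks the collection cycle as complete and ready to serve.
func (v *cachedGaugeVec) Commit() {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.committed = v.pending
	v.pending = nil
}
```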

Member

Ah that makes sense and looks good 👍

if duration == 0 {
duration = 15 * time.Second // TODO 5 * time.Minute
}

agentsConnectionGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
workspaceAgentsGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "connection",
Help: "The agent connection with a status.",
}, []string{"agent_name", "workspace_name", "status"})
err := registerer.Register(agentsConnectionGauge)
Name: "up",
Help: "The number of active agents per workspace.",
}, []string{"username", "workspace_name"})
err := registerer.Register(workspaceAgentsGauge)
if err != nil {
return nil, err
}

agentsUserLatenciesGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
agentsConnectionGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "user_latencies_seconds",
Help: "The user's agent latency in seconds.",
}, []string{"agent_id", "workspace_name", "derp_region", "preferred"})
err = registerer.Register(agentsUserLatenciesGauge)
Name: "connections",
Help: "Agent connections with statuses.",
}, []string{"agent_name", "username", "workspace_name", "status", "lifecycle_state", "tailnet_node"})
err = registerer.Register(agentsConnectionGauge)
if err != nil {
return nil, err
}

// FIXME connection_type ide
agentsConnectionLatenciesGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "connection_latencies_seconds",
Help: "Agent connection latencies in seconds.",
}, []string{"agent_id", "username", "workspace_name", "derp_region", "preferred"})
err = registerer.Register(agentsConnectionLatenciesGauge)
if err != nil {
return nil, err
}

ctx, cancelFunc := context.WithCancel(ctx)
// nolint:gocritic // Prometheus must collect metrics for all Coder users.
ctx, cancelFunc := context.WithCancel(dbauthz.AsSystemRestricted(ctx))
ticker := time.NewTicker(duration)
go func() {
defer ticker.Stop()
for {
log.Println("Agents!!!")

select {
case <-ctx.Done():
return
case <-ticker.C:
}

// FIXME Optimize this routine: SQL db calls
logger.Info(ctx, "Collect agent metrics now")

builds, err := db.GetLatestWorkspaceBuilds(ctx)
workspaceRows, err := db.GetWorkspaces(ctx, database.GetWorkspacesParams{
AgentInactiveDisconnectTimeoutSeconds: int64(agentInactiveDisconnectTimeout.Seconds()),
})
if err != nil {
log.Println("1", err)
logger.Error(ctx, "can't get workspace rows", slog.Error(err))
continue
}

workspaceAgentsGauge.Reset()
agentsConnectionGauge.Reset()
agentsUserLatenciesGauge.Reset()
for _, build := range builds {
workspace, err := db.GetWorkspaceByID(ctx, build.WorkspaceID)
agentsConnectionLatenciesGauge.Reset()

for _, workspace := range workspaceRows {
user, err := db.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
log.Println("2", err)
logger.Error(ctx, "can't get user", slog.Error(err), slog.F("user_id", workspace.OwnerID))
workspaceAgentsGauge.WithLabelValues(user.Username, workspace.Name).Add(0)
continue
}

agents, err := db.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, build.WorkspaceID)
agents, err := db.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID)
if err != nil {
log.Println("3", err)
logger.Error(ctx, "can't get workspace agents", slog.F("workspace_name", workspace.Name), slog.Error(err))
workspaceAgentsGauge.WithLabelValues(user.Username, workspace.Name).Add(0)
continue
}

if len(agents) == 0 {
logger.Info(ctx, "workspace agents are unavailable", slog.F("workspace_name", workspace.Name))
workspaceAgentsGauge.WithLabelValues(user.Username, workspace.Name).Add(0)
continue
}

// FIXME publish workspace even if no agents

for _, agent := range agents {
connectionStatus := agent.Status(6 * time.Second)

// FIXME AgentInactiveDisconnectTimeout
// ? connection_timeout_seconds
// obok latency lifecycle_state
log.Println("with value " + agent.Name)
agentsConnectionGauge.WithLabelValues(agent.Name, workspace.Name, string(connectionStatus.Status)).Set(1)
// Collect information about agents
workspaceAgentsGauge.WithLabelValues(user.Username, workspace.Name).Add(1)

connectionStatus := agent.Status(agentInactiveDisconnectTimeout)
node := (*coordinator.Load()).Node(agent.ID)

tailnetNode := "unknown"
if node != nil {
log.Println("coordinator")
tailnetNode = node.ID.String()
}

for rawRegion, latency := range node.DERPLatency {
log.Println(rawRegion, latency)
agentsConnectionGauge.WithLabelValues(agent.Name, user.Username, workspace.Name, string(connectionStatus.Status), string(agent.LifecycleState), tailnetNode).Set(1)

regionParts := strings.SplitN(rawRegion, "-", 2)
regionID, err := strconv.Atoi(regionParts[0])
if err != nil {
continue // xerrors.Errorf("convert derp region id %q: %w", rawRegion, err)
}
region, found := derpMap.Regions[regionID]
if !found {
// It's possible that a workspace agent is using an old DERPMap
// and reports regions that do not exist. If that's the case,
// report the region as unknown!
region = &tailcfg.DERPRegion{
RegionID: regionID,
RegionName: fmt.Sprintf("Unnamed %d", regionID),
}
}
if node == nil {
logger.Info(ctx, "can't read in-memory node for agent", slog.F("workspace_name", workspace.Name), slog.F("agent_name", agent.Name))
continue
}

log.Println(region, latency)
agentsUserLatenciesGauge.WithLabelValues(agent.Name, workspace.Name, region.RegionName, fmt.Sprintf("%v", node.PreferredDERP == regionID)).Set(latency)
// Collect information about connection latencies
for rawRegion, latency := range node.DERPLatency {
regionParts := strings.SplitN(rawRegion, "-", 2)
regionID, err := strconv.Atoi(regionParts[0])
if err != nil {
logger.Error(ctx, "can't convert DERP region", slog.Error(err), slog.F("agent_name", agent.Name), slog.F("raw_region", rawRegion))
continue
}
} else {
log.Println("node is null")
region, found := derpMap.Regions[regionID]
if !found {
// It's possible that a workspace agent is using an old DERPMap
// and reports regions that do not exist. If that's the case,
// report the region as unknown!
region = &tailcfg.DERPRegion{
RegionID: regionID,
RegionName: fmt.Sprintf("Unnamed %d", regionID),
}
}

agentsConnectionLatenciesGauge.WithLabelValues(agent.Name, user.Username, workspace.Name, region.RegionName, fmt.Sprintf("%v", node.PreferredDERP == regionID)).Set(latency)
}

// FIXME publish agent even if DERP is missing
// FIXME IDE?
// FIXME agent connection zero
// FIXME connection_type ide
}
}
}