Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: replace GetManagedAgentCount query with aggregate table
- Removes GetManagedAgentCount query
- Adds new table `usage_events_daily` which stores aggregated usage
  events by the type and UTC day
- Adds trigger to update the values in this table when a new row is
  inserted into `usage_events`
- Adds a migration that adds `usage_events_daily` rows for existing data
  in `usage_events`

Since the `usage_events` table is currently unreleased, this migration
will be a no-op on real deployments and will only affect preview
deployments such as dogfood.
  • Loading branch information
deansheather committed Aug 29, 2025
commit 4b05f9743635e2e1a9d40802893d0b84cf4e38a8
15 changes: 7 additions & 8 deletions coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -2252,14 +2252,6 @@ func (q *querier) GetLogoURL(ctx context.Context) (string, error) {
return q.db.GetLogoURL(ctx)
}

// GetManagedAgentCount returns the number of managed agents in the given
// time window. Usage reporting requires read access to all workspaces.
func (q *querier) GetManagedAgentCount(ctx context.Context, arg database.GetManagedAgentCountParams) (int64, error) {
	err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceWorkspace)
	if err != nil {
		return 0, xerrors.Errorf("authorize read all workspaces: %w", err)
	}
	return q.db.GetManagedAgentCount(ctx, arg)
}

func (q *querier) GetNotificationMessagesByStatus(ctx context.Context, arg database.GetNotificationMessagesByStatusParams) ([]database.NotificationMessage, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceNotificationMessage); err != nil {
return nil, err
Expand Down Expand Up @@ -3058,6 +3050,13 @@ func (q *querier) GetTemplatesWithFilter(ctx context.Context, arg database.GetTe
return q.db.GetAuthorizedTemplates(ctx, arg, prep)
}

// GetTotalUsageDCManagedAgentsV1 returns the total managed-agent usage from
// the daily rollup table. Callers must be able to read usage events.
func (q *querier) GetTotalUsageDCManagedAgentsV1(ctx context.Context, arg database.GetTotalUsageDCManagedAgentsV1Params) (int64, error) {
	err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceUsageEvent)
	if err != nil {
		return 0, err
	}
	return q.db.GetTotalUsageDCManagedAgentsV1(ctx, arg)
}

func (q *querier) GetUnexpiredLicenses(ctx context.Context) ([]database.License, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceLicense); err != nil {
return nil, err
Expand Down
14 changes: 8 additions & 6 deletions coderd/database/dbauthz/dbauthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,6 @@ func (s *MethodTestSuite) TestLicense() {
dbm.EXPECT().GetAnnouncementBanners(gomock.Any()).Return("value", nil).AnyTimes()
check.Args().Asserts().Returns("value")
}))
s.Run("GetManagedAgentCount", s.Mocked(func(dbm *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
	// A one-hour window from now; the exact values are irrelevant, the
	// assertion is that the same params flow through to the store.
	now := dbtime.Now()
	params := database.GetManagedAgentCountParams{
		StartTime: now,
		EndTime:   now.Add(time.Hour),
	}
	dbm.EXPECT().GetManagedAgentCount(gomock.Any(), params).Return(int64(0), nil).AnyTimes()
	check.Args(params).Asserts(rbac.ResourceWorkspace, policy.ActionRead).Returns(int64(0))
}))
}

func (s *MethodTestSuite) TestOrganization() {
Expand Down Expand Up @@ -4403,4 +4397,12 @@ func (s *MethodTestSuite) TestUsageEvents() {
db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), params).Return(nil)
check.Args(params).Asserts(rbac.ResourceUsageEvent, policy.ActionUpdate)
}))

s.Run("GetTotalUsageDCManagedAgentsV1", s.Mocked(func(db *dbmock.MockStore, _ *gofakeit.Faker, check *expects) {
	params := database.GetTotalUsageDCManagedAgentsV1Params{
		StartDate: time.Time{},
		EndDate:   time.Time{},
	}
	// Match the exact params (not gomock.Any()) so a regression in argument
	// plumbing is caught, and assert the mocked value is returned — both
	// consistent with the sibling GetManagedAgentCount test.
	db.EXPECT().GetTotalUsageDCManagedAgentsV1(gomock.Any(), params).Return(int64(1), nil).AnyTimes()
	check.Args(params).Asserts(rbac.ResourceUsageEvent, policy.ActionRead).Returns(int64(1))
}))
}
14 changes: 7 additions & 7 deletions coderd/database/dbmetrics/querymetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 15 additions & 15 deletions coderd/database/dbmock/dbmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions coderd/database/dump.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to check migration number before merge

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Tear down the daily usage-event rollup. Order matters: the trigger must be
-- dropped before the function it executes; the table goes last. IF EXISTS
-- keeps the down migration idempotent.
DROP TRIGGER IF EXISTS trigger_aggregate_usage_event ON usage_events;
DROP FUNCTION IF EXISTS aggregate_usage_event();
DROP TABLE IF EXISTS usage_events_daily;
65 changes: 65 additions & 0 deletions coderd/database/migrations/000362_aggregate_usage_events.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
-- Daily rollup of usage_events, keyed by (UTC day, event type). Kept up to
-- date by the aggregate_usage_event trigger defined below.
CREATE TABLE usage_events_daily (
    day date NOT NULL, -- always grouped by day in UTC
    event_type text NOT NULL,
    -- Aggregated JSON payload; shape depends on event_type (for
    -- dc_managed_agents_v1 it is {"count": N}).
    usage_data jsonb NOT NULL,
    PRIMARY KEY (day, event_type)
);

COMMENT ON TABLE usage_events_daily IS 'usage_events_daily is a daily rollup of usage events. It stores the total usage for each event type by day.';
COMMENT ON COLUMN usage_events_daily.day IS 'The date of the summed usage events, always in UTC.';

-- Function to handle usage event aggregation. Folds each newly inserted
-- usage_events row into the matching usage_events_daily rollup row (keyed by
-- UTC day + event type). The upsert hits the primary key, so it stays cheap
-- even under heavy insert load.
CREATE OR REPLACE FUNCTION aggregate_usage_event()
RETURNS TRIGGER AS $$
BEGIN
    -- Check for supported event types and throw error for unknown types.
    -- Failing loudly means a new event type cannot ship without its
    -- aggregation logic being added to the CASE below.
    IF NEW.event_type NOT IN ('dc_managed_agents_v1') THEN
        RAISE EXCEPTION 'Unhandled usage event type in aggregate_usage_event: %', NEW.event_type;
    END IF;

    INSERT INTO usage_events_daily (day, event_type, usage_data)
    VALUES (
        -- Extract the date from the created_at timestamp, always using UTC
        -- for consistency
        date_trunc('day', NEW.created_at AT TIME ZONE 'UTC')::date,
        NEW.event_type,
        NEW.event_data
    )
    ON CONFLICT (day, event_type) DO UPDATE SET
        usage_data = CASE
            -- Handle simple counter events by summing the count
            WHEN NEW.event_type IN ('dc_managed_agents_v1') THEN
                jsonb_build_object(
                    'count',
                    -- NOTE(review): ::int caps a daily total at ~2.1 billion;
                    -- consider ::bigint if counts could ever exceed that.
                    COALESCE((usage_events_daily.usage_data->>'count')::int, 0) +
                    COALESCE((NEW.event_data->>'count')::int, 0)
                )
            -- No ELSE branch: unreachable because of the guard above. An
            -- unmatched CASE would otherwise NULL out usage_data.
        END;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger to automatically aggregate usage events
CREATE TRIGGER trigger_aggregate_usage_event
    AFTER INSERT ON usage_events
    FOR EACH ROW
    EXECUTE FUNCTION aggregate_usage_event();

-- Populate usage_events_daily with existing data. Runs after the trigger is
-- created, but the trigger fires on usage_events inserts only, so this direct
-- insert into the rollup table does not double-count.
INSERT INTO
    usage_events_daily (day, event_type, usage_data)
SELECT
    date_trunc('day', created_at AT TIME ZONE 'UTC')::date AS day,
    event_type,
    jsonb_build_object('count', SUM((event_data->>'count')::int)) AS usage_data
FROM
    usage_events
WHERE
    -- The only event type we currently support is dc_managed_agents_v1
    event_type = 'dc_managed_agents_v1'
GROUP BY
    date_trunc('day', created_at AT TIME ZONE 'UTC')::date,
    event_type
-- The GROUP BY yields one row per (day, event_type), and the table was just
-- created empty, so this conflict arm shouldn't fire; overwriting is a safe
-- no-op if it somehow does.
ON CONFLICT (day, event_type) DO UPDATE SET
    usage_data = EXCLUDED.usage_data;
106 changes: 106 additions & 0 deletions coderd/database/migrations/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import (
"slices"
"sync"
"testing"
"time"

"github.com/golang-migrate/migrate/v4"
migratepostgres "github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/golang-migrate/migrate/v4/source"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/golang-migrate/migrate/v4/source/stub"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/sync/errgroup"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/migrations"
"github.com/coder/coder/v2/testutil"
Expand Down Expand Up @@ -363,3 +366,106 @@ func TestMigrateUpWithFixtures(t *testing.T) {
})
}
}

// TestMigration000362AggregateUsageEvents tests the migration that aggregates
// usage events into daily rows correctly. It migrates to just before the
// migration under test, seeds usage_events, applies the migration, and checks
// the backfilled usage_events_daily rows.
func TestMigration000362AggregateUsageEvents(t *testing.T) {
	t.Parallel()

	const migrationVersion = 362

	// Similarly to the other test, this test will probably time out in CI.
	ctx := testutil.Context(t, testutil.WaitSuperLong)

	sqlDB := testSQLDB(t)
	db := database.New(sqlDB)

	// Migrate up to the migration before the one that aggregates usage events.
	next, err := migrations.Stepper(sqlDB)
	require.NoError(t, err)
	for {
		version, more, err := next()
		require.NoError(t, err)
		if !more {
			t.Fatalf("migration %d not found", migrationVersion)
		}
		if version == migrationVersion-1 {
			break
		}
	}

	locSydney, err := time.LoadLocation("Australia/Sydney")
	require.NoError(t, err)

	usageEvents := []struct {
		// The only possible event type is dc_managed_agents_v1 when this
		// migration gets applied.
		eventData []byte
		createdAt time.Time
	}{
		{
			eventData: []byte(`{"count": 41}`),
			createdAt: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		},
		{
			eventData: []byte(`{"count": 1}`),
			// 2025-01-01 in UTC (Sydney is ahead of UTC), so this should be
			// summed into the same daily row as the event above.
			createdAt: time.Date(2025, 1, 2, 8, 38, 57, 0, locSydney),
		},
		{
			eventData: []byte(`{"count": 1}`),
			createdAt: time.Date(2025, 1, 2, 0, 0, 0, 0, time.UTC),
		},
	}
	expectedDailyRows := []struct {
		day       time.Time
		usageData []byte
	}{
		{
			day:       time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
			usageData: []byte(`{"count": 42}`),
		},
		{
			day:       time.Date(2025, 1, 2, 0, 0, 0, 0, time.UTC),
			usageData: []byte(`{"count": 1}`),
		},
	}

	for _, usageEvent := range usageEvents {
		err := db.InsertUsageEvent(ctx, database.InsertUsageEventParams{
			ID:        uuid.New().String(),
			EventType: "dc_managed_agents_v1",
			EventData: usageEvent.eventData,
			CreatedAt: usageEvent.createdAt,
		})
		require.NoError(t, err)
	}

	// Migrate up to the migration that aggregates usage events.
	version, _, err := next()
	require.NoError(t, err)
	require.EqualValues(t, migrationVersion, version)

	// Get all of the newly created daily rows. This query is not exposed in the
	// querier interface intentionally.
	rows, err := sqlDB.QueryContext(ctx, "SELECT day, event_type, usage_data FROM usage_events_daily ORDER BY day ASC")
	require.NoError(t, err, "perform query")
	defer rows.Close()
	var out []database.UsageEventsDaily
	for rows.Next() {
		var row database.UsageEventsDaily
		err := rows.Scan(&row.Day, &row.EventType, &row.UsageData)
		require.NoError(t, err, "scan row")
		out = append(out, row)
	}
	// Surface any error that terminated iteration early; without this a
	// driver/connection failure would silently look like "fewer rows".
	require.NoError(t, rows.Err(), "rows iteration")

	// Verify that the daily rows match our expectations.
	require.Len(t, out, len(expectedDailyRows))
	for i, row := range out {
		require.Equal(t, "dc_managed_agents_v1", row.EventType)
		// The read row might be `+0000` rather than `UTC` specifically, so just
		// ensure it's within 1 second of the expected time.
		require.WithinDuration(t, expectedDailyRows[i].day, row.Day, time.Second)
		require.JSONEq(t, string(expectedDailyRows[i].usageData), string(row.UsageData))
	}
}
Loading
Loading