Skip to content

Commit 54cbfaf

Browse files
committed
add pubsub to dbmem
1 parent 22ece36 commit 54cbfaf

File tree

6 files changed

+65
-58
lines changed

6 files changed

+65
-58
lines changed

cli/server.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -580,14 +580,15 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
580580
return xerrors.Errorf("parse ssh config options %q: %w", vals.SSHConfig.SSHConfigOptions.String(), err)
581581
}
582582

583+
db, ps := dbmem.NewWithPubsub()
583584
options := &coderd.Options{
584585
AccessURL: vals.AccessURL.Value(),
585586
AppHostname: appHostname,
586587
AppHostnameRegex: appHostnameRegex,
587588
Logger: logger.Named("coderd"),
588-
Database: dbmem.New(),
589+
Database: db,
589590
BaseDERPMap: derpMap,
590-
Pubsub: pubsub.NewInMemory(),
591+
Pubsub: ps,
591592
CacheDir: cacheDir,
592593
GoogleTokenValidator: googleTokenValidator,
593594
ExternalAuthConfigs: externalAuthConfigs,
@@ -691,8 +692,9 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
691692
var pubsubWatchdogTimeout <-chan struct{}
692693
if vals.InMemoryDatabase {
693694
// This is only used for testing.
694-
options.Database = dbmem.New()
695-
options.Pubsub = pubsub.NewInMemory()
695+
db, ps := dbmem.NewWithPubsub()
696+
options.Pubsub = ps
697+
options.Database = db
696698
} else {
697699
sqlDB, dbURL, err := getPostgresDB(ctx, logger, vals.PostgresURL.String(), codersdk.PostgresAuth(vals.PostgresAuth), sqlDriver)
698700
if err != nil {

coderd/database/dbmem/dbmem.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/coder/coder/v2/coderd/database"
2828
"github.com/coder/coder/v2/coderd/database/dbtime"
29+
"github.com/coder/coder/v2/coderd/database/pubsub"
2930
"github.com/coder/coder/v2/coderd/rbac"
3031
"github.com/coder/coder/v2/coderd/rbac/regosql"
3132
"github.com/coder/coder/v2/coderd/util/slice"
@@ -51,6 +52,17 @@ var (
5152

5253
// New returns an in-memory fake of the database.
5354
func New() database.Store {
55+
return newDbMem()
56+
}
57+
58+
func NewWithPubsub() (database.Store, pubsub.Pubsub) {
59+
q := newDbMem()
60+
ps := pubsub.NewInMemory()
61+
q.ps = ps
62+
return q, ps
63+
}
64+
65+
func newDbMem() *FakeQuerier {
5466
q := &FakeQuerier{
5567
mutex: &sync.RWMutex{},
5668
data: &data{
@@ -167,6 +179,7 @@ func (inTxMutex) RUnlock() {}
167179
// can do type checks.
168180
type FakeQuerier struct {
169181
mutex rwMutex
182+
ps pubsub.Pubsub
170183
*data
171184
}
172185

@@ -7776,7 +7789,7 @@ func (q *FakeQuerier) InsertWorkspace(_ context.Context, arg database.InsertWork
77767789
return workspace, nil
77777790
}
77787791

7779-
func (q *FakeQuerier) InsertWorkspaceAgent(_ context.Context, arg database.InsertWorkspaceAgentParams) (database.WorkspaceAgent, error) {
7792+
func (q *FakeQuerier) InsertWorkspaceAgent(ctx context.Context, arg database.InsertWorkspaceAgentParams) (database.WorkspaceAgent, error) {
77807793
if err := validateDatabaseType(arg); err != nil {
77817794
return database.WorkspaceAgent{}, err
77827795
}
@@ -7807,6 +7820,22 @@ func (q *FakeQuerier) InsertWorkspaceAgent(_ context.Context, arg database.Inser
78077820
}
78087821

78097822
q.workspaceAgents = append(q.workspaceAgents, agent)
7823+
if q.ps != nil {
7824+
workspace, err := q.getWorkspaceByAgentIDNoLock(ctx, agent.ID)
7825+
// Agents might not belong to a workspace (template imports)
7826+
if err != nil {
7827+
return agent, nil
7828+
}
7829+
msg, err := json.Marshal(codersdk.WorkspaceEvent{
7830+
Kind: codersdk.WorkspaceEventKindNewAgent,
7831+
WorkspaceID: workspace.ID,
7832+
AgentID: &agent.ID,
7833+
})
7834+
if err != nil {
7835+
return database.WorkspaceAgent{}, xerrors.Errorf("failed to marshal workspace event: %w", err)
7836+
}
7837+
_ = q.ps.Publish(codersdk.WorkspaceEventChannel(workspace.OwnerID), msg)
7838+
}
78107839
return agent, nil
78117840
}
78127841

coderd/database/dbtestutil/db.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
9595
opt(&o)
9696
}
9797

98-
db := dbmem.New()
99-
ps := pubsub.NewInMemory()
98+
db, ps := dbmem.NewWithPubsub()
10099
if WillUsePostgres() {
101100
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
102101
if connectionURL == "" && o.url != "" {

coderd/database/dump.sql

Lines changed: 14 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
DROP TYPE agent_id_name_pair;
22

3-
DROP TRIGGER IF EXISTS new_workspace_notify ON workspaces;
4-
DROP FUNCTION IF EXISTS new_workspace_notify;
5-
63
DROP TRIGGER IF EXISTS new_agent_notify ON workspace_agents;
74
DROP FUNCTION IF EXISTS new_agent_notify;

coderd/database/migrations/000260_workspace_updates.up.sql

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,15 @@ CREATE TYPE agent_id_name_pair AS (
33
name text
44
);
55

6-
CREATE FUNCTION new_workspace_notify() RETURNS trigger
7-
LANGUAGE plpgsql
8-
AS $$
9-
DECLARE
10-
BEGIN
11-
-- Notify for new workspaces & ownership transfers
12-
IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND NEW.owner_id <> OLD.owner_id) THEN
13-
-- Write to the notification channel `new_workspace:owner_id`
14-
-- with the workspace id as the payload.
15-
PERFORM pg_notify('new_workspace:' || NEW.owner_id, NEW.id::text);
16-
END IF;
17-
RETURN NEW;
18-
END;
19-
$$;
20-
21-
CREATE TRIGGER new_workspace_notify
22-
AFTER INSERT OR UPDATE ON workspaces
23-
FOR EACH ROW
24-
EXECUTE FUNCTION new_workspace_notify();
25-
26-
276
CREATE FUNCTION new_agent_notify() RETURNS trigger
287
LANGUAGE plpgsql
298
AS $$
309
DECLARE
31-
workspace_owner_id uuid;
10+
v_workspace_id uuid;
11+
v_workspace_owner_id uuid;
3212
BEGIN
33-
SELECT workspaces.owner_id
34-
INTO workspace_owner_id
13+
SELECT workspaces.owner_id, workspaces.id
14+
INTO v_workspace_owner_id, v_workspace_id
3515
FROM
3616
workspaces
3717
WHERE
@@ -58,9 +38,17 @@ BEGIN
5838
)
5939
);
6040
-- Agents might not belong to a workspace (template imports)
61-
IF workspace_owner_id IS NOT NULL THEN
41+
IF v_workspace_owner_id IS NOT NULL THEN
6242
-- Write to the notification channel `new_agent:workspace_owner_id`
63-
PERFORM pg_notify('new_agent:' || workspace_owner_id, '');
43+
PERFORM pg_notify(
44+
'workspace_owner:' || v_workspace_owner_id,
45+
json_build_object(
46+
'kind', 'new_agent',
47+
'workspace_id', v_workspace_id,
48+
'agent_id', NEW.id
49+
'agent_name', NEW.name
50+
)::text
51+
);
6452
END IF;
6553
RETURN NEW;
6654
END;

0 commit comments

Comments
 (0)