Skip to content

Commit b5f7529

Browse files
committed
add pubsub to dbmem
1 parent 6e5d046 commit b5f7529

File tree

6 files changed

+66
-58
lines changed

6 files changed

+66
-58
lines changed

cli/server.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -580,14 +580,15 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
580580
return xerrors.Errorf("parse ssh config options %q: %w", vals.SSHConfig.SSHConfigOptions.String(), err)
581581
}
582582

583+
db, ps := dbmem.NewWithPubsub()
583584
options := &coderd.Options{
584585
AccessURL: vals.AccessURL.Value(),
585586
AppHostname: appHostname,
586587
AppHostnameRegex: appHostnameRegex,
587588
Logger: logger.Named("coderd"),
588-
Database: dbmem.New(),
589+
Database: db,
589590
BaseDERPMap: derpMap,
590-
Pubsub: pubsub.NewInMemory(),
591+
Pubsub: ps,
591592
CacheDir: cacheDir,
592593
GoogleTokenValidator: googleTokenValidator,
593594
ExternalAuthConfigs: externalAuthConfigs,
@@ -691,8 +692,9 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
691692
var pubsubWatchdogTimeout <-chan struct{}
692693
if vals.InMemoryDatabase {
693694
// This is only used for testing.
694-
options.Database = dbmem.New()
695-
options.Pubsub = pubsub.NewInMemory()
695+
db, ps := dbmem.NewWithPubsub()
696+
options.Pubsub = ps
697+
options.Database = db
696698
} else {
697699
sqlDB, dbURL, err := getPostgresDB(ctx, logger, vals.PostgresURL.String(), codersdk.PostgresAuth(vals.PostgresAuth), sqlDriver)
698700
if err != nil {

coderd/database/dbmem/dbmem.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323
"golang.org/x/xerrors"
2424

2525
"github.com/coder/coder/v2/coderd/notifications/types"
26+
"github.com/coder/coder/v2/coderd/wspubsub"
2627

2728
"github.com/coder/coder/v2/coderd/database"
2829
"github.com/coder/coder/v2/coderd/database/dbtime"
30+
"github.com/coder/coder/v2/coderd/database/pubsub"
2931
"github.com/coder/coder/v2/coderd/rbac"
3032
"github.com/coder/coder/v2/coderd/rbac/regosql"
3133
"github.com/coder/coder/v2/coderd/util/slice"
@@ -51,6 +53,17 @@ var (
5153

5254
// New returns an in-memory fake of the database.
5355
func New() database.Store {
56+
return newDbMem()
57+
}
58+
59+
func NewWithPubsub() (database.Store, pubsub.Pubsub) {
60+
q := newDbMem()
61+
ps := pubsub.NewInMemory()
62+
q.ps = ps
63+
return q, ps
64+
}
65+
66+
func newDbMem() *FakeQuerier {
5467
q := &FakeQuerier{
5568
mutex: &sync.RWMutex{},
5669
data: &data{
@@ -167,6 +180,7 @@ func (inTxMutex) RUnlock() {}
167180
// can do type checks.
168181
type FakeQuerier struct {
169182
mutex rwMutex
183+
ps pubsub.Pubsub
170184
*data
171185
}
172186

@@ -7776,7 +7790,7 @@ func (q *FakeQuerier) InsertWorkspace(_ context.Context, arg database.InsertWork
77767790
return workspace, nil
77777791
}
77787792

7779-
func (q *FakeQuerier) InsertWorkspaceAgent(_ context.Context, arg database.InsertWorkspaceAgentParams) (database.WorkspaceAgent, error) {
7793+
func (q *FakeQuerier) InsertWorkspaceAgent(ctx context.Context, arg database.InsertWorkspaceAgentParams) (database.WorkspaceAgent, error) {
77807794
if err := validateDatabaseType(arg); err != nil {
77817795
return database.WorkspaceAgent{}, err
77827796
}
@@ -7807,6 +7821,22 @@ func (q *FakeQuerier) InsertWorkspaceAgent(_ context.Context, arg database.Inser
78077821
}
78087822

78097823
q.workspaceAgents = append(q.workspaceAgents, agent)
7824+
if q.ps != nil {
7825+
workspace, err := q.getWorkspaceByAgentIDNoLock(ctx, agent.ID)
7826+
// Agents might not belong to a workspace (template imports)
7827+
if err != nil {
7828+
return agent, nil
7829+
}
7830+
msg, err := json.Marshal(wspubsub.WorkspaceEvent{
7831+
Kind: wspubsub.WorkspaceEventKindNewAgent,
7832+
WorkspaceID: workspace.ID,
7833+
AgentID: &agent.ID,
7834+
})
7835+
if err != nil {
7836+
return database.WorkspaceAgent{}, xerrors.Errorf("failed to marshal workspace event: %w", err)
7837+
}
7838+
_ = q.ps.Publish(wspubsub.WorkspaceEventChannel(workspace.OwnerID), msg)
7839+
}
78107840
return agent, nil
78117841
}
78127842

coderd/database/dbtestutil/db.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
9595
opt(&o)
9696
}
9797

98-
db := dbmem.New()
99-
ps := pubsub.NewInMemory()
98+
db, ps := dbmem.NewWithPubsub()
10099
if WillUsePostgres() {
101100
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
102101
if connectionURL == "" && o.url != "" {

coderd/database/dump.sql

+14-22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
DROP TYPE agent_id_name_pair;
22

3-
DROP TRIGGER IF EXISTS new_workspace_notify ON workspaces;
4-
DROP FUNCTION IF EXISTS new_workspace_notify;
5-
63
DROP TRIGGER IF EXISTS new_agent_notify ON workspace_agents;
74
DROP FUNCTION IF EXISTS new_agent_notify;

coderd/database/migrations/000260_workspace_updates.up.sql

+14-26
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,15 @@ CREATE TYPE agent_id_name_pair AS (
33
name text
44
);
55

6-
CREATE FUNCTION new_workspace_notify() RETURNS trigger
7-
LANGUAGE plpgsql
8-
AS $$
9-
DECLARE
10-
BEGIN
11-
-- Notify for new workspaces & ownership transfers
12-
IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND NEW.owner_id <> OLD.owner_id) THEN
13-
-- Write to the notification channel `new_workspace:owner_id`
14-
-- with the workspace id as the payload.
15-
PERFORM pg_notify('new_workspace:' || NEW.owner_id, NEW.id::text);
16-
END IF;
17-
RETURN NEW;
18-
END;
19-
$$;
20-
21-
CREATE TRIGGER new_workspace_notify
22-
AFTER INSERT OR UPDATE ON workspaces
23-
FOR EACH ROW
24-
EXECUTE FUNCTION new_workspace_notify();
25-
26-
276
CREATE FUNCTION new_agent_notify() RETURNS trigger
287
LANGUAGE plpgsql
298
AS $$
309
DECLARE
31-
workspace_owner_id uuid;
10+
v_workspace_id uuid;
11+
v_workspace_owner_id uuid;
3212
BEGIN
33-
SELECT workspaces.owner_id
34-
INTO workspace_owner_id
13+
SELECT workspaces.owner_id, workspaces.id
14+
INTO v_workspace_owner_id, v_workspace_id
3515
FROM
3616
workspaces
3717
WHERE
@@ -58,9 +38,17 @@ BEGIN
5838
)
5939
);
6040
-- Agents might not belong to a workspace (template imports)
61-
IF workspace_owner_id IS NOT NULL THEN
41+
IF v_workspace_owner_id IS NOT NULL THEN
6242
-- Write to the notification channel `new_agent:workspace_owner_id`
63-
PERFORM pg_notify('new_agent:' || workspace_owner_id, '');
43+
PERFORM pg_notify(
44+
'workspace_owner:' || v_workspace_owner_id,
45+
json_build_object(
46+
'kind', 'new_agent',
47+
'workspace_id', v_workspace_id,
48+
'agent_id', NEW.id,
49+
'agent_name', NEW.name
50+
)::text
51+
);
6452
END IF;
6553
RETURN NEW;
6654
END;

0 commit comments

Comments
 (0)