Skip to content

Commit 416f4ed

Browse files
authored
Merge branch 'main' into bryphe/prototype/3-port-ui
2 parents a0a9da2 + 2769f4c commit 416f4ed

File tree

8 files changed

+356
-2
lines changed

8 files changed

+356
-2
lines changed

.github/semantic.yaml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
###############################################################################
2+
# This file configures "Semantic Pull Requests", which is documented here:
3+
# https://github.com/zeke/semantic-pull-requests
4+
#
5+
# This action/spec implements the "Conventional Commits" RFC which is
6+
# available here:
7+
# https://www.notion.so/coderhq/Conventional-commits-1d51287f58b64026bb29393f277734ed
8+
###############################################################################
9+
10+
# We only check that the PR title is semantic. The PR title is automatically
11+
# applied to the "Squash & Merge" flow as the suggested commit message, so this
12+
# should suffice unless someone drastically alters the message in that flow.
13+
titleOnly: true
14+
15+
# Types are the 'tag' types in a commit or PR title. For example, in
16+
#
17+
# chore: fix thing
18+
#
19+
# 'chore' is the type.
20+
types:
21+
# A build of any kind.
22+
- build
23+
24+
# A RELEASED fix that will NOT be back-ported. The originating issue may have
25+
# been discovered internally or externally to Coder.
26+
- fix
27+
28+
# Any code task that is ignored for changelog purposes. Examples include
29+
# devbin scripts and internal-only configurations.
30+
- chore
31+
32+
# Any work performed on CI.
33+
- ci
34+
35+
# An UNRELEASED correction. For example, features are often built
36+
# incrementally and sometimes introduce minor flaws during a release cycle.
37+
# Corrections address those increments and flaws.
38+
- correct
39+
40+
# Work that directly implements or supports the implementation of a feature.
41+
- feat
42+
43+
# A fix for a RELEASED bug (regression fix) that is intended for patch-release
44+
# purposes.
45+
- hotfix
46+
47+
# A refactor changes code structure without any behavioral change.
48+
- refactor
49+
50+
# A git revert for any style of commit.
51+
- revert
52+
53+
# Adding tests of any kind. Should be separate from feature or fix
54+
# implementations. For example, if a commit adds a fix + test, it's a fix
55+
# commit. If a commit is simply bumping coverage, it's a test commit.
56+
- test

.github/workflows/coder.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ jobs:
9393

9494
- run: go install gotest.tools/gotestsum@latest
9595

96-
# Windows is not happy with backslashed commands.
9796
- run:
9897
gotestsum --jsonfile="gotests.json" --packages="./..." --
9998
-covermode=atomic -coverprofile="gotests.coverage"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ This repository contains source code for Coder V2. Additional documentation:
1010
## Directory Structure
1111

1212
- `.github/`: Settings for [Dependabot for updating dependencies](https://docs.github.com/en/code-security/supply-chain-security/customizing-dependency-updates) and [build/deploy pipelines with GitHub Actions](https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions).
13+
- [`semantic.yaml`](./github/semantic.yaml): Configuration for [semantic pull requests](https://github.com/apps/semantic-pull-requests)

database/postgres/postgres_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import (
66
"database/sql"
77
"testing"
88

9-
"github.com/coder/coder/database/postgres"
109
"github.com/stretchr/testify/require"
1110
"go.uber.org/goleak"
1211

12+
"github.com/coder/coder/database/postgres"
13+
1314
_ "github.com/lib/pq"
1415
)
1516

database/pubsub.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"sync"
8+
"time"
9+
10+
"github.com/google/uuid"
11+
"github.com/lib/pq"
12+
"golang.org/x/xerrors"
13+
)
14+
15+
// Listener represents a pubsub handler.
16+
type Listener func(ctx context.Context, message []byte)
17+
18+
// Pubsub is a generic interface for broadcasting and receiving messages.
19+
// Implementors should assume high-availability with the backing implementation.
20+
type Pubsub interface {
21+
Subscribe(event string, listener Listener) (cancel func(), err error)
22+
Publish(event string, message []byte) error
23+
Close() error
24+
}
25+
26+
// Pubsub implementation using PostgreSQL.
27+
type pgPubsub struct {
28+
pgListener *pq.Listener
29+
db *sql.DB
30+
mut sync.Mutex
31+
listeners map[string]map[string]Listener
32+
}
33+
34+
// Subscribe calls the listener when an event matching the name is received.
35+
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
36+
p.mut.Lock()
37+
defer p.mut.Unlock()
38+
39+
err = p.pgListener.Listen(event)
40+
if errors.Is(err, pq.ErrChannelAlreadyOpen) {
41+
// It's ok if it's already open!
42+
err = nil
43+
}
44+
if err != nil {
45+
return nil, xerrors.Errorf("listen: %w", err)
46+
}
47+
48+
var listeners map[string]Listener
49+
var ok bool
50+
if listeners, ok = p.listeners[event]; !ok {
51+
listeners = map[string]Listener{}
52+
p.listeners[event] = listeners
53+
}
54+
var id string
55+
for {
56+
id = uuid.New().String()
57+
if _, ok = listeners[id]; !ok {
58+
break
59+
}
60+
}
61+
listeners[id] = listener
62+
return func() {
63+
p.mut.Lock()
64+
defer p.mut.Unlock()
65+
listeners := p.listeners[event]
66+
delete(listeners, id)
67+
68+
if len(listeners) == 0 {
69+
_ = p.pgListener.Unlisten(event)
70+
}
71+
}, nil
72+
}
73+
74+
func (p *pgPubsub) Publish(event string, message []byte) error {
75+
_, err := p.db.ExecContext(context.Background(), `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message)
76+
if err != nil {
77+
return xerrors.Errorf("exec: %w", err)
78+
}
79+
return nil
80+
}
81+
82+
// Close closes the pubsub instance.
83+
func (p *pgPubsub) Close() error {
84+
return p.pgListener.Close()
85+
}
86+
87+
// listen begins receiving messages on the pq listener.
88+
func (p *pgPubsub) listen(ctx context.Context) {
89+
var (
90+
notif *pq.Notification
91+
ok bool
92+
)
93+
defer p.pgListener.Close()
94+
for {
95+
select {
96+
case <-ctx.Done():
97+
return
98+
case notif, ok = <-p.pgListener.Notify:
99+
if !ok {
100+
return
101+
}
102+
}
103+
// A nil notification can be dispatched on reconnect.
104+
if notif == nil {
105+
continue
106+
}
107+
p.listenReceive(ctx, notif)
108+
}
109+
}
110+
111+
func (p *pgPubsub) listenReceive(ctx context.Context, notif *pq.Notification) {
112+
p.mut.Lock()
113+
defer p.mut.Unlock()
114+
listeners, ok := p.listeners[notif.Channel]
115+
if !ok {
116+
return
117+
}
118+
extra := []byte(notif.Extra)
119+
for _, listener := range listeners {
120+
go listener(ctx, extra)
121+
}
122+
}
123+
124+
// NewPubsub creates a new Pubsub implementation using a PostgreSQL connection.
125+
func NewPubsub(ctx context.Context, db *sql.DB, connectURL string) (Pubsub, error) {
126+
// Creates a new listener using pq.
127+
errCh := make(chan error)
128+
listener := pq.NewListener(connectURL, time.Second*10, time.Minute, func(event pq.ListenerEventType, err error) {
129+
// This callback gets events whenever the connection state changes.
130+
// Don't send if the errChannel has already been closed.
131+
select {
132+
case <-errCh:
133+
return
134+
default:
135+
errCh <- err
136+
close(errCh)
137+
}
138+
})
139+
select {
140+
case err := <-errCh:
141+
if err != nil {
142+
return nil, xerrors.Errorf("create pq listener: %w", err)
143+
}
144+
case <-ctx.Done():
145+
return nil, ctx.Err()
146+
}
147+
pg := &pgPubsub{
148+
db: db,
149+
pgListener: listener,
150+
listeners: make(map[string]map[string]Listener),
151+
}
152+
go pg.listen(ctx)
153+
154+
return pg, nil
155+
}

database/pubsub_memory.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/google/uuid"
8+
)
9+
10+
// memoryPubsub is an in-memory Pubsub implementation.
11+
type memoryPubsub struct {
12+
mut sync.RWMutex
13+
listeners map[string]map[uuid.UUID]Listener
14+
}
15+
16+
func (m *memoryPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
17+
m.mut.Lock()
18+
defer m.mut.Unlock()
19+
20+
var listeners map[uuid.UUID]Listener
21+
var ok bool
22+
if listeners, ok = m.listeners[event]; !ok {
23+
listeners = map[uuid.UUID]Listener{}
24+
m.listeners[event] = listeners
25+
}
26+
var id uuid.UUID
27+
for {
28+
id = uuid.New()
29+
if _, ok = listeners[id]; !ok {
30+
break
31+
}
32+
}
33+
listeners[id] = listener
34+
return func() {
35+
m.mut.Lock()
36+
defer m.mut.Unlock()
37+
listeners := m.listeners[event]
38+
delete(listeners, id)
39+
}, nil
40+
}
41+
42+
func (m *memoryPubsub) Publish(event string, message []byte) error {
43+
m.mut.RLock()
44+
defer m.mut.RUnlock()
45+
listeners, ok := m.listeners[event]
46+
if !ok {
47+
return nil
48+
}
49+
for _, listener := range listeners {
50+
listener(context.Background(), message)
51+
}
52+
return nil
53+
}
54+
55+
func (m *memoryPubsub) Close() error {
56+
return nil
57+
}
58+
59+
func NewPubsubInMemory() Pubsub {
60+
return &memoryPubsub{
61+
listeners: make(map[string]map[uuid.UUID]Listener),
62+
}
63+
}

database/pubsub_memory_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package database_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/coder/coder/database"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestPubsubMemory(t *testing.T) {
13+
t.Parallel()
14+
15+
t.Run("Memory", func(t *testing.T) {
16+
pubsub := database.NewPubsubInMemory()
17+
event := "test"
18+
data := "testing"
19+
ch := make(chan []byte)
20+
cancelFunc, err := pubsub.Subscribe(event, func(ctx context.Context, message []byte) {
21+
ch <- message
22+
})
23+
require.NoError(t, err)
24+
defer cancelFunc()
25+
go func() {
26+
err = pubsub.Publish(event, []byte(data))
27+
require.NoError(t, err)
28+
}()
29+
message := <-ch
30+
assert.Equal(t, string(message), data)
31+
})
32+
}

database/pubsub_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//go:build linux
2+
3+
package database_test
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"testing"
9+
10+
"github.com/coder/coder/database"
11+
"github.com/coder/coder/database/postgres"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestPubsub(t *testing.T) {
17+
t.Parallel()
18+
19+
t.Run("Postgres", func(t *testing.T) {
20+
ctx, cancelFunc := context.WithCancel(context.Background())
21+
defer cancelFunc()
22+
23+
connectionURL, close, err := postgres.Open()
24+
require.NoError(t, err)
25+
defer close()
26+
db, err := sql.Open("postgres", connectionURL)
27+
require.NoError(t, err)
28+
defer db.Close()
29+
pubsub, err := database.NewPubsub(ctx, db, connectionURL)
30+
require.NoError(t, err)
31+
defer pubsub.Close()
32+
event := "test"
33+
data := "testing"
34+
ch := make(chan []byte)
35+
cancelFunc, err = pubsub.Subscribe(event, func(ctx context.Context, message []byte) {
36+
ch <- message
37+
})
38+
require.NoError(t, err)
39+
defer cancelFunc()
40+
go func() {
41+
err = pubsub.Publish(event, []byte(data))
42+
require.NoError(t, err)
43+
}()
44+
message := <-ch
45+
assert.Equal(t, string(message), data)
46+
})
47+
}

0 commit comments

Comments
 (0)