Skip to content

chore: separate pubsub into a new package #8017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix linting errors
  • Loading branch information
kylecarbs committed Jun 13, 2023
commit 936cbe70f9669e6a0ff8bc9c6b1a4b35aa7a89d9
8 changes: 4 additions & 4 deletions coderd/database/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ func TestPubsub_ordering(t *testing.T) {
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
pubsub, err := pubsub.New(ctx, db, connectionURL)
ps, err := pubsub.New(ctx, db, connectionURL)
require.NoError(t, err)
defer pubsub.Close()
defer ps.Close()
event := "test"
messageChannel := make(chan []byte, 100)
cancelSub, err := pubsub.Subscribe(event, func(ctx context.Context, message []byte) {
cancelSub, err := ps.Subscribe(event, func(ctx context.Context, message []byte) {
// sleep a random amount of time to simulate handlers taking different amount of time
// to process, depending on the message
// nolint: gosec
Expand All @@ -187,7 +187,7 @@ func TestPubsub_ordering(t *testing.T) {
require.NoError(t, err)
defer cancelSub()
for i := 0; i < 100; i++ {
err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", i)))
err = ps.Publish(event, []byte(fmt.Sprintf("%d", i)))
assert.NoError(t, err)
}
for i := 0; i < 100; i++ {
Expand Down
8 changes: 4 additions & 4 deletions coderd/provisionerdserver/provisionerdserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func TestAcquireJob(t *testing.T) {
t.Run("Debounce", func(t *testing.T) {
t.Parallel()
db := dbfake.New()
pubsub := pubsub.NewInMemory()
ps := pubsub.NewInMemory()
srv := &provisionerdserver.Server{
ID: uuid.New(),
Logger: slogtest.Make(t, nil),
AccessURL: &url.URL{},
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
Database: db,
Pubsub: pubsub,
Pubsub: ps,
Telemetry: telemetry.NewNoop(),
AcquireJobDebounce: time.Hour,
Auditor: mockAuditor(),
Expand Down Expand Up @@ -1257,7 +1257,7 @@ func TestInsertWorkspaceResource(t *testing.T) {
func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server {
t.Helper()
db := dbfake.New()
pubsub := pubsub.NewInMemory()
ps := pubsub.NewInMemory()

return &provisionerdserver.Server{
ID: uuid.New(),
Expand All @@ -1266,7 +1266,7 @@ func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server {
AccessURL: &url.URL{},
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
Database: db,
Pubsub: pubsub,
Pubsub: ps,
Telemetry: telemetry.NewNoop(),
Auditor: mockAuditor(),
TemplateScheduleStore: testTemplateScheduleStore(),
Expand Down
4 changes: 2 additions & 2 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@ type logFollower struct {
}

func newLogFollower(
ctx context.Context, logger slog.Logger, db database.Store, pubsub pubsub.Pubsub,
ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub,
rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64,
) *logFollower {
return &logFollower{
ctx: ctx,
logger: logger,
db: db,
pubsub: pubsub,
pubsub: ps,
r: r,
rw: rw,
jobID: job.ID,
Expand Down
16 changes: 8 additions & 8 deletions coderd/provisionerjobs_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func Test_logFollower_completeBeforeFollow(t *testing.T) {
logger := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
mDB := dbmock.NewMockStore(ctrl)
pubsub := pubsub.NewInMemory()
ps := pubsub.NewInMemory()
now := database.Now()
job := database.ProvisionerJob{
ID: uuid.New(),
Expand All @@ -158,7 +158,7 @@ func Test_logFollower_completeBeforeFollow(t *testing.T) {

// we need an HTTP server to get a websocket
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 10)
uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 10)
uut.follow()
}))
defer srv.Close()
Expand Down Expand Up @@ -201,7 +201,7 @@ func Test_logFollower_completeBeforeSubscribe(t *testing.T) {
logger := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
mDB := dbmock.NewMockStore(ctrl)
pubsub := pubsub.NewInMemory()
ps := pubsub.NewInMemory()
now := database.Now()
job := database.ProvisionerJob{
ID: uuid.New(),
Expand All @@ -218,7 +218,7 @@ func Test_logFollower_completeBeforeSubscribe(t *testing.T) {

// we need an HTTP server to get a websocket
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 0)
uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 0)
uut.follow()
}))
defer srv.Close()
Expand Down Expand Up @@ -277,7 +277,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) {
logger := slogtest.Make(t, nil)
ctrl := gomock.NewController(t)
mDB := dbmock.NewMockStore(ctrl)
pubsub := pubsub.NewInMemory()
ps := pubsub.NewInMemory()
now := database.Now()
job := database.ProvisionerJob{
ID: uuid.New(),
Expand All @@ -294,7 +294,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) {

// we need an HTTP server to get a websocket
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 0)
uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 0)
uut.follow()
}))
defer srv.Close()
Expand Down Expand Up @@ -343,7 +343,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) {
}
msg, err = json.Marshal(&n)
require.NoError(t, err)
err = pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg)
err = ps.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg)
require.NoError(t, err)

mt, msg, err = client.Read(ctx)
Expand All @@ -361,7 +361,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) {
n.CreatedAfter = 0
msg, err = json.Marshal(&n)
require.NoError(t, err)
err = pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg)
err = ps.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg)
require.NoError(t, err)

// server should now close
Expand Down
4 changes: 2 additions & 2 deletions enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// NewCoordinator creates a new high availability coordinator
// that uses PostgreSQL pubsub to exchange handshakes.
func NewCoordinator(logger slog.Logger, pubsub pubsub.Pubsub) (agpl.Coordinator, error) {
func NewCoordinator(logger slog.Logger, ps pubsub.Pubsub) (agpl.Coordinator, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

nameCache, err := lru.New[uuid.UUID, string](512)
Expand All @@ -33,7 +33,7 @@ func NewCoordinator(logger slog.Logger, pubsub pubsub.Pubsub) (agpl.Coordinator,
coord := &haCoordinator{
id: uuid.New(),
log: logger,
pubsub: pubsub,
pubsub: ps,
closeFunc: cancelFunc,
close: make(chan struct{}),
nodes: map[uuid.UUID]*agpl.Node{},
Expand Down