fix: make handleManifest always signal dependents #13141

Merged: 1 commit, May 6, 2024
66 changes: 34 additions & 32 deletions agent/agent.go
@@ -807,56 +807,48 @@ func (a *agent) run() (retErr error) {
// coordination <--------------------------+
// derp map subscriber <----------------+
// stats report loop <---------------+
-networkOK := make(chan struct{})
-manifestOK := make(chan struct{})
+networkOK := newCheckpoint(a.logger)
+manifestOK := newCheckpoint(a.logger)

connMan.start("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))

connMan.start("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
-select {
-case <-ctx.Done():
-return nil
-case <-manifestOK:
-manifest := a.manifest.Load()
-NewWorkspaceAppHealthReporter(
-a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
-)(ctx)
-return nil
+if err := manifestOK.wait(ctx); err != nil {
+return xerrors.Errorf("no manifest: %w", err)
}
+manifest := a.manifest.Load()
+NewWorkspaceAppHealthReporter(
+a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
+)(ctx)
+return nil
})

connMan.start("create or update network", gracefulShutdownBehaviorStop,
a.createOrUpdateNetwork(manifestOK, networkOK))

connMan.start("coordination", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
-select {
-case <-ctx.Done():
-return nil
-case <-networkOK:
+if err := networkOK.wait(ctx); err != nil {
+return xerrors.Errorf("no network: %w", err)
}
return a.runCoordinator(ctx, conn, a.network)
},
)

connMan.start("derp map subscriber", gracefulShutdownBehaviorStop,
func(ctx context.Context, conn drpc.Conn) error {
-select {
-case <-ctx.Done():
-return nil
-case <-networkOK:
+if err := networkOK.wait(ctx); err != nil {
+return xerrors.Errorf("no network: %w", err)
}
return a.runDERPMapSubscriber(ctx, conn, a.network)
})

connMan.start("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)

connMan.start("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, conn drpc.Conn) error {
-select {
-case <-ctx.Done():
-return nil
-case <-networkOK:
+if err := networkOK.wait(ctx); err != nil {
+return xerrors.Errorf("no network: %w", err)
}
return a.statsReporter.reportLoop(ctx, proto.NewDRPCAgentClient(conn))
})
@@ -865,8 +857,17 @@ func (a *agent) run() (retErr error) {
}

// handleManifest returns a function that fetches and processes the manifest
-func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Context, conn drpc.Conn) error {
+func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, conn drpc.Conn) error {
return func(ctx context.Context, conn drpc.Conn) error {
+var (
+sentResult = false
+err error
+)
+defer func() {
+if !sentResult {
+manifestOK.complete(err)
+}
+}()
aAPI := proto.NewDRPCAgentClient(conn)
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
if err != nil {
@@ -907,7 +908,8 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont
}

oldManifest := a.manifest.Swap(&manifest)
-close(manifestOK)
+manifestOK.complete(nil)
+sentResult = true

// The startup script should only execute on the first run!
if oldManifest == nil {
@@ -968,14 +970,15 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont

// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
// the tailnet using the information in the manifest
-func (a *agent) createOrUpdateNetwork(manifestOK <-chan struct{}, networkOK chan<- struct{}) func(context.Context, drpc.Conn) error {
-return func(ctx context.Context, _ drpc.Conn) error {
-select {
-case <-ctx.Done():
-return nil
-case <-manifestOK:
+func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, drpc.Conn) error {
+return func(ctx context.Context, _ drpc.Conn) (retErr error) {
+if err := manifestOK.wait(ctx); err != nil {
+return xerrors.Errorf("no manifest: %w", err)
}
var err error
+defer func() {
+networkOK.complete(retErr)
+}()
manifest := a.manifest.Load()
a.closeMutex.Lock()
network := a.network
Expand Down Expand Up @@ -1011,7 +1014,6 @@ func (a *agent) createOrUpdateNetwork(manifestOK <-chan struct{}, networkOK chan
network.SetDERPForceWebSockets(manifest.DERPForceWebSockets)
network.SetBlockEndpoints(manifest.DisableDirectConnections)
}
-close(networkOK)
return nil
}
}
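
To see why the change matters, here is a self-contained sketch of the failure mode the bare channels allowed (illustrative only, not code from the PR; only the manifestOK name is taken from agent.go): if handleManifest returned early with an error before reaching close(manifestOK), every dependent blocked on the channel waited until its own context was canceled and never learned that the manifest fetch had failed. The checkpoint type added below closes that gap by carrying an error and being completed on every exit path.

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    manifestOK := make(chan struct{})

    // Old shape of the bug: the producer hits an error and returns before it
    // ever reaches close(manifestOK), so the channel is never closed.
    go func() {
        // ... GetManifest fails here, the error goes back to the connection
        // manager, and close(manifestOK) is never executed ...
    }()

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    // The dependent cannot tell "not ready yet" from "failed"; it simply blocks
    // until its own context gives up.
    select {
    case <-manifestOK:
        fmt.Println("manifest ready") // never reached
    case <-ctx.Done():
        fmt.Println("gave up waiting:", ctx.Err())
    }
}
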
51 changes: 51 additions & 0 deletions agent/checkpoint.go
@@ -0,0 +1,51 @@
package agent

import (
"context"
"runtime"
"sync"

"cdr.dev/slog"
)

// checkpoint allows a goroutine to communicate when it is OK to proceed beyond some async condition
// to other dependent goroutines.
type checkpoint struct {
logger slog.Logger
mu sync.Mutex
called bool
done chan struct{}
err error
}

// complete the checkpoint. Pass nil to indicate the checkpoint was ok. It is an error to call this
// more than once.
func (c *checkpoint) complete(err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.called {
b := make([]byte, 2048)
n := runtime.Stack(b, false)
c.logger.Critical(context.Background(), "checkpoint complete called more than once", slog.F("stacktrace", b[:n]))
return
}
c.called = true
c.err = err

Member:
How about using sync.Once here to simplify usage and allow multiple calls to complete?

Contributor Author:
That introduces the possibility of multiple calls racing success vs failure. Better to ensure that there is only one call to complete().

Member:
In what way would it be racy? sync.Once is atomic AFAIK and first come, first served; the other caller remains blocked waiting for the first to complete.

Member (@mafredri), May 3, 2024:
Perhaps another way to put it: I think it would be preferable for this to be safe to use incorrectly and document how it should be used, vs incorrect use (or carelessness) resulting in a runtime panic.

Contributor Author:
It's racy in the sense that if you have two callers, one saying "success" and one saying "failure", then they can race each other to determine which gets reported to things waiting on the checkpoint.

Member:
I think I understand your concern but I have a hard time understanding how it's relevant here. If someone introduces that race right now the program will panic instead. So it needs to be avoided in either case, i.e. a non-issue.

Contributor Author:
> I think it would be preferable for this to be safe to use incorrectly and document how it should be used, vs incorrect use (or carelessness) resulting in a runtime panic.

I think that kind of defensive programming is a disservice in this case. Silently absorbing incorrect/careless use in a non-deterministic way is a source of frustrating and hard-to-diagnose bugs.

There is a genuine case to be made for a central service like Coderd needing to avoid panicking because of programming bugs, but here in the Agent, I think it's preferable to panic, print a stack trace, and exit.
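
To illustrate the race being described, here is a hypothetical sketch (not code from this PR; onceCheckpoint is an invented name): if complete() were built on sync.Once, two callers reporting different outcomes would be silently coalesced, and whichever call happened to run first would decide what every waiter observes.

package main

import (
    "errors"
    "fmt"
    "sync"
)

type onceCheckpoint struct {
    once sync.Once
    done chan struct{}
    err  error
}

func (c *onceCheckpoint) complete(err error) {
    // A second call is silently dropped instead of being flagged as a bug.
    c.once.Do(func() {
        c.err = err
        close(c.done)
    })
}

func main() {
    c := &onceCheckpoint{done: make(chan struct{})}

    // Two conflicting callers: a correctness bug, but sync.Once just picks a winner.
    go c.complete(nil)
    go c.complete(errors.New("network setup failed"))

    <-c.done
    fmt.Println("waiters observe:", c.err) // sometimes nil, sometimes the error
}

The version merged here takes the opposite trade-off: a second call is treated as a programming error, logged at Critical level with a stack trace, and the first result is kept.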

Member:
I can definitely see the benefit of that approach as well, but I do feel we should have a low tolerance for introducing sharp edges that can result in decreased developer flow. If we feel this is important enough to panic, wouldn’t another approach be to detect a second call and log it as an error + stack trace?

We don’t know how well the logs from a panic will be persisted either; the user may attempt to rebuild their workspace, and ultimately we caused an inconvenience and are never the wiser.

close(c.done)
}

func (c *checkpoint) wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.done:
return c.err
}
}

func newCheckpoint(logger slog.Logger) *checkpoint {
return &checkpoint{
logger: logger,
done: make(chan struct{}),
}
}
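
For reference, the intended usage shape (a minimal sketch written as if inside package agent; produce, consume, and examplePattern are illustrative names, not part of the PR): the producing goroutine defers complete so that every exit path signals, and dependents block in wait until they receive either nil or the producer's error.

package agent

import (
    "context"
    "os"

    "golang.org/x/xerrors"

    "cdr.dev/slog"
    "cdr.dev/slog/sloggers/sloghuman"
)

// produce does some setup work and signals the checkpoint on every exit path.
func produce(ctx context.Context, c *checkpoint) (retErr error) {
    defer func() { c.complete(retErr) }() // dependents are always released, success or failure
    if ctx.Err() != nil {
        return xerrors.Errorf("setup canceled: %w", ctx.Err())
    }
    // ... real work happens here ...
    return nil
}

// consume blocks until produce has reported, then either proceeds or propagates the error.
func consume(ctx context.Context, c *checkpoint) error {
    if err := c.wait(ctx); err != nil {
        return xerrors.Errorf("setup failed: %w", err)
    }
    // ... safe to rely on the produced state here ...
    return nil
}

func examplePattern(ctx context.Context) error {
    c := newCheckpoint(slog.Make(sloghuman.Sink(os.Stderr)))
    go func() { _ = produce(ctx, c) }()
    return consume(ctx, c)
}
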
49 changes: 49 additions & 0 deletions agent/checkpoint_internal_test.go
@@ -0,0 +1,49 @@
package agent

import (
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/testutil"
)

func TestCheckpoint_CompleteWait(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil)
ctx := testutil.Context(t, testutil.WaitShort)
uut := newCheckpoint(logger)
err := xerrors.New("test")
uut.complete(err)
got := uut.wait(ctx)
require.Equal(t, err, got)
}

func TestCheckpoint_CompleteTwice(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
ctx := testutil.Context(t, testutil.WaitShort)
uut := newCheckpoint(logger)
err := xerrors.New("test")
uut.complete(err)
uut.complete(nil) // drops CRITICAL log
got := uut.wait(ctx)
require.Equal(t, err, got)
}

func TestCheckpoint_WaitComplete(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil)
ctx := testutil.Context(t, testutil.WaitShort)
uut := newCheckpoint(logger)
err := xerrors.New("test")
errCh := make(chan error, 1)
go func() {
errCh <- uut.wait(ctx)
}()
uut.complete(err)
got := testutil.RequireRecvCtx(ctx, t, errCh)
require.Equal(t, err, got)
}
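
One path not exercised above (a hypothetical test sketch, assuming a "context" import is added to the file): wait returns the context error when nothing ever calls complete, which is what releases dependents if the connection is torn down before the producer finishes.

func TestCheckpoint_WaitCanceled(t *testing.T) {
    t.Parallel()
    logger := slogtest.Make(t, nil)
    uut := newCheckpoint(logger)
    // Nothing ever calls complete; the waiter must still be released by its context.
    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    err := uut.wait(ctx)
    require.ErrorIs(t, err, context.Canceled)
}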