Skip to content

Commit a7ee8b3

Browse files
authored
fix: Don't use StatusAbnormalClosure (#4155)
1 parent 9e099b5 commit a7ee8b3

17 files changed

+62
-34
lines changed

agent/agent_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ func TestAgent(t *testing.T) {
490490
require.Eventually(t, func() bool {
491491
_, err := conn.Ping()
492492
return err == nil
493-
}, testutil.WaitMedium, testutil.IntervalFast)
493+
}, testutil.WaitLong, testutil.IntervalFast)
494494
})
495495

496496
t.Run("Speedtest", func(t *testing.T) {

cli/cliui/provisionerjob.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Clie
2222
build, err := client.WorkspaceBuild(ctx, build)
2323
return build.Job, err
2424
},
25-
Logs: func() (<-chan codersdk.ProvisionerJobLog, error) {
25+
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
2626
return client.WorkspaceBuildLogsAfter(ctx, build, before)
2727
},
2828
})
@@ -31,7 +31,7 @@ func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Clie
3131
type ProvisionerJobOptions struct {
3232
Fetch func() (codersdk.ProvisionerJob, error)
3333
Cancel func() error
34-
Logs func() (<-chan codersdk.ProvisionerJobLog, error)
34+
Logs func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error)
3535

3636
FetchInterval time.Duration
3737
// Verbose determines whether debug and trace logs will be shown.
@@ -132,10 +132,11 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
132132
// The initial stage needs to print after the signal handler has been registered.
133133
printStage()
134134

135-
logs, err := opts.Logs()
135+
logs, closer, err := opts.Logs()
136136
if err != nil {
137137
return xerrors.Errorf("logs: %w", err)
138138
}
139+
defer closer.Close()
139140

140141
var (
141142
// logOutput is where log output is written

cli/cliui/provisionerjob_test.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cliui_test
22

33
import (
44
"context"
5+
"io"
56
"os"
67
"runtime"
78
"sync"
@@ -136,8 +137,10 @@ func newProvisionerJob(t *testing.T) provisionerJobTest {
136137
Cancel: func() error {
137138
return nil
138139
},
139-
Logs: func() (<-chan codersdk.ProvisionerJobLog, error) {
140-
return logs, nil
140+
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
141+
return logs, closeFunc(func() error {
142+
return nil
143+
}), nil
141144
},
142145
})
143146
},
@@ -164,3 +167,9 @@ func newProvisionerJob(t *testing.T) provisionerJobTest {
164167
PTY: ptty,
165168
}
166169
}
170+
171+
type closeFunc func() error
172+
173+
func (c closeFunc) Close() error {
174+
return c()
175+
}

cli/create.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cli
22

33
import (
44
"fmt"
5+
"io"
56
"time"
67

78
"github.com/spf13/cobra"
@@ -253,7 +254,7 @@ PromptParamLoop:
253254
Cancel: func() error {
254255
return client.CancelTemplateVersionDryRun(cmd.Context(), templateVersion.ID, dryRun.ID)
255256
},
256-
Logs: func() (<-chan codersdk.ProvisionerJobLog, error) {
257+
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
257258
return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, after)
258259
},
259260
// Don't show log output for the dry-run unless there's an error.

cli/templatecreate.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cli
22

33
import (
44
"fmt"
5+
"io"
56
"os"
67
"path/filepath"
78
"strings"
@@ -182,7 +183,7 @@ func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVers
182183
Cancel: func() error {
183184
return client.CancelTemplateVersion(cmd.Context(), version.ID)
184185
},
185-
Logs: func() (<-chan codersdk.ProvisionerJobLog, error) {
186+
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
186187
return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, before)
187188
},
188189
})

cli/update.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ func update() *cobra.Command {
6666
if err != nil {
6767
return err
6868
}
69-
logs, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before)
69+
logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before)
7070
if err != nil {
7171
return err
7272
}
73+
defer closer.Close()
7374
for {
7475
log, ok := <-logs
7576
if !ok {

cmd/cliui/main.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"os"
89
"strings"
910
"time"
@@ -100,7 +101,7 @@ func main() {
100101
Fetch: func() (codersdk.ProvisionerJob, error) {
101102
return job, nil
102103
},
103-
Logs: func() (<-chan codersdk.ProvisionerJobLog, error) {
104+
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
104105
logs := make(chan codersdk.ProvisionerJobLog)
105106
go func() {
106107
defer close(logs)
@@ -143,7 +144,7 @@ func main() {
143144
}
144145
}
145146
}()
146-
return logs, nil
147+
return logs, io.NopCloser(strings.NewReader("")), nil
147148
},
148149
Cancel: func() error {
149150
job.Status = codersdk.ProvisionerJobCanceling

coderd/provisionerjobs_internal_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ func TestProvisionerJobLogs_Unit(t *testing.T) {
108108
require.NoError(t, err)
109109
}
110110

111-
logs, err := client.WorkspaceBuildLogsAfter(ctx, buildID, time.Now())
111+
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, time.Now())
112112
require.NoError(t, err)
113+
defer closer.Close()
113114

114115
// when the endpoint calls subscribe, we get the listener here.
115116
fPubsub.cond.L.Lock()

coderd/provisionerjobs_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ func TestProvisionerJobLogs(t *testing.T) {
4444
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
4545
defer cancel()
4646

47-
logs, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
47+
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
4848
require.NoError(t, err)
49+
defer closer.Close()
4950
for {
5051
log, ok := <-logs
5152
t.Logf("got log: [%s] %s %s", log.Level, log.Stage, log.Output)
@@ -82,8 +83,9 @@ func TestProvisionerJobLogs(t *testing.T) {
8283
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
8384
defer cancel()
8485

85-
logs, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
86+
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before)
8687
require.NoError(t, err)
88+
defer closer.Close()
8789
for {
8890
_, ok := <-logs
8991
if !ok {

coderd/templateversions_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,9 @@ func TestTemplateVersionLogs(t *testing.T) {
447447
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
448448
defer cancel()
449449

450-
logs, err := client.TemplateVersionLogsAfter(ctx, version.ID, before)
450+
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, before)
451451
require.NoError(t, err)
452+
defer closer.Close()
452453
for {
453454
_, ok := <-logs
454455
if !ok {
@@ -618,8 +619,9 @@ func TestTemplateVersionDryRun(t *testing.T) {
618619
require.Equal(t, job.ID, newJob.ID)
619620

620621
// Stream logs
621-
logs, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, after)
622+
logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, after)
622623
require.NoError(t, err)
624+
defer closer.Close()
623625

624626
logsDone := make(chan struct{})
625627
go func() {

coderd/workspaceagents.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
347347

348348
err = updateConnectionTimes()
349349
if err != nil {
350-
_ = conn.Close(websocket.StatusAbnormalClosure, err.Error())
350+
_ = conn.Close(websocket.StatusGoingAway, err.Error())
351351
return
352352
}
353353

@@ -380,7 +380,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
380380
}
381381
err = updateConnectionTimes()
382382
if err != nil {
383-
_ = conn.Close(websocket.StatusAbnormalClosure, err.Error())
383+
_ = conn.Close(websocket.StatusGoingAway, err.Error())
384384
return
385385
}
386386
err := ensureLatestBuild()
@@ -571,7 +571,7 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
571571
})
572572
return
573573
}
574-
defer conn.Close(websocket.StatusAbnormalClosure, "")
574+
defer conn.Close(websocket.StatusGoingAway, "")
575575

576576
var lastReport codersdk.AgentStatsReportResponse
577577
latestStat, err := api.Database.GetLatestAgentStat(ctx, workspaceAgent.ID)

coderd/workspaceagents_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func TestWorkspaceAgentListen(t *testing.T) {
128128
require.Eventually(t, func() bool {
129129
_, err := conn.Ping()
130130
return err == nil
131-
}, testutil.WaitMedium, testutil.IntervalFast)
131+
}, testutil.WaitLong, testutil.IntervalFast)
132132
})
133133

134134
t.Run("FailNonLatestBuild", func(t *testing.T) {

coderd/workspacebuilds_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -442,8 +442,9 @@ func TestWorkspaceBuildLogs(t *testing.T) {
442442
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
443443
defer cancel()
444444

445-
logs, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before.Add(-time.Hour))
445+
logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before.Add(-time.Hour))
446446
require.NoError(t, err)
447+
defer closer.Close()
447448
for {
448449
log, ok := <-logs
449450
if !ok {

codersdk/provisionerdaemons.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"encoding/json"
77
"fmt"
8+
"io"
89
"net/http"
910
"net/http/cookiejar"
1011
"net/url"
@@ -104,18 +105,18 @@ func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, befo
104105
}
105106

106107
// provisionerJobLogsAfter streams logs that occurred after a specific time.
107-
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after time.Time) (<-chan ProvisionerJobLog, error) {
108+
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
108109
afterQuery := ""
109110
if !after.IsZero() {
110111
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
111112
}
112113
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
113114
if err != nil {
114-
return nil, err
115+
return nil, nil, err
115116
}
116117
jar, err := cookiejar.New(nil)
117118
if err != nil {
118-
return nil, xerrors.Errorf("create cookie jar: %w", err)
119+
return nil, nil, xerrors.Errorf("create cookie jar: %w", err)
119120
}
120121
jar.SetCookies(followURL, []*http.Cookie{{
121122
Name: SessionTokenKey,
@@ -129,11 +130,13 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
129130
CompressionMode: websocket.CompressionDisabled,
130131
})
131132
if err != nil {
132-
return nil, readBodyAsError(res)
133+
return nil, nil, readBodyAsError(res)
133134
}
134135
logs := make(chan ProvisionerJobLog)
135136
decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
137+
closed := make(chan struct{})
136138
go func() {
139+
defer close(closed)
137140
defer close(logs)
138141
defer conn.Close(websocket.StatusGoingAway, "")
139142
var log ProvisionerJobLog
@@ -149,5 +152,9 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
149152
}
150153
}
151154
}()
152-
return logs, nil
155+
return logs, closeFunc(func() error {
156+
_ = conn.Close(websocket.StatusNormalClosure, "")
157+
<-closed
158+
return nil
159+
}), nil
153160
}

codersdk/templateversions.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"io"
78
"net/http"
89
"time"
910

@@ -99,7 +100,7 @@ func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUI
99100
}
100101

101102
// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific time.
102-
func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, error) {
103+
func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
103104
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), after)
104105
}
105106

@@ -166,7 +167,7 @@ func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, j
166167

167168
// TemplateVersionDryRunLogsAfter streams logs for a template version dry-run
168169
// that occurred after a specific time.
169-
func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, error) {
170+
func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
170171
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), after)
171172
}
172173

codersdk/workspaceagents.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -308,15 +308,15 @@ func (c *Client) DialWorkspaceAgentTailnet(ctx context.Context, logger slog.Logg
308308
logger.Debug(ctx, "serving coordinator")
309309
err = <-errChan
310310
if errors.Is(err, context.Canceled) {
311-
_ = ws.Close(websocket.StatusAbnormalClosure, "")
311+
_ = ws.Close(websocket.StatusGoingAway, "")
312312
return
313313
}
314314
if err != nil {
315315
logger.Debug(ctx, "error serving coordinator", slog.Error(err))
316-
_ = ws.Close(websocket.StatusAbnormalClosure, "")
316+
_ = ws.Close(websocket.StatusGoingAway, "")
317317
continue
318318
}
319-
_ = ws.Close(websocket.StatusAbnormalClosure, "")
319+
_ = ws.Close(websocket.StatusGoingAway, "")
320320
}
321321
}()
322322
err = <-first
@@ -446,7 +446,7 @@ func (c *Client) AgentReportStats(
446446
var req AgentStatsReportRequest
447447
err := wsjson.Read(ctx, conn, &req)
448448
if err != nil {
449-
_ = conn.Close(websocket.StatusAbnormalClosure, "")
449+
_ = conn.Close(websocket.StatusGoingAway, "")
450450
return err
451451
}
452452

@@ -460,7 +460,7 @@ func (c *Client) AgentReportStats(
460460

461461
err = wsjson.Write(ctx, conn, resp)
462462
if err != nil {
463-
_ = conn.Close(websocket.StatusAbnormalClosure, "")
463+
_ = conn.Close(websocket.StatusGoingAway, "")
464464
return err
465465
}
466466
}

codersdk/workspacebuilds.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID,
102102
}
103103

104104
// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific time.
105-
func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, error) {
105+
func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) {
106106
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), after)
107107
}
108108

0 commit comments

Comments
 (0)