Skip to content

Commit a862b70

Browse files
committed
Refactor inactivity timeout to match v2 impl
1 parent 214d0a0 commit a862b70

File tree

2 files changed

+62
-36
lines changed

2 files changed

+62
-36
lines changed

server.go

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -105,29 +105,59 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
105105
}
106106
process = rprocess.process
107107
} else {
108-
process, err = execer.Start(context.Background(), command)
108+
// The process will be kept alive as long as this context does not
109+
// finish (and as long as the process does not exit on its own). This
110+
// is a new context since the parent context finishes when the request
111+
// ends which would kill the process prematurely.
112+
ctx, cancel := context.WithCancel(context.Background())
113+
114+
// The process will be killed if the provided context ends.
115+
process, err = execer.Start(ctx, command)
109116
if err != nil {
117+
cancel()
110118
return err
111119
}
112120

113-
ringBuffer, err := circbuf.NewBuffer(1 << 20)
121+
// Default to buffer 64KB.
122+
ringBuffer, err := circbuf.NewBuffer(64 * 1024)
114123
if err != nil {
124+
cancel()
115125
return xerrors.Errorf("unable to create ring buffer %w", err)
116126
}
117127

118128
rprocess = &reconnectingProcess{
119129
activeConns: make(map[string]net.Conn),
120130
process: process,
121-
// Default to buffer 1MB.
131+
// Timeouts created with AfterFunc can be reset.
132+
timeout: time.AfterFunc(options.ReconnectingProcessTimeout, cancel),
122133
ringBuffer: ringBuffer,
123134
}
124135
reconnectingProcesses.Store(header.ID, rprocess)
136+
137+
// If the process exits send the exit code to all listening
138+
// connections then close everything.
139+
go func() {
140+
err = process.Wait()
141+
code := 0
142+
if exitErr, ok := err.(ExitError); ok {
143+
code = exitErr.Code
144+
}
145+
rprocess.activeConnsMutex.Lock()
146+
for _, conn := range rprocess.activeConns {
147+
_ = sendExitCode(ctx, code, conn)
148+
}
149+
rprocess.activeConnsMutex.Unlock()
150+
rprocess.Close()
151+
reconnectingProcesses.Delete(header.ID)
152+
}()
153+
154+
// Write to the ring buffer and all connections as we receive stdout.
125155
go func() {
126156
buffer := make([]byte, 32*1024)
127157
for {
128158
read, err := rprocess.process.Stdout().Read(buffer)
129159
if err != nil {
130-
flog.Error("reconnecting process %s read: %v", header.ID, err)
160+
// When the process is closed this is triggered.
131161
break
132162
}
133163
part := buffer[:read]
@@ -142,9 +172,6 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
142172
}
143173
rprocess.activeConnsMutex.Unlock()
144174
}
145-
// If we break from the loop, the reconnecting PTY ended or errored.
146-
rprocess.Close()
147-
reconnectingProcesses.Delete(header.ID)
148175
}()
149176
}
150177

@@ -153,39 +180,38 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
153180
flog.Error("failed to send pid %d", process.Pid())
154181
}
155182

156-
// Write the initial contents out.
183+
// Write out the initial contents in the ring buffer.
157184
err = sendOutput(ctx, rprocess.ringBuffer.Bytes(), wsNetConn)
158185
if err != nil {
159186
return xerrors.Errorf("write reconnecting process %s buffer: %w", header.ID, err)
160187
}
161188

189+
// Store this connection on the reconnecting process. All connections
190+
// stored on the process will receive the process's stdout.
162191
connectionID := uuid.NewString()
163192
rprocess.activeConnsMutex.Lock()
164193
rprocess.activeConns[connectionID] = wsNetConn
165-
if rprocess.timeoutCancel != nil {
166-
rprocess.timeoutCancel()
167-
rprocess.timeoutCancel = nil
168-
}
169194
rprocess.activeConnsMutex.Unlock()
195+
196+
// Keep resetting the inactivity timer while this connection is alive.
197+
rprocess.timeout.Reset(options.ReconnectingProcessTimeout)
198+
heartbeat := time.NewTimer(options.ReconnectingProcessTimeout / 2)
199+
defer heartbeat.Stop()
200+
go func() {
201+
for {
202+
select {
203+
case <-heartbeat.C:
204+
}
205+
rprocess.timeout.Reset(options.ReconnectingProcessTimeout)
206+
}
207+
}()
208+
209+
// Remove this connection from the process's connection list once the
210+
// connection ends so data is no longer sent to it.
170211
defer func() {
171-
wsNetConn.Close()
212+
wsNetConn.Close() // REVIEW@asher: Not sure if necessary.
172213
rprocess.activeConnsMutex.Lock()
173214
delete(rprocess.activeConns, connectionID)
174-
if len(rprocess.activeConns) == 0 {
175-
timeout := time.NewTimer(options.ReconnectingProcessTimeout)
176-
timeoutCtx, cancel := context.WithCancel(context.Background())
177-
rprocess.timeoutCancel = cancel
178-
go func() {
179-
defer cancel()
180-
// Close if the inactive timeout occurs.
181-
select {
182-
case <-timeout.C:
183-
flog.Info("killing reconnecting process %s due to inactivity", header.ID)
184-
rprocess.Close()
185-
case <-timeoutCtx.Done():
186-
}
187-
}()
188-
}
189215
rprocess.activeConnsMutex.Unlock()
190216
}()
191217
} else {
@@ -300,11 +326,13 @@ type reconnectingProcess struct {
300326
activeConnsMutex sync.Mutex
301327
activeConns map[string]net.Conn
302328

303-
ringBuffer *circbuf.Buffer
304-
timeoutCancel context.CancelFunc
305-
process Process
329+
ringBuffer *circbuf.Buffer
330+
timeout *time.Timer
331+
process Process
306332
}
307333

334+
// Close ends all connections to the reconnecting process and clears the ring
335+
// buffer.
308336
func (r *reconnectingProcess) Close() {
309337
r.activeConnsMutex.Lock()
310338
defer r.activeConnsMutex.Unlock()

tty_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ func TestReconnectTTY(t *testing.T) {
109109

110110
assert.True(t, "find echo", findEcho(expected))
111111

112-
// Test disconnecting (which starts inactivity) then reconnecting (which
113-
// cancels it).
112+
// Test disconnecting then reconnecting.
114113
ws.Close(websocket.StatusNormalClosure, "disconnected")
115114
server.Close()
116115

@@ -134,10 +133,9 @@ func TestReconnectTTY(t *testing.T) {
134133

135134
assert.True(t, "find echo", findEcho(expected))
136135

137-
// Test disconnecting while another connection is active (which should skip
138-
// starting inactivity entirely).
136+
// Test disconnecting while another connection is active.
139137
ws2, server2 := mockConn(ctx, t, &Options{
140-
ReconnectingProcessTimeout: time.Millisecond,
138+
ReconnectingProcessTimeout: time.Second,
141139
})
142140
defer server2.Close()
143141

0 commit comments

Comments
 (0)