@@ -105,29 +105,59 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
105
105
}
106
106
process = rprocess .process
107
107
} else {
108
- process , err = execer .Start (context .Background (), command )
108
+ // The process will be kept alive as long as this context does not
109
+ // finish (and as long as the process does not exit on its own). This
110
+ // is a new context since the parent context finishes when the request
111
+ // ends which would kill the process prematurely.
112
+ ctx , cancel := context .WithCancel (context .Background ())
113
+
114
+ // The process will be killed if the provided context ends.
115
+ process , err = execer .Start (ctx , command )
109
116
if err != nil {
117
+ cancel ()
110
118
return err
111
119
}
112
120
113
- ringBuffer , err := circbuf .NewBuffer (1 << 20 )
121
+ // Default to buffer 64KB.
122
+ ringBuffer , err := circbuf .NewBuffer (64 * 1024 )
114
123
if err != nil {
124
+ cancel ()
115
125
return xerrors .Errorf ("unable to create ring buffer %w" , err )
116
126
}
117
127
118
128
rprocess = & reconnectingProcess {
119
129
activeConns : make (map [string ]net.Conn ),
120
130
process : process ,
121
- // Default to buffer 1MB.
131
+ // Timeouts created with AfterFunc can be reset.
132
+ timeout : time .AfterFunc (options .ReconnectingProcessTimeout , cancel ),
122
133
ringBuffer : ringBuffer ,
123
134
}
124
135
reconnectingProcesses .Store (header .ID , rprocess )
136
+
137
+ // If the process exits send the exit code to all listening
138
+ // connections then close everything.
139
+ go func () {
140
+ err = process .Wait ()
141
+ code := 0
142
+ if exitErr , ok := err .(ExitError ); ok {
143
+ code = exitErr .Code
144
+ }
145
+ rprocess .activeConnsMutex .Lock ()
146
+ for _ , conn := range rprocess .activeConns {
147
+ _ = sendExitCode (ctx , code , conn )
148
+ }
149
+ rprocess .activeConnsMutex .Unlock ()
150
+ rprocess .Close ()
151
+ reconnectingProcesses .Delete (header .ID )
152
+ }()
153
+
154
+ // Write to the ring buffer and all connections as we receive stdout.
125
155
go func () {
126
156
buffer := make ([]byte , 32 * 1024 )
127
157
for {
128
158
read , err := rprocess .process .Stdout ().Read (buffer )
129
159
if err != nil {
130
- flog . Error ( "reconnecting process %s read: %v" , header . ID , err )
160
+ // When the process is closed this is triggered.
131
161
break
132
162
}
133
163
part := buffer [:read ]
@@ -142,9 +172,6 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
142
172
}
143
173
rprocess .activeConnsMutex .Unlock ()
144
174
}
145
- // If we break from the loop, the reconnecting PTY ended or errored.
146
- rprocess .Close ()
147
- reconnectingProcesses .Delete (header .ID )
148
175
}()
149
176
}
150
177
@@ -153,39 +180,38 @@ func Serve(ctx context.Context, c *websocket.Conn, execer Execer, options *Optio
153
180
flog .Error ("failed to send pid %d" , process .Pid ())
154
181
}
155
182
156
- // Write the initial contents out .
183
+ // Write out the initial contents in the ring buffer .
157
184
err = sendOutput (ctx , rprocess .ringBuffer .Bytes (), wsNetConn )
158
185
if err != nil {
159
186
return xerrors .Errorf ("write reconnecting process %s buffer: %w" , header .ID , err )
160
187
}
161
188
189
+ // Store this connection on the reconnecting process. All connections
190
+ // stored on the process will receive the process's stdout.
162
191
connectionID := uuid .NewString ()
163
192
rprocess .activeConnsMutex .Lock ()
164
193
rprocess .activeConns [connectionID ] = wsNetConn
165
- if rprocess .timeoutCancel != nil {
166
- rprocess .timeoutCancel ()
167
- rprocess .timeoutCancel = nil
168
- }
169
194
rprocess .activeConnsMutex .Unlock ()
195
+
196
+ // Keep resetting the inactivity timer while this connection is alive.
197
+ rprocess .timeout .Reset (options .ReconnectingProcessTimeout )
198
+ heartbeat := time .NewTimer (options .ReconnectingProcessTimeout / 2 )
199
+ defer heartbeat .Stop ()
200
+ go func () {
201
+ for {
202
+ select {
203
+ case <- heartbeat .C :
204
+ }
205
+ rprocess .timeout .Reset (options .ReconnectingProcessTimeout )
206
+ }
207
+ }()
208
+
209
+ // Remove this connection from the process's connection list once the
210
+ // connection ends so data is no longer sent to it.
170
211
defer func () {
171
- wsNetConn .Close ()
212
+ wsNetConn .Close () // REVIEW@asher: Not sure if necessary.
172
213
rprocess .activeConnsMutex .Lock ()
173
214
delete (rprocess .activeConns , connectionID )
174
- if len (rprocess .activeConns ) == 0 {
175
- timeout := time .NewTimer (options .ReconnectingProcessTimeout )
176
- timeoutCtx , cancel := context .WithCancel (context .Background ())
177
- rprocess .timeoutCancel = cancel
178
- go func () {
179
- defer cancel ()
180
- // Close if the inactive timeout occurs.
181
- select {
182
- case <- timeout .C :
183
- flog .Info ("killing reconnecting process %s due to inactivity" , header .ID )
184
- rprocess .Close ()
185
- case <- timeoutCtx .Done ():
186
- }
187
- }()
188
- }
189
215
rprocess .activeConnsMutex .Unlock ()
190
216
}()
191
217
} else {
@@ -300,11 +326,13 @@ type reconnectingProcess struct {
300
326
activeConnsMutex sync.Mutex
301
327
activeConns map [string ]net.Conn
302
328
303
- ringBuffer * circbuf.Buffer
304
- timeoutCancel context. CancelFunc
305
- process Process
329
+ ringBuffer * circbuf.Buffer
330
+ timeout * time. Timer
331
+ process Process
306
332
}
307
333
334
+ // Close ends all connections to the reconnecting process and clears the ring
335
+ // buffer.
308
336
func (r * reconnectingProcess ) Close () {
309
337
r .activeConnsMutex .Lock ()
310
338
defer r .activeConnsMutex .Unlock ()
0 commit comments