5
5
"io"
6
6
"sync"
7
7
8
+ "golang.org/x/sync/errgroup"
8
9
"golang.org/x/sync/singleflight"
9
10
"golang.org/x/xerrors"
10
11
)
@@ -23,8 +24,8 @@ const (
23
24
DefaultBufferSize = 64 * 1024 * 1024
24
25
)
25
26
26
- // ReconnectFunc is called when the BackedPipe needs to establish a new connection.
27
- // It should:
27
+ // Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect.
28
+ // Implementations should:
28
29
// 1. Establish a new connection to the remote side
29
30
// 2. Exchange sequence numbers with the remote side
30
31
// 3. Return the new connection and the remote's current sequence number
@@ -34,7 +35,9 @@ const (
34
35
//
35
36
// The returned readerSeqNum should be the remote side's current sequence number,
36
37
// which indicates where the local reader should resume from.
37
- type ReconnectFunc func (ctx context.Context , writerSeqNum uint64 ) (conn io.ReadWriteCloser , readerSeqNum uint64 , err error )
38
+ type Reconnector interface {
39
+ Reconnect (ctx context.Context , writerSeqNum uint64 ) (conn io.ReadWriteCloser , readerSeqNum uint64 , err error )
40
+ }
38
41
39
42
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
40
43
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
@@ -45,44 +48,39 @@ type BackedPipe struct {
45
48
mu sync.RWMutex
46
49
reader * BackedReader
47
50
writer * BackedWriter
48
- reconnectFn ReconnectFunc
51
+ reconnector Reconnector
49
52
conn io.ReadWriteCloser
50
53
connected bool
51
54
closed bool
52
55
53
56
// Reconnection state
54
57
reconnecting bool
55
58
56
- // Error channel for receiving connection errors from reader/writer
57
- errorChan chan error
59
+ // Error channels for receiving connection errors from reader/writer separately
60
+ readerErrorChan chan error
61
+ writerErrorChan chan error
58
62
59
63
// singleflight group to dedupe concurrent ForceReconnect calls
60
64
sf singleflight.Group
61
65
}
62
66
63
- // NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
67
+ // NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
64
68
// The pipe starts disconnected and must be connected using Connect().
65
- func NewBackedPipe (ctx context.Context , reconnectFn ReconnectFunc ) * BackedPipe {
69
+ func NewBackedPipe (ctx context.Context , reconnector Reconnector ) * BackedPipe {
66
70
pipeCtx , cancel := context .WithCancel (ctx )
67
71
68
- errorChan := make (chan error , 2 ) // Buffer for reader and writer errors
72
+ readerErrorChan := make (chan error , 1 ) // Buffer for reader errors
73
+ writerErrorChan := make (chan error , 1 ) // Buffer for writer errors
69
74
bp := & BackedPipe {
70
- ctx : pipeCtx ,
71
- cancel : cancel ,
72
- reader : NewBackedReader (),
73
- writer : NewBackedWriter (DefaultBufferSize , errorChan ),
74
- reconnectFn : reconnectFn ,
75
- errorChan : errorChan ,
75
+ ctx : pipeCtx ,
76
+ cancel : cancel ,
77
+ reader : NewBackedReader (readerErrorChan ),
78
+ writer : NewBackedWriter (DefaultBufferSize , writerErrorChan ),
79
+ reconnector : reconnector ,
80
+ readerErrorChan : readerErrorChan ,
81
+ writerErrorChan : writerErrorChan ,
76
82
}
77
83
78
- // Set up error callback for reader only (writer uses error channel directly)
79
- bp .reader .SetErrorCallback (func (err error ) {
80
- select {
81
- case bp .errorChan <- err :
82
- case <- bp .ctx .Done ():
83
- }
84
- })
85
-
86
84
// Start error handler goroutine
87
85
go bp .handleErrors ()
88
86
@@ -109,16 +107,7 @@ func (bp *BackedPipe) Connect() error {
109
107
110
108
// Read implements io.Reader by delegating to the BackedReader.
111
109
func (bp * BackedPipe ) Read (p []byte ) (int , error ) {
112
- bp .mu .RLock ()
113
- reader := bp .reader
114
- closed := bp .closed
115
- bp .mu .RUnlock ()
116
-
117
- if closed {
118
- return 0 , io .EOF
119
- }
120
-
121
- return reader .Read (p )
110
+ return bp .reader .Read (p )
122
111
}
123
112
124
113
// Write implements io.Writer by delegating to the BackedWriter.
@@ -147,32 +136,38 @@ func (bp *BackedPipe) Close() error {
147
136
bp .closed = true
148
137
bp .cancel () // Cancel main context
149
138
150
- // Close underlying components
151
- var readerErr , writerErr , connErr error
139
+ // Close all components in parallel to avoid deadlocks
140
+ //
141
+ // IMPORTANT: Closing the connection is started first, to unblock any
142
+ // readers or writers that might be holding the mutex on Read/Write; the closes run concurrently, so completion order is not guaranteed
143
+ var g errgroup.Group
152
144
153
- if bp .reader != nil {
154
- readerErr = bp .reader .Close ()
145
+ if bp .conn != nil {
146
+ conn := bp .conn
147
+ g .Go (func () error {
148
+ return conn .Close ()
149
+ })
150
+ bp .conn = nil
155
151
}
156
152
157
- if bp .writer != nil {
158
- writerErr = bp .writer .Close ()
153
+ if bp .reader != nil {
154
+ reader := bp .reader
155
+ g .Go (func () error {
156
+ return reader .Close ()
157
+ })
159
158
}
160
159
161
- if bp .conn != nil {
162
- connErr = bp .conn .Close ()
163
- bp .conn = nil
160
+ if bp .writer != nil {
161
+ writer := bp .writer
162
+ g .Go (func () error {
163
+ return writer .Close ()
164
+ })
164
165
}
165
166
166
167
bp .connected = false
167
168
168
- // Return first error encountered
169
- if readerErr != nil {
170
- return readerErr
171
- }
172
- if writerErr != nil {
173
- return writerErr
174
- }
175
- return connErr
169
+ // Wait for all close operations to complete and return any error
170
+ return g .Wait ()
176
171
}
177
172
178
173
// Connected returns whether the pipe is currently connected.
@@ -204,7 +199,7 @@ func (bp *BackedPipe) reconnectLocked() error {
204
199
// Get current writer sequence number to send to remote
205
200
writerSeqNum := bp .writer .SequenceNum ()
206
201
207
- conn , readerSeqNum , err := bp .reconnectFn (bp .ctx , writerSeqNum )
202
+ conn , readerSeqNum , err := bp .reconnector . Reconnect (bp .ctx , writerSeqNum )
208
203
if err != nil {
209
204
return ErrReconnectFailed
210
205
}
@@ -244,32 +239,40 @@ func (bp *BackedPipe) handleErrors() {
244
239
select {
245
240
case <- bp .ctx .Done ():
246
241
return
247
- case err := <- bp .errorChan :
248
- // Connection error occurred
249
- bp .mu .Lock ()
250
-
251
- // Skip if already closed or not connected
252
- if bp .closed || ! bp .connected {
253
- bp .mu .Unlock ()
254
- continue
255
- }
256
-
257
- // Mark as disconnected
258
- bp .connected = false
259
-
260
- // Try to reconnect using internal context
261
- reconnectErr := bp .reconnectLocked ()
262
- bp .mu .Unlock ()
263
-
264
- if reconnectErr != nil {
265
- // Reconnection failed - log or handle as needed
266
- // For now, we'll just continue and wait for manual reconnection
267
- _ = err // Use the original error
268
- }
242
+ case err := <- bp .readerErrorChan :
243
+ // Reader connection error occurred
244
+ bp .handleConnectionError (err , "reader" )
245
+ case err := <- bp .writerErrorChan :
246
+ // Writer connection error occurred
247
+ bp .handleConnectionError (err , "writer" )
269
248
}
270
249
}
271
250
}
272
251
252
+ // handleConnectionError handles errors from either reader or writer components.
253
+ func (bp * BackedPipe ) handleConnectionError (err error , component string ) {
254
+ bp .mu .Lock ()
255
+ defer bp .mu .Unlock ()
256
+
257
+ // Skip if already closed or not connected
258
+ if bp .closed || ! bp .connected {
259
+ return
260
+ }
261
+
262
+ // Mark as disconnected
263
+ bp .connected = false
264
+
265
+ // Try to reconnect using internal context
266
+ reconnectErr := bp .reconnectLocked ()
267
+
268
+ if reconnectErr != nil {
269
+ // Reconnection failed - log or handle as needed
270
+ // For now, we'll just continue and wait for manual reconnection
271
+ _ = err // Use the original error from the component
272
+ _ = component // Component info available for potential logging by higher layers
273
+ }
274
+ }
275
+
273
276
// ForceReconnect forces a reconnection attempt immediately.
274
277
// This can be used to force a reconnection if a new connection is established.
275
278
func (bp * BackedPipe ) ForceReconnect () error {
0 commit comments