Skip to content

Commit b508b93

Browse files
authored
Merge pull request pkg#363 from pkg/patch/RequestServer-Serve-bugs
RequestServer.Serve bugs found looking at PR-361
2 parents f129610 + eace420 commit b508b93

File tree

2 files changed

+73
-43
lines changed

2 files changed

+73
-43
lines changed

request-server.go

+36-38
Original file line numberDiff line numberDiff line change
@@ -106,37 +106,21 @@ func (rs *RequestServer) closeRequest(handle string) error {
106106
// Close the read/write/closer to trigger exiting the main server loop
107107
func (rs *RequestServer) Close() error { return rs.conn.Close() }
108108

109-
// Serve requests for user session
110-
func (rs *RequestServer) Serve() error {
111-
defer func() {
112-
if rs.pktMgr.alloc != nil {
113-
rs.pktMgr.alloc.Free()
114-
}
115-
}()
116-
ctx, cancel := context.WithCancel(context.Background())
117-
defer cancel()
118-
var wg sync.WaitGroup
119-
runWorker := func(ch chan orderedRequest) {
120-
wg.Add(1)
121-
go func() {
122-
defer wg.Done()
123-
if err := rs.packetWorker(ctx, ch); err != nil {
124-
rs.conn.Close() // shuts down recvPacket
125-
}
126-
}()
127-
}
128-
pktChan := rs.pktMgr.workerChan(runWorker)
109+
func (rs *RequestServer) serveLoop(pktChan chan<- orderedRequest) error {
110+
defer close(pktChan) // shuts down sftpServerWorkers
129111

130112
var err error
131113
var pkt requestPacket
132114
var pktType uint8
133115
var pktBytes []byte
116+
134117
for {
135118
pktType, pktBytes, err = rs.serverConn.recvPacket(rs.pktMgr.getNextOrderID())
136119
if err != nil {
137120
// we don't care about releasing allocated pages here, the server will quit and the allocator freed
138-
break
121+
return err
139122
}
123+
140124
pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
141125
if err != nil {
142126
switch errors.Cause(err) {
@@ -145,33 +129,47 @@ func (rs *RequestServer) Serve() error {
145129
default:
146130
debug("makePacket err: %v", err)
147131
rs.conn.Close() // shuts down recvPacket
148-
break
132+
return err
149133
}
150134
}
151135

152136
pktChan <- rs.pktMgr.newOrderedRequest(pkt)
153137
}
138+
}
154139

155-
close(pktChan) // shuts down sftpServerWorkers
156-
wg.Wait() // wait for all workers to exit
140+
// Serve requests for user session
141+
func (rs *RequestServer) Serve() error {
142+
defer func() {
143+
if rs.pktMgr.alloc != nil {
144+
rs.pktMgr.alloc.Free()
145+
}
146+
}()
147+
ctx, cancel := context.WithCancel(context.Background())
148+
defer cancel()
149+
var wg sync.WaitGroup
150+
runWorker := func(ch chan orderedRequest) {
151+
wg.Add(1)
152+
go func() {
153+
defer wg.Done()
154+
if err := rs.packetWorker(ctx, ch); err != nil {
155+
rs.conn.Close() // shuts down recvPacket
156+
}
157+
}()
158+
}
159+
pktChan := rs.pktMgr.workerChan(runWorker)
160+
161+
err := rs.serveLoop(pktChan)
162+
163+
wg.Wait() // wait for all workers to exit
164+
165+
rs.openRequestLock.Lock()
166+
defer rs.openRequestLock.Unlock()
157167

158168
// make sure all open requests are properly closed
159169
// (eg. possible on dropped connections, client crashes, etc.)
160170
for handle, req := range rs.openRequests {
161-
if err != nil {
162-
req.state.RLock()
163-
writer := req.state.writerAt
164-
reader := req.state.readerAt
165-
req.state.RUnlock()
166-
if t, ok := writer.(TransferError); ok {
167-
debug("notify error: %v to writer: %v\n", err, writer)
168-
t.TransferError(err)
169-
}
170-
if t, ok := reader.(TransferError); ok {
171-
debug("notify error: %v to reader: %v\n", err, reader)
172-
t.TransferError(err)
173-
}
174-
}
171+
req.transferError(err)
172+
175173
delete(rs.openRequests, handle)
176174
req.close()
177175
}

request.go

+37-5
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,51 @@ func (r *Request) close() error {
138138
r.cancelCtx()
139139
}
140140
}()
141+
141142
r.state.RLock()
143+
wr := r.state.writerAt
142144
rd := r.state.readerAt
143145
r.state.RUnlock()
146+
147+
var err error
148+
149+
// Close errors on a Writer are far more likely to be the important one.
150+
// As they can be information that there was a loss of data.
151+
if c, ok := wr.(io.Closer); ok {
152+
if err2 := c.Close(); err == nil {
153+
// update error if it is still nil
154+
err = err2
155+
}
156+
}
157+
144158
if c, ok := rd.(io.Closer); ok {
145-
return c.Close()
159+
if err2 := c.Close(); err == nil {
160+
// update error if it is still nil
161+
err = err2
162+
}
163+
}
164+
165+
return err
166+
}
167+
168+
// Close reader/writer if possible
169+
func (r *Request) transferError(err error) {
170+
if err == nil {
171+
return
146172
}
173+
147174
r.state.RLock()
148-
wt := r.state.writerAt
175+
wr := r.state.writerAt
176+
rd := r.state.readerAt
149177
r.state.RUnlock()
150-
if c, ok := wt.(io.Closer); ok {
151-
return c.Close()
178+
179+
if t, ok := wr.(TransferError); ok {
180+
t.TransferError(err)
181+
}
182+
183+
if t, ok := rd.(TransferError); ok {
184+
t.TransferError(err)
152185
}
153-
return nil
154186
}
155187

156188
// called from worker to handle packet/request

0 commit comments

Comments
 (0)