@@ -106,37 +106,21 @@ func (rs *RequestServer) closeRequest(handle string) error {
106
106
// Close the read/write/closer to trigger exiting the main server loop
107
107
func (rs * RequestServer ) Close () error { return rs .conn .Close () }
108
108
109
- // Serve requests for user session
110
- func (rs * RequestServer ) Serve () error {
111
- defer func () {
112
- if rs .pktMgr .alloc != nil {
113
- rs .pktMgr .alloc .Free ()
114
- }
115
- }()
116
- ctx , cancel := context .WithCancel (context .Background ())
117
- defer cancel ()
118
- var wg sync.WaitGroup
119
- runWorker := func (ch chan orderedRequest ) {
120
- wg .Add (1 )
121
- go func () {
122
- defer wg .Done ()
123
- if err := rs .packetWorker (ctx , ch ); err != nil {
124
- rs .conn .Close () // shuts down recvPacket
125
- }
126
- }()
127
- }
128
- pktChan := rs .pktMgr .workerChan (runWorker )
109
+ func (rs * RequestServer ) serveLoop (pktChan chan <- orderedRequest ) error {
110
+ defer close (pktChan ) // shuts down sftpServerWorkers
129
111
130
112
var err error
131
113
var pkt requestPacket
132
114
var pktType uint8
133
115
var pktBytes []byte
116
+
134
117
for {
135
118
pktType , pktBytes , err = rs .serverConn .recvPacket (rs .pktMgr .getNextOrderID ())
136
119
if err != nil {
137
120
// we don't care about releasing allocated pages here, the server will quit and the allocator freed
138
- break
121
+ return err
139
122
}
123
+
140
124
pkt , err = makePacket (rxPacket {fxp (pktType ), pktBytes })
141
125
if err != nil {
142
126
switch errors .Cause (err ) {
@@ -145,33 +129,47 @@ func (rs *RequestServer) Serve() error {
145
129
default :
146
130
debug ("makePacket err: %v" , err )
147
131
rs .conn .Close () // shuts down recvPacket
148
- break
132
+ return err
149
133
}
150
134
}
151
135
152
136
pktChan <- rs .pktMgr .newOrderedRequest (pkt )
153
137
}
138
+ }
154
139
155
- close (pktChan ) // shuts down sftpServerWorkers
156
- wg .Wait () // wait for all workers to exit
140
+ // Serve requests for user session
141
+ func (rs * RequestServer ) Serve () error {
142
+ defer func () {
143
+ if rs .pktMgr .alloc != nil {
144
+ rs .pktMgr .alloc .Free ()
145
+ }
146
+ }()
147
+ ctx , cancel := context .WithCancel (context .Background ())
148
+ defer cancel ()
149
+ var wg sync.WaitGroup
150
+ runWorker := func (ch chan orderedRequest ) {
151
+ wg .Add (1 )
152
+ go func () {
153
+ defer wg .Done ()
154
+ if err := rs .packetWorker (ctx , ch ); err != nil {
155
+ rs .conn .Close () // shuts down recvPacket
156
+ }
157
+ }()
158
+ }
159
+ pktChan := rs .pktMgr .workerChan (runWorker )
160
+
161
+ err := rs .serveLoop (pktChan )
162
+
163
+ wg .Wait () // wait for all workers to exit
164
+
165
+ rs .openRequestLock .Lock ()
166
+ defer rs .openRequestLock .Unlock ()
157
167
158
168
// make sure all open requests are properly closed
159
169
// (eg. possible on dropped connections, client crashes, etc.)
160
170
for handle , req := range rs .openRequests {
161
- if err != nil {
162
- req .state .RLock ()
163
- writer := req .state .writerAt
164
- reader := req .state .readerAt
165
- req .state .RUnlock ()
166
- if t , ok := writer .(TransferError ); ok {
167
- debug ("notify error: %v to writer: %v\n " , err , writer )
168
- t .TransferError (err )
169
- }
170
- if t , ok := reader .(TransferError ); ok {
171
- debug ("notify error: %v to reader: %v\n " , err , reader )
172
- t .TransferError (err )
173
- }
174
- }
171
+ req .transferError (err )
172
+
175
173
delete (rs .openRequests , handle )
176
174
req .close ()
177
175
}
0 commit comments