Skip to content

Commit f9330fc

Browse files
committed
Allow to configure maxConcurrentRequests
1 parent 1073df2 commit f9330fc

File tree

1 file changed

+26
-13
lines changed

1 file changed

+26
-13
lines changed

client.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,19 @@ func MaxPacket(size int) ClientOption {
7676
return MaxPacketChecked(size)
7777
}
7878

79+
// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
80+
//
81+
// The default maximum concurrent requests is 64.
82+
func MaxConcurrentRequestsPerFile(n int) ClientOption {
83+
return func(c *Client) error {
84+
if n < 1 {
85+
return errors.Errorf("n must be greater or equal to 1")
86+
}
87+
c.maxConcurrentRequests = n
88+
return nil
89+
}
90+
}
91+
7992
// NewClient creates a new SFTP client on conn, using zero or more option
8093
// functions.
8194
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
@@ -110,7 +123,8 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
110123
},
111124
inflight: make(map[uint32]chan<- result),
112125
},
113-
maxPacket: 1 << 15,
126+
maxPacket: 1 << 15,
127+
maxConcurrentRequests: 64,
114128
}
115129
if err := sftp.applyOptions(opts...); err != nil {
116130
wr.Close()
@@ -137,8 +151,9 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
137151
type Client struct {
138152
clientConn
139153

140-
maxPacket int // max packet size read or written.
141-
nextid uint32
154+
maxPacket int // max packet size read or written.
155+
nextid uint32
156+
maxConcurrentRequests int
142157
}
143158

144159
// Create creates the named file mode 0666 (before umask), truncating it if it
@@ -759,8 +774,6 @@ func (f *File) Name() string {
759774
return f.path
760775
}
761776

762-
const maxConcurrentRequests = 64
763-
764777
// Read reads up to len(b) bytes from the File. It returns the number of bytes
765778
// read and an error, if any. Read follows io.Reader semantics, so when Read
766779
// encounters an error or EOF condition after successfully reading n > 0 bytes,
@@ -780,7 +793,7 @@ func (f *File) Read(b []byte) (int, error) {
780793
offset := f.offset
781794
// maxConcurrentRequests buffer to deal with broadcastErr() floods
782795
// also must have a buffer of max value of (desiredInFlight - inFlight)
783-
ch := make(chan result, maxConcurrentRequests+1)
796+
ch := make(chan result, f.c.maxConcurrentRequests+1)
784797
type inflightRead struct {
785798
b []byte
786799
offset uint64
@@ -845,7 +858,7 @@ func (f *File) Read(b []byte) (int, error) {
845858
if n < len(req.b) {
846859
sendReq(req.b[l:], req.offset+uint64(l))
847860
}
848-
if desiredInFlight < maxConcurrentRequests {
861+
if desiredInFlight < f.c.maxConcurrentRequests {
849862
desiredInFlight++
850863
}
851864
default:
@@ -880,7 +893,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
880893
writeOffset := offset
881894
fileSize := uint64(fi.Size())
882895
// see comment on same line in Read() above
883-
ch := make(chan result, maxConcurrentRequests+1)
896+
ch := make(chan result, f.c.maxConcurrentRequests+1)
884897
type inflightRead struct {
885898
b []byte
886899
offset uint64
@@ -960,7 +973,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
960973
switch {
961974
case offset > fileSize:
962975
desiredInFlight = 1
963-
case desiredInFlight < maxConcurrentRequests:
976+
case desiredInFlight < f.c.maxConcurrentRequests:
964977
desiredInFlight++
965978
}
966979
writeOffset += uint64(nbytes)
@@ -1028,7 +1041,7 @@ func (f *File) Write(b []byte) (int, error) {
10281041
desiredInFlight := 1
10291042
offset := f.offset
10301043
// see comment on same line in Read() above
1031-
ch := make(chan result, maxConcurrentRequests+1)
1044+
ch := make(chan result, f.c.maxConcurrentRequests+1)
10321045
var firstErr error
10331046
written := len(b)
10341047
for len(b) > 0 || inFlight > 0 {
@@ -1064,7 +1077,7 @@ func (f *File) Write(b []byte) (int, error) {
10641077
firstErr = err
10651078
break
10661079
}
1067-
if desiredInFlight < maxConcurrentRequests {
1080+
if desiredInFlight < f.c.maxConcurrentRequests {
10681081
desiredInFlight++
10691082
}
10701083
default:
@@ -1093,7 +1106,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
10931106
desiredInFlight := 1
10941107
offset := f.offset
10951108
// see comment on same line in Read() above
1096-
ch := make(chan result, maxConcurrentRequests+1)
1109+
ch := make(chan result, f.c.maxConcurrentRequests+1)
10971110
var firstErr error
10981111
read := int64(0)
10991112
b := make([]byte, f.c.maxPacket)
@@ -1132,7 +1145,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
11321145
firstErr = err
11331146
break
11341147
}
1135-
if desiredInFlight < maxConcurrentRequests {
1148+
if desiredInFlight < f.c.maxConcurrentRequests {
11361149
desiredInFlight++
11371150
}
11381151
default:

0 commit comments

Comments
 (0)