@@ -76,6 +76,19 @@ func MaxPacket(size int) ClientOption {
76
76
return MaxPacketChecked (size )
77
77
}
78
78
79
+ // MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
80
+ //
81
+ // The default maximum concurrent requests is 64.
82
+ func MaxConcurrentRequestsPerFile (n int ) ClientOption {
83
+ return func (c * Client ) error {
84
+ if n < 1 {
85
+ return errors .Errorf ("n must be greater or equal to 1" )
86
+ }
87
+ c .maxConcurrentRequests = n
88
+ return nil
89
+ }
90
+ }
91
+
79
92
// NewClient creates a new SFTP client on conn, using zero or more option
80
93
// functions.
81
94
func NewClient (conn * ssh.Client , opts ... ClientOption ) (* Client , error ) {
@@ -110,7 +123,8 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
110
123
},
111
124
inflight : make (map [uint32 ]chan <- result ),
112
125
},
113
- maxPacket : 1 << 15 ,
126
+ maxPacket : 1 << 15 ,
127
+ maxConcurrentRequests : 64 ,
114
128
}
115
129
if err := sftp .applyOptions (opts ... ); err != nil {
116
130
wr .Close ()
@@ -137,8 +151,9 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
137
151
type Client struct {
138
152
clientConn
139
153
140
- maxPacket int // max packet size read or written.
141
- nextid uint32
154
+ maxPacket int // max packet size read or written.
155
+ nextid uint32
156
+ maxConcurrentRequests int
142
157
}
143
158
144
159
// Create creates the named file mode 0666 (before umask), truncating it if it
@@ -759,8 +774,6 @@ func (f *File) Name() string {
759
774
return f .path
760
775
}
761
776
762
- const maxConcurrentRequests = 64
763
-
764
777
// Read reads up to len(b) bytes from the File. It returns the number of bytes
765
778
// read and an error, if any. Read follows io.Reader semantics, so when Read
766
779
// encounters an error or EOF condition after successfully reading n > 0 bytes,
@@ -780,7 +793,7 @@ func (f *File) Read(b []byte) (int, error) {
780
793
offset := f .offset
781
794
// maxConcurrentRequests buffer to deal with broadcastErr() floods
782
795
// also must have a buffer of max value of (desiredInFlight - inFlight)
783
- ch := make (chan result , maxConcurrentRequests + 1 )
796
+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
784
797
type inflightRead struct {
785
798
b []byte
786
799
offset uint64
@@ -845,7 +858,7 @@ func (f *File) Read(b []byte) (int, error) {
845
858
if n < len (req .b ) {
846
859
sendReq (req .b [l :], req .offset + uint64 (l ))
847
860
}
848
- if desiredInFlight < maxConcurrentRequests {
861
+ if desiredInFlight < f . c . maxConcurrentRequests {
849
862
desiredInFlight ++
850
863
}
851
864
default :
@@ -880,7 +893,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
880
893
writeOffset := offset
881
894
fileSize := uint64 (fi .Size ())
882
895
// see comment on same line in Read() above
883
- ch := make (chan result , maxConcurrentRequests + 1 )
896
+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
884
897
type inflightRead struct {
885
898
b []byte
886
899
offset uint64
@@ -960,7 +973,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
960
973
switch {
961
974
case offset > fileSize :
962
975
desiredInFlight = 1
963
- case desiredInFlight < maxConcurrentRequests :
976
+ case desiredInFlight < f . c . maxConcurrentRequests :
964
977
desiredInFlight ++
965
978
}
966
979
writeOffset += uint64 (nbytes )
@@ -1028,7 +1041,7 @@ func (f *File) Write(b []byte) (int, error) {
1028
1041
desiredInFlight := 1
1029
1042
offset := f .offset
1030
1043
// see comment on same line in Read() above
1031
- ch := make (chan result , maxConcurrentRequests + 1 )
1044
+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
1032
1045
var firstErr error
1033
1046
written := len (b )
1034
1047
for len (b ) > 0 || inFlight > 0 {
@@ -1064,7 +1077,7 @@ func (f *File) Write(b []byte) (int, error) {
1064
1077
firstErr = err
1065
1078
break
1066
1079
}
1067
- if desiredInFlight < maxConcurrentRequests {
1080
+ if desiredInFlight < f . c . maxConcurrentRequests {
1068
1081
desiredInFlight ++
1069
1082
}
1070
1083
default :
@@ -1093,7 +1106,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
1093
1106
desiredInFlight := 1
1094
1107
offset := f .offset
1095
1108
// see comment on same line in Read() above
1096
- ch := make (chan result , maxConcurrentRequests + 1 )
1109
+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
1097
1110
var firstErr error
1098
1111
read := int64 (0 )
1099
1112
b := make ([]byte , f .c .maxPacket )
@@ -1132,7 +1145,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
1132
1145
firstErr = err
1133
1146
break
1134
1147
}
1135
- if desiredInFlight < maxConcurrentRequests {
1148
+ if desiredInFlight < f . c . maxConcurrentRequests {
1136
1149
desiredInFlight ++
1137
1150
}
1138
1151
default :
0 commit comments