@@ -24,7 +24,7 @@ import (
24
24
"github.com/ethereum/go-ethereum/common/mclock"
25
25
)
26
26
27
- const fcTimeConst = 1000000
27
+ const fcTimeConst = time . Millisecond
28
28
29
29
type ServerParams struct {
30
30
BufLimit , MinRecharge uint64
@@ -33,7 +33,7 @@ type ServerParams struct {
33
33
type ClientNode struct {
34
34
params * ServerParams
35
35
bufValue uint64
36
- lastTime int64
36
+ lastTime mclock. AbsTime
37
37
lock sync.Mutex
38
38
cm * ClientManager
39
39
cmNode * cmNode
@@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode {
44
44
cm : cm ,
45
45
params : params ,
46
46
bufValue : params .BufLimit ,
47
- lastTime : getTime (),
47
+ lastTime : mclock . Now (),
48
48
}
49
49
node .cmNode = cm .addNode (node )
50
50
return node
@@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) {
54
54
cm .removeNode (peer .cmNode )
55
55
}
56
56
57
- func (peer * ClientNode ) recalcBV (time int64 ) {
57
+ func (peer * ClientNode ) recalcBV (time mclock. AbsTime ) {
58
58
dt := uint64 (time - peer .lastTime )
59
59
if time < peer .lastTime {
60
60
dt = 0
61
61
}
62
- peer .bufValue += peer .params .MinRecharge * dt / fcTimeConst
62
+ peer .bufValue += peer .params .MinRecharge * dt / uint64 ( fcTimeConst )
63
63
if peer .bufValue > peer .params .BufLimit {
64
64
peer .bufValue = peer .params .BufLimit
65
65
}
@@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) {
70
70
peer .lock .Lock ()
71
71
defer peer .lock .Unlock ()
72
72
73
- time := getTime ()
73
+ time := mclock . Now ()
74
74
peer .recalcBV (time )
75
75
return peer .bufValue , peer .cm .accept (peer .cmNode , time )
76
76
}
@@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
79
79
peer .lock .Lock ()
80
80
defer peer .lock .Unlock ()
81
81
82
- time := getTime ()
82
+ time := mclock . Now ()
83
83
peer .recalcBV (time )
84
84
peer .bufValue -= cost
85
85
peer .recalcBV (time )
@@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
94
94
}
95
95
96
96
type ServerNode struct {
97
- bufEstimate uint64
98
- lastTime int64
99
- params * ServerParams
100
- sumCost uint64 // sum of req costs sent to this server
101
- pending map [uint64 ]uint64 // value = sumCost after sending the given req
102
- lock sync.RWMutex
97
+ bufEstimate uint64
98
+ lastTime mclock.AbsTime
99
+ params * ServerParams
100
+ sumCost uint64 // sum of req costs sent to this server
101
+ pending map [uint64 ]uint64 // value = sumCost after sending the given req
102
+ assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
103
+ assignToken chan struct {} // send to this channel before assigning, read from it after deassigning
104
+ lock sync.RWMutex
103
105
}
104
106
105
107
func NewServerNode (params * ServerParams ) * ServerNode {
106
108
return & ServerNode {
107
109
bufEstimate : params .BufLimit ,
108
- lastTime : getTime (),
110
+ lastTime : mclock . Now (),
109
111
params : params ,
110
112
pending : make (map [uint64 ]uint64 ),
113
+ assignToken : make (chan struct {}, 1 ),
111
114
}
112
115
}
113
116
114
- func getTime () int64 {
115
- return int64 (mclock .Now ())
116
- }
117
-
118
- func (peer * ServerNode ) recalcBLE (time int64 ) {
117
+ func (peer * ServerNode ) recalcBLE (time mclock.AbsTime ) {
119
118
dt := uint64 (time - peer .lastTime )
120
119
if time < peer .lastTime {
121
120
dt = 0
122
121
}
123
- peer .bufEstimate += peer .params .MinRecharge * dt / fcTimeConst
122
+ peer .bufEstimate += peer .params .MinRecharge * dt / uint64 ( fcTimeConst )
124
123
if peer .bufEstimate > peer .params .BufLimit {
125
124
peer .bufEstimate = peer .params .BufLimit
126
125
}
127
126
peer .lastTime = time
128
127
}
129
128
130
- func (peer * ServerNode ) canSend (maxCost uint64 ) uint64 {
129
+ // safetyMargin is added to the flow control waiting time when estimated buffer value is low
130
+ const safetyMargin = time .Millisecond * 200
131
+
132
+ func (peer * ServerNode ) canSend (maxCost uint64 ) time.Duration {
133
+ maxCost += uint64 (safetyMargin ) * peer .params .MinRecharge / uint64 (fcTimeConst )
134
+ if maxCost > peer .params .BufLimit {
135
+ maxCost = peer .params .BufLimit
136
+ }
131
137
if peer .bufEstimate >= maxCost {
132
138
return 0
133
139
}
134
- return ( maxCost - peer .bufEstimate ) * fcTimeConst / peer .params .MinRecharge
140
+ return time . Duration (( maxCost - peer .bufEstimate ) * uint64 ( fcTimeConst ) / peer .params .MinRecharge )
135
141
}
136
142
137
- func (peer * ServerNode ) CanSend (maxCost uint64 ) uint64 {
143
+ // CanSend returns the minimum waiting time required before sending a request
144
+ // with the given maximum estimated cost
145
+ func (peer * ServerNode ) CanSend (maxCost uint64 ) time.Duration {
138
146
peer .lock .RLock ()
139
147
defer peer .lock .RUnlock ()
140
148
141
149
return peer .canSend (maxCost )
142
150
}
143
151
152
+ // AssignRequest tries to assign the server node to the given request, guaranteeing
153
+ // that once it returns true, no request will be sent to the node before this one
154
+ func (peer * ServerNode ) AssignRequest (reqID uint64 ) bool {
155
+ select {
156
+ case peer .assignToken <- struct {}{}:
157
+ default :
158
+ return false
159
+ }
160
+ peer .lock .Lock ()
161
+ peer .assignedRequest = reqID
162
+ peer .lock .Unlock ()
163
+ return true
164
+ }
165
+
166
+ // MustAssignRequest waits until the node can be assigned to the given request.
167
+ // It is always guaranteed that assignments are released in a short amount of time.
168
+ func (peer * ServerNode ) MustAssignRequest (reqID uint64 ) {
169
+ peer .assignToken <- struct {}{}
170
+ peer .lock .Lock ()
171
+ peer .assignedRequest = reqID
172
+ peer .lock .Unlock ()
173
+ }
174
+
175
+ // DeassignRequest releases a request assignment in case the planned request
176
+ // is not being sent.
177
+ func (peer * ServerNode ) DeassignRequest (reqID uint64 ) {
178
+ peer .lock .Lock ()
179
+ if peer .assignedRequest == reqID {
180
+ peer .assignedRequest = 0
181
+ <- peer .assignToken
182
+ }
183
+ peer .lock .Unlock ()
184
+ }
185
+
186
+ // IsAssigned returns true if the server node has already been assigned to a request
187
+ // (note that this function returning false does not guarantee that you can assign a request
188
+ // immediately afterwards, its only purpose is to help peer selection)
189
+ func (peer * ServerNode ) IsAssigned () bool {
190
+ peer .lock .RLock ()
191
+ locked := peer .assignedRequest != 0
192
+ peer .lock .RUnlock ()
193
+ return locked
194
+ }
195
+
144
196
// blocks until request can be sent
145
197
func (peer * ServerNode ) SendRequest (reqID , maxCost uint64 ) {
146
198
peer .lock .Lock ()
147
199
defer peer .lock .Unlock ()
148
200
149
- peer .recalcBLE (getTime ())
150
- for peer .bufEstimate < maxCost {
151
- wait := time .Duration (peer .canSend (maxCost ))
201
+ if peer .assignedRequest != reqID {
202
+ peer .lock .Unlock ()
203
+ peer .MustAssignRequest (reqID )
204
+ peer .lock .Lock ()
205
+ }
206
+
207
+ peer .recalcBLE (mclock .Now ())
208
+ wait := peer .canSend (maxCost )
209
+ for wait > 0 {
152
210
peer .lock .Unlock ()
153
211
time .Sleep (wait )
154
212
peer .lock .Lock ()
155
- peer .recalcBLE (getTime ())
213
+ peer .recalcBLE (mclock .Now ())
214
+ wait = peer .canSend (maxCost )
156
215
}
216
+ peer .assignedRequest = 0
217
+ <- peer .assignToken
157
218
peer .bufEstimate -= maxCost
158
219
peer .sumCost += maxCost
159
220
if reqID >= 0 {
@@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
162
223
}
163
224
164
225
func (peer * ServerNode ) GotReply (reqID , bv uint64 ) {
226
+
165
227
peer .lock .Lock ()
166
228
defer peer .lock .Unlock ()
167
229
230
+ if bv > peer .params .BufLimit {
231
+ bv = peer .params .BufLimit
232
+ }
168
233
sc , ok := peer .pending [reqID ]
169
234
if ! ok {
170
235
return
171
236
}
172
237
delete (peer .pending , reqID )
173
238
peer .bufEstimate = bv - (peer .sumCost - sc )
174
- peer .lastTime = getTime ()
239
+ peer .lastTime = mclock . Now ()
175
240
}
0 commit comments