Skip to content

Commit 681b51a

Browse files
authored
Merge pull request ethereum#3519 from zsfelfoldi/light-topic5
les: fixed selectPeer deadlock, improved request distribution
2 parents 4268cb8 + 66979aa commit 681b51a

File tree

11 files changed

+290
-137
lines changed

11 files changed

+290
-137
lines changed

les/fetcher.go

Lines changed: 32 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() {
125125
f.pm.wg.Add(1)
126126
defer f.pm.wg.Done()
127127

128-
requestStarted := false
128+
requesting := false
129129
for {
130130
select {
131131
case <-f.pm.quitSync:
@@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() {
134134
// no further requests are necessary or possible
135135
case newAnnounce := <-f.requestChn:
136136
f.lock.Lock()
137-
s := requestStarted
138-
requestStarted = false
137+
s := requesting
138+
requesting = false
139139
if !f.syncing && !(newAnnounce && s) {
140-
if peer, node, amount := f.nextRequest(); node != nil {
141-
requestStarted = true
142-
reqID, started := f.request(peer, node, amount)
143-
if started {
140+
reqID := getNextReqID()
141+
if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
142+
requesting = true
143+
if reqID, ok := f.request(peer, reqID, node, amount); ok {
144144
go func() {
145145
time.Sleep(softRequestTimeout)
146146
f.reqMu.Lock()
@@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() {
154154
f.requestChn <- false
155155
}()
156156
}
157+
} else {
158+
if retry {
159+
requesting = true
160+
go func() {
161+
time.Sleep(time.Millisecond * 100)
162+
f.requestChn <- false
163+
}()
164+
}
157165
}
158166
}
159167
f.lock.Unlock()
@@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
344352
}
345353

346354
// request initiates a header download request from a certain peer
347-
func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
355+
func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
348356
fp := f.peers[p]
349357
if fp == nil {
350358
glog.V(logger.Debug).Infof("request: unknown peer")
359+
p.fcServer.DeassignRequest(reqID)
351360
return 0, false
352361
}
353362
if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
@@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint
357366
f.pm.synchronise(p)
358367
f.syncDone <- p
359368
}()
369+
p.fcServer.DeassignRequest(reqID)
360370
return 0, false
361371
}
362372

363-
reqID := getNextReqID()
364373
n.requested = true
365374
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
366375
p.fcServer.SendRequest(reqID, cost)
@@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
400409

401410
// nextRequest selects the peer and announced head to be requested next, amount
402411
// to be downloaded starting from the head backwards is also returned
403-
func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
412+
func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
404413
var (
405414
bestHash common.Hash
406415
bestAmount uint64
@@ -420,44 +429,53 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
420429
}
421430
}
422431
if bestTd == f.maxConfirmedTd {
423-
return nil, nil, 0
432+
return nil, nil, 0, false
424433
}
425434

426-
peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
435+
peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
427436
fp := f.peers[p]
428437
if fp == nil || fp.nodeByHash[bestHash] == nil {
429438
return false, 0
430439
}
431440
return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
432441
})
442+
if !locked {
443+
return nil, nil, 0, true
444+
}
433445
var node *fetcherTreeNode
434446
if peer != nil {
435447
node = f.peers[peer].nodeByHash[bestHash]
436448
}
437-
return peer, node, bestAmount
449+
return peer, node, bestAmount, false
438450
}
439451

440452
// deliverHeaders delivers header download request responses for processing
441453
func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
442454
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
443455
}
444456

445-
// processResponse processes header download request responses
457+
// processResponse processes header download request responses, returns true if successful
446458
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
447459
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
460+
glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8])
448461
return false
449462
}
450463
headers := make([]*types.Header, req.amount)
451464
for i, header := range resp.headers {
452465
headers[int(req.amount)-1-i] = header
453466
}
454467
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
468+
if err == core.BlockFutureErr {
469+
return true
470+
}
471+
glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err)
455472
return false
456473
}
457474
tds := make([]*big.Int, len(headers))
458475
for i, header := range headers {
459476
td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
460477
if td == nil {
478+
glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers))
461479
return false
462480
}
463481
tds[i] = td

les/flowcontrol/control.go

Lines changed: 93 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/ethereum/go-ethereum/common/mclock"
2525
)
2626

27-
const fcTimeConst = 1000000
27+
const fcTimeConst = time.Millisecond
2828

2929
type ServerParams struct {
3030
BufLimit, MinRecharge uint64
@@ -33,7 +33,7 @@ type ServerParams struct {
3333
type ClientNode struct {
3434
params *ServerParams
3535
bufValue uint64
36-
lastTime int64
36+
lastTime mclock.AbsTime
3737
lock sync.Mutex
3838
cm *ClientManager
3939
cmNode *cmNode
@@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode {
4444
cm: cm,
4545
params: params,
4646
bufValue: params.BufLimit,
47-
lastTime: getTime(),
47+
lastTime: mclock.Now(),
4848
}
4949
node.cmNode = cm.addNode(node)
5050
return node
@@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) {
5454
cm.removeNode(peer.cmNode)
5555
}
5656

57-
func (peer *ClientNode) recalcBV(time int64) {
57+
func (peer *ClientNode) recalcBV(time mclock.AbsTime) {
5858
dt := uint64(time - peer.lastTime)
5959
if time < peer.lastTime {
6060
dt = 0
6161
}
62-
peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst
62+
peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst)
6363
if peer.bufValue > peer.params.BufLimit {
6464
peer.bufValue = peer.params.BufLimit
6565
}
@@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) {
7070
peer.lock.Lock()
7171
defer peer.lock.Unlock()
7272

73-
time := getTime()
73+
time := mclock.Now()
7474
peer.recalcBV(time)
7575
return peer.bufValue, peer.cm.accept(peer.cmNode, time)
7676
}
@@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
7979
peer.lock.Lock()
8080
defer peer.lock.Unlock()
8181

82-
time := getTime()
82+
time := mclock.Now()
8383
peer.recalcBV(time)
8484
peer.bufValue -= cost
8585
peer.recalcBV(time)
@@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
9494
}
9595

9696
type ServerNode struct {
97-
bufEstimate uint64
98-
lastTime int64
99-
params *ServerParams
100-
sumCost uint64 // sum of req costs sent to this server
101-
pending map[uint64]uint64 // value = sumCost after sending the given req
102-
lock sync.RWMutex
97+
bufEstimate uint64
98+
lastTime mclock.AbsTime
99+
params *ServerParams
100+
sumCost uint64 // sum of req costs sent to this server
101+
pending map[uint64]uint64 // value = sumCost after sending the given req
102+
assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
103+
assignToken chan struct{} // send to this channel before assigning, read from it after deassigning
104+
lock sync.RWMutex
103105
}
104106

105107
func NewServerNode(params *ServerParams) *ServerNode {
106108
return &ServerNode{
107109
bufEstimate: params.BufLimit,
108-
lastTime: getTime(),
110+
lastTime: mclock.Now(),
109111
params: params,
110112
pending: make(map[uint64]uint64),
113+
assignToken: make(chan struct{}, 1),
111114
}
112115
}
113116

114-
func getTime() int64 {
115-
return int64(mclock.Now())
116-
}
117-
118-
func (peer *ServerNode) recalcBLE(time int64) {
117+
func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
119118
dt := uint64(time - peer.lastTime)
120119
if time < peer.lastTime {
121120
dt = 0
122121
}
123-
peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst
122+
peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst)
124123
if peer.bufEstimate > peer.params.BufLimit {
125124
peer.bufEstimate = peer.params.BufLimit
126125
}
127126
peer.lastTime = time
128127
}
129128

130-
func (peer *ServerNode) canSend(maxCost uint64) uint64 {
129+
// safetyMargin is added to the flow control waiting time when estimated buffer value is low
130+
const safetyMargin = time.Millisecond * 200
131+
132+
func (peer *ServerNode) canSend(maxCost uint64) time.Duration {
133+
maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
134+
if maxCost > peer.params.BufLimit {
135+
maxCost = peer.params.BufLimit
136+
}
131137
if peer.bufEstimate >= maxCost {
132138
return 0
133139
}
134-
return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge
140+
return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge)
135141
}
136142

137-
func (peer *ServerNode) CanSend(maxCost uint64) uint64 {
143+
// CanSend returns the minimum waiting time required before sending a request
144+
// with the given maximum estimated cost
145+
func (peer *ServerNode) CanSend(maxCost uint64) time.Duration {
138146
peer.lock.RLock()
139147
defer peer.lock.RUnlock()
140148

141149
return peer.canSend(maxCost)
142150
}
143151

152+
// AssignRequest tries to assign the server node to the given request, guaranteeing
153+
// that once it returns true, no request will be sent to the node before this one
154+
func (peer *ServerNode) AssignRequest(reqID uint64) bool {
155+
select {
156+
case peer.assignToken <- struct{}{}:
157+
default:
158+
return false
159+
}
160+
peer.lock.Lock()
161+
peer.assignedRequest = reqID
162+
peer.lock.Unlock()
163+
return true
164+
}
165+
166+
// MustAssignRequest waits until the node can be assigned to the given request.
167+
// It is always guaranteed that assignments are released in a short amount of time.
168+
func (peer *ServerNode) MustAssignRequest(reqID uint64) {
169+
peer.assignToken <- struct{}{}
170+
peer.lock.Lock()
171+
peer.assignedRequest = reqID
172+
peer.lock.Unlock()
173+
}
174+
175+
// DeassignRequest releases a request assignment in case the planned request
176+
// is not being sent.
177+
func (peer *ServerNode) DeassignRequest(reqID uint64) {
178+
peer.lock.Lock()
179+
if peer.assignedRequest == reqID {
180+
peer.assignedRequest = 0
181+
<-peer.assignToken
182+
}
183+
peer.lock.Unlock()
184+
}
185+
186+
// IsAssigned returns true if the server node has already been assigned to a request
187+
// (note that this function returning false does not guarantee that you can assign a request
188+
// immediately afterwards, its only purpose is to help peer selection)
189+
func (peer *ServerNode) IsAssigned() bool {
190+
peer.lock.RLock()
191+
locked := peer.assignedRequest != 0
192+
peer.lock.RUnlock()
193+
return locked
194+
}
195+
144196
// blocks until request can be sent
145197
func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
146198
peer.lock.Lock()
147199
defer peer.lock.Unlock()
148200

149-
peer.recalcBLE(getTime())
150-
for peer.bufEstimate < maxCost {
151-
wait := time.Duration(peer.canSend(maxCost))
201+
if peer.assignedRequest != reqID {
202+
peer.lock.Unlock()
203+
peer.MustAssignRequest(reqID)
204+
peer.lock.Lock()
205+
}
206+
207+
peer.recalcBLE(mclock.Now())
208+
wait := peer.canSend(maxCost)
209+
for wait > 0 {
152210
peer.lock.Unlock()
153211
time.Sleep(wait)
154212
peer.lock.Lock()
155-
peer.recalcBLE(getTime())
213+
peer.recalcBLE(mclock.Now())
214+
wait = peer.canSend(maxCost)
156215
}
216+
peer.assignedRequest = 0
217+
<-peer.assignToken
157218
peer.bufEstimate -= maxCost
158219
peer.sumCost += maxCost
159220
if reqID >= 0 {
@@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
162223
}
163224

164225
func (peer *ServerNode) GotReply(reqID, bv uint64) {
226+
165227
peer.lock.Lock()
166228
defer peer.lock.Unlock()
167229

230+
if bv > peer.params.BufLimit {
231+
bv = peer.params.BufLimit
232+
}
168233
sc, ok := peer.pending[reqID]
169234
if !ok {
170235
return
171236
}
172237
delete(peer.pending, reqID)
173238
peer.bufEstimate = bv - (peer.sumCost - sc)
174-
peer.lastTime = getTime()
239+
peer.lastTime = mclock.Now()
175240
}

0 commit comments

Comments
 (0)