Skip to content

Commit 2ed729d

Browse files
rjl493456442zsfelfoldi
authored andcommitted
les: handler separation (ethereum#19639)
les: handler separation
1 parent 4aee0d1 commit 2ed729d

31 files changed

+2364
-2512
lines changed

core/blockchain.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ const (
7575
bodyCacheLimit = 256
7676
blockCacheLimit = 256
7777
receiptsCacheLimit = 32
78+
txLookupCacheLimit = 1024
7879
maxFutureBlocks = 256
7980
maxTimeFutureBlocks = 30
8081
badBlockLimit = 10
@@ -155,6 +156,7 @@ type BlockChain struct {
155156
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
156157
receiptsCache *lru.Cache // Cache for the most recent receipts per block
157158
blockCache *lru.Cache // Cache for the most recent entire blocks
159+
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
158160
futureBlocks *lru.Cache // future blocks are blocks added for later processing
159161

160162
quit chan struct{} // blockchain quit channel
@@ -189,6 +191,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
189191
bodyRLPCache, _ := lru.New(bodyCacheLimit)
190192
receiptsCache, _ := lru.New(receiptsCacheLimit)
191193
blockCache, _ := lru.New(blockCacheLimit)
194+
txLookupCache, _ := lru.New(txLookupCacheLimit)
192195
futureBlocks, _ := lru.New(maxFutureBlocks)
193196
badBlocks, _ := lru.New(badBlockLimit)
194197

@@ -204,6 +207,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
204207
bodyRLPCache: bodyRLPCache,
205208
receiptsCache: receiptsCache,
206209
blockCache: blockCache,
210+
txLookupCache: txLookupCache,
207211
futureBlocks: futureBlocks,
208212
engine: engine,
209213
vmConfig: vmConfig,
@@ -440,6 +444,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
440444
bc.bodyRLPCache.Purge()
441445
bc.receiptsCache.Purge()
442446
bc.blockCache.Purge()
447+
bc.txLookupCache.Purge()
443448
bc.futureBlocks.Purge()
444449

445450
return bc.loadLastState()
@@ -921,6 +926,7 @@ func (bc *BlockChain) truncateAncient(head uint64) error {
921926
bc.bodyRLPCache.Purge()
922927
bc.receiptsCache.Purge()
923928
bc.blockCache.Purge()
929+
bc.txLookupCache.Purge()
924930
bc.futureBlocks.Purge()
925931

926932
log.Info("Rewind ancient data", "number", head)
@@ -2151,6 +2157,22 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
21512157
return bc.hc.GetHeaderByNumber(number)
21522158
}
21532159

2160+
// GetTransactionLookup retrieves the lookup associate with the given transaction
2161+
// hash from the cache or database.
2162+
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) *rawdb.LegacyTxLookupEntry {
2163+
// Short circuit if the txlookup already in the cache, retrieve otherwise
2164+
if lookup, exist := bc.txLookupCache.Get(hash); exist {
2165+
return lookup.(*rawdb.LegacyTxLookupEntry)
2166+
}
2167+
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
2168+
if tx == nil {
2169+
return nil
2170+
}
2171+
lookup := &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
2172+
bc.txLookupCache.Add(hash, lookup)
2173+
return lookup
2174+
}
2175+
21542176
// Config retrieves the chain's fork configuration.
21552177
func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }
21562178

les/api.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,11 @@ var (
3030
// PrivateLightAPI provides an API to access the LES light server or light client.
3131
type PrivateLightAPI struct {
3232
backend *lesCommons
33-
reg *checkpointOracle
3433
}
3534

3635
// NewPrivateLightAPI creates a new LES service API.
37-
func NewPrivateLightAPI(backend *lesCommons, reg *checkpointOracle) *PrivateLightAPI {
38-
return &PrivateLightAPI{
39-
backend: backend,
40-
reg: reg,
41-
}
36+
func NewPrivateLightAPI(backend *lesCommons) *PrivateLightAPI {
37+
return &PrivateLightAPI{backend: backend}
4238
}
4339

4440
// LatestCheckpoint returns the latest local checkpoint package.
@@ -67,7 +63,7 @@ func (api *PrivateLightAPI) LatestCheckpoint() ([4]string, error) {
6763
// result[2], 32 bytes hex encoded latest section bloom trie root hash
6864
func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
6965
var res [3]string
70-
cp := api.backend.getLocalCheckpoint(index)
66+
cp := api.backend.localCheckpoint(index)
7167
if cp.Empty() {
7268
return res, errNoCheckpoint
7369
}
@@ -77,8 +73,8 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
7773

7874
// GetCheckpointContractAddress returns the contract contract address in hex format.
7975
func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) {
80-
if api.reg == nil {
76+
if api.backend.oracle == nil {
8177
return "", errNotActivated
8278
}
83-
return api.reg.config.Address.Hex(), nil
79+
return api.backend.oracle.config.Address.Hex(), nil
8480
}

les/api_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (b *LesApiBackend) CurrentBlock() *types.Block {
5454
}
5555

5656
func (b *LesApiBackend) SetHead(number uint64) {
57-
b.eth.protocolManager.downloader.Cancel()
57+
b.eth.handler.downloader.Cancel()
5858
b.eth.blockchain.SetHead(number)
5959
}
6060

les/api_test.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,16 @@ func TestCapacityAPI10(t *testing.T) {
7878
// while connected and going back and forth between free and priority mode with
7979
// the supplied API calls is also thoroughly tested.
8080
func testCapacityAPI(t *testing.T, clientCount int) {
81+
// Skip test if no data dir specified
8182
if testServerDataDir == "" {
82-
// Skip test if no data dir specified
8383
return
8484
}
85-
8685
for !testSim(t, 1, clientCount, []string{testServerDataDir}, nil, func(ctx context.Context, net *simulations.Network, servers []*simulations.Node, clients []*simulations.Node) bool {
8786
if len(servers) != 1 {
8887
t.Fatalf("Invalid number of servers: %d", len(servers))
8988
}
9089
server := servers[0]
9190

92-
clientRpcClients := make([]*rpc.Client, len(clients))
93-
9491
serverRpcClient, err := server.Client()
9592
if err != nil {
9693
t.Fatalf("Failed to obtain rpc client: %v", err)
@@ -105,13 +102,13 @@ func testCapacityAPI(t *testing.T, clientCount int) {
105102
}
106103
freeIdx := rand.Intn(len(clients))
107104

105+
clientRpcClients := make([]*rpc.Client, len(clients))
108106
for i, client := range clients {
109107
var err error
110108
clientRpcClients[i], err = client.Client()
111109
if err != nil {
112110
t.Fatalf("Failed to obtain rpc client: %v", err)
113111
}
114-
115112
t.Log("connecting client", i)
116113
if i != freeIdx {
117114
setCapacity(ctx, t, serverRpcClient, client.ID(), testCap/uint64(len(clients)))
@@ -138,21 +135,22 @@ func testCapacityAPI(t *testing.T, clientCount int) {
138135

139136
reqCount := make([]uint64, len(clientRpcClients))
140137

138+
// Send light request like crazy.
141139
for i, c := range clientRpcClients {
142140
wg.Add(1)
143141
i, c := i, c
144142
go func() {
143+
defer wg.Done()
144+
145145
queue := make(chan struct{}, 100)
146146
reqCount[i] = 0
147147
for {
148148
select {
149149
case queue <- struct{}{}:
150150
select {
151151
case <-stop:
152-
wg.Done()
153152
return
154153
case <-ctx.Done():
155-
wg.Done()
156154
return
157155
default:
158156
wg.Add(1)
@@ -169,10 +167,8 @@ func testCapacityAPI(t *testing.T, clientCount int) {
169167
}()
170168
}
171169
case <-stop:
172-
wg.Done()
173170
return
174171
case <-ctx.Done():
175-
wg.Done()
176172
return
177173
}
178174
}
@@ -313,12 +309,10 @@ func getHead(ctx context.Context, t *testing.T, client *rpc.Client) (uint64, com
313309
}
314310

315311
func testRequest(ctx context.Context, t *testing.T, client *rpc.Client) bool {
316-
//res := make(map[string]interface{})
317312
var res string
318313
var addr common.Address
319314
rand.Read(addr[:])
320315
c, _ := context.WithTimeout(ctx, time.Second*12)
321-
// if err := client.CallContext(ctx, &res, "eth_getProof", addr, nil, "latest"); err != nil {
322316
err := client.CallContext(c, &res, "eth_getBalance", addr, "latest")
323317
if err != nil {
324318
t.Log("request error:", err)
@@ -418,7 +412,6 @@ func NewNetwork() (*simulations.Network, func(), error) {
418412
adapterTeardown()
419413
net.Shutdown()
420414
}
421-
422415
return net, teardown, nil
423416
}
424417

@@ -516,7 +509,6 @@ func newLesServerService(ctx *adapters.ServiceContext) (node.Service, error) {
516509
if err != nil {
517510
return nil, err
518511
}
519-
520512
server, err := NewLesServer(ethereum, &config)
521513
if err != nil {
522514
return nil, err

les/benchmark.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
// requestBenchmark is an interface for different randomized request generators
4040
type requestBenchmark interface {
4141
// init initializes the generator for generating the given number of randomized requests
42-
init(pm *ProtocolManager, count int) error
42+
init(h *serverHandler, count int) error
4343
// request initiates sending a single request to the given peer
4444
request(peer *peer, index int) error
4545
}
@@ -52,10 +52,10 @@ type benchmarkBlockHeaders struct {
5252
hashes []common.Hash
5353
}
5454

55-
func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
55+
func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
5656
d := int64(b.amount-1) * int64(b.skip+1)
5757
b.offset = 0
58-
b.randMax = pm.blockchain.CurrentHeader().Number.Int64() + 1 - d
58+
b.randMax = h.blockchain.CurrentHeader().Number.Int64() + 1 - d
5959
if b.randMax < 0 {
6060
return fmt.Errorf("chain is too short")
6161
}
@@ -65,7 +65,7 @@ func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
6565
if b.byHash {
6666
b.hashes = make([]common.Hash, count)
6767
for i := range b.hashes {
68-
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
68+
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
6969
}
7070
}
7171
return nil
@@ -85,11 +85,11 @@ type benchmarkBodiesOrReceipts struct {
8585
hashes []common.Hash
8686
}
8787

88-
func (b *benchmarkBodiesOrReceipts) init(pm *ProtocolManager, count int) error {
89-
randMax := pm.blockchain.CurrentHeader().Number.Int64() + 1
88+
func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
89+
randMax := h.blockchain.CurrentHeader().Number.Int64() + 1
9090
b.hashes = make([]common.Hash, count)
9191
for i := range b.hashes {
92-
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(rand.Int63n(randMax)))
92+
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(rand.Int63n(randMax)))
9393
}
9494
return nil
9595
}
@@ -108,8 +108,8 @@ type benchmarkProofsOrCode struct {
108108
headHash common.Hash
109109
}
110110

111-
func (b *benchmarkProofsOrCode) init(pm *ProtocolManager, count int) error {
112-
b.headHash = pm.blockchain.CurrentHeader().Hash()
111+
func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
112+
b.headHash = h.blockchain.CurrentHeader().Hash()
113113
return nil
114114
}
115115

@@ -130,11 +130,11 @@ type benchmarkHelperTrie struct {
130130
sectionCount, headNum uint64
131131
}
132132

133-
func (b *benchmarkHelperTrie) init(pm *ProtocolManager, count int) error {
133+
func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
134134
if b.bloom {
135-
b.sectionCount, b.headNum, _ = pm.server.bloomTrieIndexer.Sections()
135+
b.sectionCount, b.headNum, _ = h.server.bloomTrieIndexer.Sections()
136136
} else {
137-
b.sectionCount, _, _ = pm.server.chtIndexer.Sections()
137+
b.sectionCount, _, _ = h.server.chtIndexer.Sections()
138138
b.headNum = b.sectionCount*params.CHTFrequency - 1
139139
}
140140
if b.sectionCount == 0 {
@@ -170,7 +170,7 @@ type benchmarkTxSend struct {
170170
txs types.Transactions
171171
}
172172

173-
func (b *benchmarkTxSend) init(pm *ProtocolManager, count int) error {
173+
func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
174174
key, _ := crypto.GenerateKey()
175175
addr := crypto.PubkeyToAddress(key.PublicKey)
176176
signer := types.NewEIP155Signer(big.NewInt(18))
@@ -196,7 +196,7 @@ func (b *benchmarkTxSend) request(peer *peer, index int) error {
196196
// benchmarkTxStatus implements requestBenchmark
197197
type benchmarkTxStatus struct{}
198198

199-
func (b *benchmarkTxStatus) init(pm *ProtocolManager, count int) error {
199+
func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
200200
return nil
201201
}
202202

@@ -217,7 +217,7 @@ type benchmarkSetup struct {
217217

218218
// runBenchmark runs a benchmark cycle for all benchmark types in the specified
219219
// number of passes
220-
func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
220+
func (h *serverHandler) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
221221
setup := make([]*benchmarkSetup, len(benchmarks))
222222
for i, b := range benchmarks {
223223
setup[i] = &benchmarkSetup{req: b}
@@ -239,7 +239,7 @@ func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount
239239
if next.totalTime > 0 {
240240
count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
241241
}
242-
if err := pm.measure(next, count); err != nil {
242+
if err := h.measure(next, count); err != nil {
243243
next.err = err
244244
}
245245
}
@@ -275,14 +275,15 @@ func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {
275275

276276
// measure runs a benchmark for a single type in a single pass, with the given
277277
// number of requests
278-
func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
278+
func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
279279
clientPipe, serverPipe := p2p.MsgPipe()
280280
clientMeteredPipe := &meteredPipe{rw: clientPipe}
281281
serverMeteredPipe := &meteredPipe{rw: serverPipe}
282282
var id enode.ID
283283
rand.Read(id[:])
284-
clientPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
285-
serverPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
284+
285+
clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
286+
serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
286287
serverPeer.sendQueue = newExecQueue(count)
287288
serverPeer.announceType = announceTypeNone
288289
serverPeer.fcCosts = make(requestCostTable)
@@ -291,10 +292,10 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
291292
serverPeer.fcCosts[code] = c
292293
}
293294
serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
294-
serverPeer.fcClient = flowcontrol.NewClientNode(pm.server.fcManager, serverPeer.fcParams)
295+
serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams)
295296
defer serverPeer.fcClient.Disconnect()
296297

297-
if err := setup.req.init(pm, count); err != nil {
298+
if err := setup.req.init(h, count); err != nil {
298299
return err
299300
}
300301

@@ -311,7 +312,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
311312
}()
312313
go func() {
313314
for i := 0; i < count; i++ {
314-
if err := pm.handleMsg(serverPeer); err != nil {
315+
if err := h.handleMsg(serverPeer); err != nil {
315316
errCh <- err
316317
return
317318
}
@@ -336,7 +337,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
336337
if err != nil {
337338
return err
338339
}
339-
case <-pm.quitSync:
340+
case <-h.closeCh:
340341
clientPipe.Close()
341342
serverPipe.Close()
342343
return fmt.Errorf("Benchmark cancelled")

les/bloombits.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ const (
4646
func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) {
4747
for i := 0; i < bloomServiceThreads; i++ {
4848
go func() {
49+
defer eth.wg.Done()
4950
for {
5051
select {
51-
case <-eth.shutdownChan:
52+
case <-eth.closeCh:
5253
return
5354

5455
case request := <-eth.bloomRequests:

0 commit comments

Comments
 (0)