
Commit ce2a7e2

swarm/storage: complete pyramid splitter

* add size calculation, chunk passing and waitgroup control
* benchmark against tree chunker - conclusions remain
* remove logging from chunker
* fix joiner and extend tests and benchmarks
1 parent 5097da7 commit ce2a7e2
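
The "chunk passing and waitgroup control" mentioned in the commit message follows the pattern visible in the test changes below: the splitter emits each finished chunk on a channel, and a consumer records the chunk and acknowledges it through the chunk's WaitGroup, which is what eventually lets the tester's swg.Wait() return. A minimal sketch of that consumer side, using only the Chunk fields that appear in this diff (Key, SData, wg); the function name and the map standing in for a chunk store are illustrative, not part of the package:

// consumeChunks mocks the storage side of a split: it drains chunkC into a
// map keyed by the chunk hash and acknowledges each chunk via its WaitGroup.
func consumeChunks(chunkC chan *Chunk, store map[string]*Chunk, quitC chan bool) {
	for {
		chunk, ok := <-chunkC
		if !ok {
			// splitter closed the channel: no more chunks are coming
			close(quitC)
			return
		}
		store[chunk.Key.String()] = chunk // "store" the chunk
		if chunk.wg != nil {
			chunk.wg.Done() // lets the splitter's storage WaitGroup make progress
		}
	}
}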

5 files changed: +200 / -130 lines


swarm/storage/chunker.go

Lines changed: 13 additions & 10 deletions
@@ -7,9 +7,8 @@ import (
 	"hash"
 	"io"
 	"sync"
-
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
+	// "github.com/ethereum/go-ethereum/logger"
+	// "github.com/ethereum/go-ethereum/logger/glog"
 )
 
 /*
@@ -321,7 +320,7 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 	if err != nil {
 		return 0, err
 	}
-	glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size)
+	// glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size)
 
 	errC := make(chan error)
 	// glog.V(logger.Detail).Infof("[BZZ] readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off)
@@ -350,9 +349,9 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 		return 0, err
 	}
 	// glog.V(logger.Detail).Infof("[BZZ] ReadAt received %v", err)
-	glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size)
+	// glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size)
 	if off+int64(len(b)) >= size {
-		glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b))
+		// glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b))
 		return len(b), io.EOF
 	}
 	// glog.V(logger.Detail).Infof("[BZZ] ReadAt returning at %d: %v", read, err)
@@ -362,7 +361,7 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
 	defer parentWg.Done()
 	// return NewDPA(&LocalStore{})
-	glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff)
+	// glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff)
 
 	// glog.V(logger.Detail).Infof("[BZZ] depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize)
 
@@ -376,7 +375,11 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
 
 	// leaf chunk found
 	if depth == 0 {
-		glog.V(logger.Detail).Infof("[BZZ] depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, treeSize)
+		// glog.V(logger.Detail).Infof("[BZZ] depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize)
+		extra := 8 + eoff - int64(len(chunk.SData))
+		if extra > 0 {
+			eoff -= extra
+		}
 		copy(b, chunk.SData[8+off:8+eoff])
 		return // simply give back the chunks reader for content chunks
 	}
@@ -387,7 +390,7 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
 
 	wg := &sync.WaitGroup{}
 	defer wg.Wait()
-	glog.V(logger.Detail).Infof("[BZZ] start %v,end %v", start, end)
+	// glog.V(logger.Detail).Infof("[BZZ] start %v,end %v", start, end)
 
 	for i := start; i < end; i++ {
 		soff := i * treeSize
@@ -457,7 +460,7 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
 // Read keeps a cursor so cannot be called simulateously, see ReadAt
 func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
 	read, err = self.ReadAt(b, self.off)
-	glog.V(logger.Detail).Infof("[BZZ] read: %v, off: %v, error: %v", read, self.off, err)
+	// glog.V(logger.Detail).Infof("[BZZ] read: %v, off: %v, error: %v", read, self.off, err)
 
 	self.off += int64(read)
 	return
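
The one behavioural change in chunker.go is the leaf-chunk fix above: chunk.SData carries an 8-byte size prefix in front of the payload, so the copy reads SData[8+off : 8+eoff]. For a short final chunk the requested end offset can point past the stored payload, making 8+eoff exceed len(SData); the new code clamps eoff by the overshoot before copying. A self-contained sketch of that arithmetic (clampEOff is a hypothetical helper, not a function in the package):

package main

import "fmt"

// clampEOff mirrors the fix in LazyChunkReader.join: if the requested end
// offset plus the 8-byte size prefix overshoots the stored chunk data, pull
// eoff back by the overshoot so SData[8+off:8+eoff] stays in bounds.
func clampEOff(eoff, sdataLen int64) int64 {
	extra := 8 + eoff - sdataLen
	if extra > 0 {
		eoff -= extra
	}
	return eoff
}

func main() {
	// A final chunk with 100 payload bytes has len(SData) = 8 + 100 = 108.
	// Asking to read up to eoff = 150 would slice SData[8:158], past the end;
	// the clamp brings eoff back to 100.
	fmt.Println(clampEOff(150, 108)) // 100
	// A request that already fits is left untouched.
	fmt.Println(clampEOff(80, 108)) // 80
}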

swarm/storage/chunker_test.go

Lines changed: 73 additions & 60 deletions
@@ -2,32 +2,27 @@ package storage
 
 import (
 	"bytes"
+	"encoding/binary"
 	"fmt"
 	"io"
 	"runtime"
 	"sync"
 	"testing"
 	"time"
-
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
-func init() {
-	glog.SetV(logger.Info)
-	glog.SetToStderr(true)
-}
-
 /*
 Tests TreeChunker by splitting and joining a random byte slice
 */
 
 type test interface {
 	Fatalf(string, ...interface{})
+	Logf(string, ...interface{})
 }
 
 type chunkerTester struct {
-	chunks []*Chunk
+	inputs map[uint64][]byte
+	chunks map[string]*Chunk
 	t      test
 }
 
@@ -40,7 +35,12 @@ func (self *chunkerTester) checkChunks(t *testing.T, want int) {
 
 func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup) (key Key) {
 	// reset
-	self.chunks = nil
+	self.chunks = make(map[string]*Chunk)
+
+	if self.inputs == nil {
+		self.inputs = make(map[uint64][]byte)
+	}
+
 	quitC := make(chan bool)
 	timeout := time.After(600 * time.Second)
 	if chunkC != nil {
@@ -57,7 +57,8 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
 						return
 					}
 					// glog.V(logger.Info).Infof("chunk %v received", len(self.chunks))
-					self.chunks = append(self.chunks, chunk)
+					// self.chunks = append(self.chunks, chunk)
+					self.chunks[chunk.Key.String()] = chunk
 					if chunk.wg != nil {
 						chunk.wg.Done()
 					}
@@ -73,22 +74,26 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
 		if swg != nil {
 			// glog.V(logger.Info).Infof("Waiting for storage to finish")
 			swg.Wait()
-			// glog.V(logger.Info).Infof("St orage finished")
+			// glog.V(logger.Info).Infof("Storage finished")
 		}
 		close(chunkC)
 	}
 	if chunkC != nil {
+		// glog.V(logger.Info).Infof("waiting for splitter finished")
 		<-quitC
+		// glog.V(logger.Info).Infof("Splitter finished")
 	}
 	return
 }
 
-func (self *chunkerTester) Join(chunker *TreeChunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
+func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
 	// reset but not the chunks
 
+	// glog.V(logger.Info).Infof("Splitter finished")
 	reader := chunker.Join(key, chunkC)
 
 	timeout := time.After(600 * time.Second)
+	// glog.V(logger.Info).Infof("Splitter finished")
 	i := 0
 	go func() {
 		for {
@@ -101,66 +106,77 @@ func (self *chunkerTester) Join(chunker *TreeChunker, key Key, c int, chunkC cha
 					close(quitC)
 					return
 				}
-				i++
+				// glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
 				// this just mocks the behaviour of a chunk store retrieval
-				var found bool
-				for _, ch := range self.chunks {
-					if bytes.Equal(chunk.Key, ch.Key) {
-						found = true
-						chunk.SData = ch.SData
-						break
-					}
-				}
-				if !found {
-					self.t.Fatalf("not found ")
+				stored, success := self.chunks[chunk.Key.String()]
+				// glog.V(logger.Info).Infof("chunk %v, success: %v", chunk.Key.String(), success)
+				if !success {
+					self.t.Fatalf("not found")
+					return
 				}
+				// glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
+				chunk.SData = stored.SData
+				chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
 				close(chunk.C)
+				i++
 			}
 		}
 	}()
 	return reader
 }
 
-func testRandomData(n int, chunks int, t *testing.T) {
-	chunker := NewTreeChunker(&ChunkerParams{
-		Branches: 128,
-		Hash: "SHA3",
-	})
-	tester := &chunkerTester{t: t}
-	data, input := testDataReaderAndSlice(n)
+func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
+	if tester.inputs == nil {
+		tester.inputs = make(map[uint64][]byte)
+	}
+	input, found := tester.inputs[uint64(n)]
+	var data io.Reader
+	if !found {
+		data, input = testDataReaderAndSlice(n)
+		tester.inputs[uint64(n)] = input
+	} else {
+		data = limitReader(bytes.NewReader(input), n)
+	}
 
 	chunkC := make(chan *Chunk, 1000)
 	swg := &sync.WaitGroup{}
 
-	splitter := chunker
 	key := tester.Split(splitter, data, int64(n), chunkC, swg)
+	tester.t.Logf(" Key = %v\n", key)
 
-	// t.Logf(" Key = %v\n", key)
-
-	// tester.checkChunks(t, chunks)
 	chunkC = make(chan *Chunk, 1000)
 	quitC := make(chan bool)
 
+	chunker := NewTreeChunker(NewChunkerParams())
 	reader := tester.Join(chunker, key, 0, chunkC, quitC)
 	output := make([]byte, n)
+	// glog.V(logger.Info).Infof(" Key = %v\n", key)
 	r, err := reader.Read(output)
+	// glog.V(logger.Info).Infof(" read = %v %v\n", r, err)
 	if r != n || err != io.EOF {
-		t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err)
+		tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err)
	}
 	if input != nil {
 		if !bytes.Equal(output, input) {
-			t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", input, output)
+			tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", input, output)
 		}
 	}
 	close(chunkC)
 	<-quitC
 }
 
 func TestRandomData(t *testing.T) {
-	testRandomData(60, 1, t)
-	testRandomData(83, 3, t)
-	testRandomData(179, 5, t)
-	testRandomData(253, 7, t)
+	// sizes := []int{123456}
+	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 123456}
+	tester := &chunkerTester{t: t}
+	chunker := NewTreeChunker(NewChunkerParams())
+	for _, s := range sizes {
+		testRandomData(chunker, s, tester)
+	}
+	pyramid := NewPyramidChunker(NewChunkerParams())
+	for _, s := range sizes {
+		testRandomData(pyramid, s, tester)
+	}
 }
 
 func readAll(reader LazySectionReader, result []byte) {
@@ -186,36 +202,34 @@ func benchReadAll(reader LazySectionReader) {
 }
 
 func benchmarkJoin(n int, t *testing.B) {
+	t.ReportAllocs()
 	for i := 0; i < t.N; i++ {
-		chunker := NewTreeChunker(&ChunkerParams{
-			Branches: 128,
-			Hash: "SHA3",
-		})
+		chunker := NewTreeChunker(NewChunkerParams())
 		tester := &chunkerTester{t: t}
 		data := testDataReader(n)
 
 		chunkC := make(chan *Chunk, 1000)
 		swg := &sync.WaitGroup{}
 
 		key := tester.Split(chunker, data, int64(n), chunkC, swg)
-		t.StartTimer()
+		// t.StartTimer()
 		chunkC = make(chan *Chunk, 1000)
 		quitC := make(chan bool)
 		reader := tester.Join(chunker, key, i, chunkC, quitC)
-		t.StopTimer()
 		benchReadAll(reader)
 		close(chunkC)
 		<-quitC
+		// t.StopTimer()
 	}
+	stats := new(runtime.MemStats)
+	runtime.ReadMemStats(stats)
+	fmt.Println(stats.Sys)
 }
 
 func benchmarkSplitTree(n int, t *testing.B) {
 	t.ReportAllocs()
 	for i := 0; i < t.N; i++ {
-		chunker := NewTreeChunker(&ChunkerParams{
-			Branches: 128,
-			Hash: "SHA3",
-		})
+		chunker := NewTreeChunker(NewChunkerParams())
 		tester := &chunkerTester{t: t}
 		data := testDataReader(n)
 		// glog.V(logger.Info).Infof("splitting data of length %v", n)
@@ -229,10 +243,7 @@ func benchmarkSplitTree(n int, t *testing.B) {
 func benchmarkSplitPyramid(n int, t *testing.B) {
 	t.ReportAllocs()
 	for i := 0; i < t.N; i++ {
-		splitter := NewPyramidChunker(&ChunkerParams{
-			Branches: 128,
-			Hash: "SHA3",
-		})
+		splitter := NewPyramidChunker(NewChunkerParams())
 		tester := &chunkerTester{t: t}
 		data := testDataReader(n)
 		// glog.V(logger.Info).Infof("splitting data of length %v", n)
@@ -243,11 +254,13 @@ func benchmarkSplitPyramid(n int, t *testing.B) {
 	fmt.Println(stats.Sys)
 }
 
-func BenchmarkJoin_100_2(t *testing.B) { benchmarkJoin(100, t) }
-func BenchmarkJoin_1000_2(t *testing.B) { benchmarkJoin(1000, t) }
-func BenchmarkJoin_10000_2(t *testing.B) { benchmarkJoin(10000, t) }
-func BenchmarkJoin_100000_2(t *testing.B) { benchmarkJoin(100000, t) }
-func BenchmarkJoin_1000000_2(t *testing.B) { benchmarkJoin(1000000, t) }
+func BenchmarkJoin_2(t *testing.B) { benchmarkJoin(100, t) }
+func BenchmarkJoin_3(t *testing.B) { benchmarkJoin(1000, t) }
+func BenchmarkJoin_4(t *testing.B) { benchmarkJoin(10000, t) }
+func BenchmarkJoin_5(t *testing.B) { benchmarkJoin(100000, t) }
+func BenchmarkJoin_6(t *testing.B) { benchmarkJoin(1000000, t) }
+func BenchmarkJoin_7(t *testing.B) { benchmarkJoin(10000000, t) }
+func BenchmarkJoin_8(t *testing.B) { benchmarkJoin(100000000, t) }
 
 func BenchmarkSplitTree_2(t *testing.B) { benchmarkSplitTree(100, t) }
 func BenchmarkSplitTree_2h(t *testing.B) { benchmarkSplitTree(500, t) }
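
The new encoding/binary import supports the joiner mock above: when a requested chunk is found in the tester's map, its Size is recovered from the first 8 bytes of SData, a little-endian uint64 length prefix, before chunk.C is closed to release the waiting reader. A standalone sketch of that prefix layout (the helper names are illustrative, not part of swarm/storage):

package main

import (
	"encoding/binary"
	"fmt"
)

// prependSize lays data out the way the tests read it back: an 8-byte
// little-endian length prefix followed by the payload.
func prependSize(payload []byte) []byte {
	sdata := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(sdata[:8], uint64(len(payload)))
	copy(sdata[8:], payload)
	return sdata
}

// decodeSize mirrors chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])).
func decodeSize(sdata []byte) int64 {
	return int64(binary.LittleEndian.Uint64(sdata[:8]))
}

func main() {
	sdata := prependSize([]byte("hello swarm"))
	fmt.Println(decodeSize(sdata)) // 11
}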

swarm/storage/common_test.go

Lines changed: 26 additions & 2 deletions
@@ -11,16 +11,40 @@ import (
 	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
+type limitedReader struct {
+	r    io.Reader
+	off  int64
+	size int64
+}
+
+func limitReader(r io.Reader, size int) *limitedReader {
+	return &limitedReader{r, 0, int64(size)}
+}
+
+func (self *limitedReader) Read(buf []byte) (int, error) {
+	limit := int64(len(buf))
+	left := self.size - self.off
+	if limit >= left {
+		limit = left
+	}
+	n, err := self.r.Read(buf[:limit])
+	if err == nil && limit == left {
+		err = io.EOF
+	}
+	self.off += int64(n)
+	return n, err
+}
+
 func testDataReader(l int) (r io.Reader) {
-	return io.LimitReader(rand.Reader, int64(l))
+	return limitReader(rand.Reader, l)
 }
 
 func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
 	slice = make([]byte, l)
 	if _, err := rand.Read(slice); err != nil {
 		panic("rand error")
 	}
-	r = bytes.NewReader(slice)
+	r = limitReader(bytes.NewReader(slice), l)
 	return
 }
 
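The helper exists because the chunker tests expect the final Read to return io.EOF together with the last bytes (the r != n || err != io.EOF check in chunker_test.go), whereas io.LimitReader only reports io.EOF on the call after the data is exhausted. A small usage sketch in the same package as the helper above (the function name is illustrative):

// exampleLimitedReaderEOF shows that limitReader delivers the data and io.EOF
// in a single call: reading 10 bytes from a 10-byte source returns (10, io.EOF),
// while io.LimitReader would return (10, nil) here and (0, io.EOF) next time.
func exampleLimitedReaderEOF() (int, error) {
	src := bytes.NewReader(make([]byte, 10))
	r := limitReader(src, 10)

	buf := make([]byte, 10)
	return r.Read(buf) // (10, io.EOF)
}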