Skip to content

Commit e307d37

Browse files
authored
Control number of concurrent range writer uploaders (treeverse#1351)
* Control number of concurrent range writer uploaders. Fixes treeverse#1319. Requires replacing the use of mock range writers with fake range writers: goroutines, Go testing, and Go mocks cannot work together — if a mock receives an unexpected parameter it cannot throw an exception (Go has none), so it calls some `testing.t.Fa*` method, and unfortunately the testing package does not support failing from inside a goroutine. * [CR] Fail manager creation if no uploaders are configured. How can you upload without uploaders? So fail metarange manager creation if it will never be able to use a metarange writer.
1 parent f68b587 commit e307d37

File tree

7 files changed

+205
-118
lines changed

7 files changed

+205
-118
lines changed

catalog/rocks/entry_catalog.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func NewEntryCatalog(cfg *config.Config, db db.Database) (*EntryCatalog, error)
9696
DiskAllocProportion: tierFSParams.MetaRangeAllocationProportion,
9797
})
9898
if err != nil {
99-
return nil, fmt.Errorf("create tiered FS for committed meta-range: %w", err)
99+
return nil, fmt.Errorf("create tiered FS for committed metaranges: %w", err)
100100
}
101101

102102
rangeFS, err := pyramid.NewFS(&params.InstanceParams{
@@ -105,20 +105,23 @@ func NewEntryCatalog(cfg *config.Config, db db.Database) (*EntryCatalog, error)
105105
DiskAllocProportion: tierFSParams.RangeAllocationProportion,
106106
})
107107
if err != nil {
108-
return nil, fmt.Errorf("create tiered FS for committed meta-range: %w", err)
108+
return nil, fmt.Errorf("create tiered FS for committed ranges: %w", err)
109109
}
110110

111111
pebbleSSTableCache := pebble.NewCache(tierFSParams.PebbleSSTableCacheSizeBytes)
112112
defer pebbleSSTableCache.Unref()
113113

114114
sstableManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, rangeFS, hashAlg)
115115
sstableMetaManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, metaRangeFS, hashAlg)
116-
sstableMetaRangeManager := committed.NewMetaRangeManager(
116+
sstableMetaRangeManager, err := committed.NewMetaRangeManager(
117117
*cfg.GetCommittedParams(),
118118
// TODO(ariels): Use separate range managers for metaranges and ranges
119119
sstableMetaManager,
120120
sstableManager,
121121
)
122+
if err != nil {
123+
return nil, fmt.Errorf("create SSTable-based metarange manager: %w", err)
124+
}
122125
committedManager := committed.NewCommittedManager(sstableMetaRangeManager)
123126

124127
stagingManager := staging.NewManager(db)

config/config.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const (
3737
DefaultCommittedLocalCacheBytes = 1 * 1024 * 1024 * 1024
3838
DefaultCommittedLocalCacheDir = "~/data/lakefs/cache"
3939
DefaultCommittedPebbleSSTableCacheSizeBytes = 400_000_000
40+
DefaultCommittedLocalCacheNumUploaders = 10
4041
DefaultCommittedBlockStoragePrefix = "_lakefs"
4142
DefaultCommittedPermanentMinRangeSizeBytes = 0
4243
DefaultCommittedPermanentMaxRangeSizeBytes = 20 * 1024 * 1024
@@ -104,11 +105,11 @@ const (
104105
BlockstoreS3StreamingChunkTimeoutKey = "blockstore.s3.streaming_chunk_timeout"
105106
BlockstoreS3MaxRetriesKey = "blockstore.s3.max_retries"
106107

107-
CommittedLocalCacheSizeBytesKey = "committed.local_cache.size_bytes"
108-
CommittedLocalCacheDirKey = "committed.local_cache.dir"
109-
CommittedLocalCacheRangeProportion = "committed.local_cache.range_proportion"
110-
CommittedLocalCacheMetaRangeProportion = "committed.local_cache.metarange_proportion"
111-
108+
CommittedLocalCacheSizeBytesKey = "committed.local_cache.size_bytes"
109+
CommittedLocalCacheDirKey = "committed.local_cache.dir"
110+
CommittedLocalCacheNumUploadersKey = "committed.local_cache.max_uploaders_per_writer"
111+
CommittedLocalCacheRangeProportionKey = "committed.local_cache.range_proportion"
112+
CommittedLocalCacheMetaRangeProportionKey = "committed.local_cache.metarange_proportion"
112113
CommittedBlockStoragePrefixKey = "committed.block_storage_prefix"
113114
CommittedPermanentStorageMinRangeSizeKey = "committed.permanent.min_range_size_bytes"
114115
CommittedPermanentStorageMaxRangeSizeKey = "committed.permanent.max_range_size_bytes"
@@ -147,8 +148,9 @@ func setDefaults() {
147148

148149
viper.SetDefault(CommittedLocalCacheSizeBytesKey, DefaultCommittedLocalCacheBytes)
149150
viper.SetDefault(CommittedLocalCacheDirKey, DefaultCommittedLocalCacheDir)
150-
viper.SetDefault(CommittedLocalCacheRangeProportion, DefaultCommittedLocalCacheRangePercent)
151-
viper.SetDefault(CommittedLocalCacheMetaRangeProportion, DefaultCommittedLocalCacheMetaRangePercent)
151+
viper.SetDefault(CommittedLocalCacheNumUploadersKey, DefaultCommittedLocalCacheNumUploaders)
152+
viper.SetDefault(CommittedLocalCacheRangeProportionKey, DefaultCommittedLocalCacheRangePercent)
153+
viper.SetDefault(CommittedLocalCacheMetaRangeProportionKey, DefaultCommittedLocalCacheMetaRangePercent)
152154

153155
viper.SetDefault(CommittedBlockStoragePrefixKey, DefaultCommittedBlockStoragePrefix)
154156
viper.SetDefault(CommittedPermanentStorageMinRangeSizeKey, DefaultCommittedPermanentMinRangeSizeBytes)
@@ -369,8 +371,8 @@ func (c *Config) GetCommittedTierFSParams() (*pyramidparams.ExtParams, error) {
369371
if err != nil {
370372
return nil, fmt.Errorf("build block adapter: %w", err)
371373
}
372-
rangePro := viper.GetFloat64(CommittedLocalCacheRangeProportion)
373-
metaRangePro := viper.GetFloat64(CommittedLocalCacheMetaRangeProportion)
374+
rangePro := viper.GetFloat64(CommittedLocalCacheRangeProportionKey)
375+
metaRangePro := viper.GetFloat64(CommittedLocalCacheMetaRangeProportionKey)
374376

375377
if math.Abs(rangePro+metaRangePro-1) > floatSumTolerance {
376378
return nil, fmt.Errorf("range_proportion(%f) and metarange_proportion(%f): %w", rangePro, metaRangePro, ErrInvalidProportion)
@@ -403,6 +405,7 @@ func (c *Config) GetCommittedParams() *committed.Params {
403405
MinRangeSizeBytes: viper.GetUint64(CommittedPermanentStorageMinRangeSizeKey),
404406
MaxRangeSizeBytes: viper.GetUint64(CommittedPermanentStorageMaxRangeSizeKey),
405407
RangeSizeEntriesRaggedness: viper.GetFloat64(CommittedPermanentStorageRangeRaggednessKey),
408+
MaxUploaders: viper.GetInt(CommittedLocalCacheNumUploadersKey),
406409
}
407410
}
408411

graveler/committed/batch.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,28 @@ type ResultCloser interface {
1010
}
1111

1212
type BatchCloser struct {
13+
// mu protects results and error
14+
mu sync.Mutex
1315
results []WriteResult
1416
err error
1517

1618
wg sync.WaitGroup
1719

18-
// lock locks any access to the results and error
19-
lock sync.Mutex
20+
ch chan ResultCloser
2021
}
2122

2223
// NewBatchCloser returns a new BatchCloser
23-
func NewBatchCloser() *BatchCloser {
24-
return &BatchCloser{
25-
wg: sync.WaitGroup{},
26-
lock: sync.Mutex{},
24+
func NewBatchCloser(numClosers int) *BatchCloser {
25+
ret := &BatchCloser{
26+
// Block when all closer goroutines are busy.
27+
ch: make(chan ResultCloser),
2728
}
29+
30+
for i := 0; i < numClosers; i++ {
31+
go ret.handleClose()
32+
}
33+
34+
return ret
2835
}
2936

3037
var (
@@ -35,8 +42,8 @@ var (
3542
// Any writes executed to the writer after this call are not guaranteed to succeed.
3643
// If Wait() has already been called, returns an error.
3744
func (bc *BatchCloser) CloseWriterAsync(w ResultCloser) error {
38-
bc.lock.Lock()
39-
defer bc.lock.Unlock()
45+
bc.mu.Lock()
46+
defer bc.mu.Unlock()
4047

4148
if bc.err != nil {
4249
// Don't accept new writers if previous error occurred.
@@ -45,18 +52,24 @@ func (bc *BatchCloser) CloseWriterAsync(w ResultCloser) error {
4552
}
4653

4754
bc.wg.Add(1)
48-
go bc.closeWriter(w)
55+
bc.ch <- w
4956

5057
return nil
5158
}
5259

60+
func (bc *BatchCloser) handleClose() {
61+
for w := range bc.ch {
62+
bc.closeWriter(w)
63+
}
64+
}
65+
5366
func (bc *BatchCloser) closeWriter(w ResultCloser) {
5467
defer bc.wg.Done()
5568
res, err := w.Close()
5669

5770
// long operation is over, we can lock to have synchronized access to err and results
58-
bc.lock.Lock()
59-
defer bc.lock.Unlock()
71+
bc.mu.Lock()
72+
defer bc.mu.Unlock()
6073

6174
if err != nil {
6275
if bc.nilErrOrMultipleCalls() {
@@ -69,22 +82,24 @@ func (bc *BatchCloser) closeWriter(w ResultCloser) {
6982
bc.results = append(bc.results, *res)
7083
}
7184

72-
// Wait returns when all Writers finished.
73-
// Any failure to close a single RangeWriter will return with a nil results slice and an error.
85+
// Wait returns when all Writers finished. Returns a nil results slice and an error if *any*
86+
// RangeWriter failed to close and upload.
7487
func (bc *BatchCloser) Wait() ([]WriteResult, error) {
75-
bc.lock.Lock()
88+
bc.mu.Lock()
7689
if bc.err != nil {
77-
defer bc.lock.Unlock()
90+
defer bc.mu.Unlock()
7891
return nil, bc.err
7992
}
8093
bc.err = ErrMultipleWaitCalls
81-
bc.lock.Unlock()
94+
bc.mu.Unlock()
95+
96+
close(bc.ch)
8297

8398
bc.wg.Wait()
8499

85100
// all writers finished
86-
bc.lock.Lock()
87-
defer bc.lock.Unlock()
101+
bc.mu.Lock()
102+
defer bc.mu.Unlock()
88103
if !bc.nilErrOrMultipleCalls() {
89104
return nil, bc.err
90105
}

graveler/committed/batch_test.go

Lines changed: 112 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,83 +6,156 @@ import (
66
"strconv"
77
"testing"
88

9-
"github.com/golang/mock/gomock"
9+
"github.com/go-test/deep"
10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112
"github.com/treeverse/lakefs/graveler/committed"
12-
"github.com/treeverse/lakefs/graveler/committed/mock"
1313
)
1414

15+
// FakeRangeWriter is a RangeWriter that is safe to use in goroutines. (mock.RangeWriter is
16+
// NOT safe, it uses gomock which calls t.Fatal and friends!)
17+
type FakeRangeWriter struct {
18+
err error
19+
20+
closed bool
21+
closeResult closeResult
22+
writeRecords []*committed.Record
23+
24+
storedType string
25+
}
26+
27+
type closeResult struct {
28+
result *committed.WriteResult
29+
err error
30+
}
31+
32+
var (
33+
ErrNotImplemented = errors.New("not implemented")
34+
ErrAlreadyClosed = errors.New("closed more than once")
35+
ErrUnexpected = errors.New("unexpected call")
36+
)
37+
38+
func (f *FakeRangeWriter) Err() error {
39+
return f.err
40+
}
41+
42+
func (f *FakeRangeWriter) setErr(err error) error {
43+
f.err = err
44+
return err
45+
}
46+
47+
func (f *FakeRangeWriter) ExpectWriteRecord(r committed.Record) {
48+
f.writeRecords = append(f.writeRecords, &r)
49+
}
50+
51+
func (f *FakeRangeWriter) ExpectAnyRecord() {
52+
f.writeRecords = append(f.writeRecords, nil)
53+
}
54+
55+
func (f *FakeRangeWriter) WriteRecord(r committed.Record) error {
56+
if len(f.writeRecords) < 1 {
57+
return f.setErr(fmt.Errorf("try to write %+v when expected nothing: %w", r, ErrUnexpected))
58+
}
59+
var n *committed.Record
60+
n, f.writeRecords = f.writeRecords[0], f.writeRecords[1:]
61+
if n == nil { // any
62+
return nil
63+
}
64+
if diffs := deep.Equal(&r, n); diffs != nil {
65+
return f.setErr(fmt.Errorf("try to write the wrong value %s: %w", diffs, ErrUnexpected))
66+
}
67+
return nil
68+
}
69+
70+
func (f *FakeRangeWriter) AddMetadata(key, value string) {
71+
if key == committed.MetadataTypeKey {
72+
f.storedType = value
73+
}
74+
}
75+
76+
func (*FakeRangeWriter) GetApproximateSize() uint64 { return 0 }
77+
78+
func (f *FakeRangeWriter) Close() (*committed.WriteResult, error) {
79+
if f.closed {
80+
f.err = ErrAlreadyClosed
81+
return nil, ErrAlreadyClosed
82+
}
83+
return f.closeResult.result, f.closeResult.err
84+
}
85+
86+
func (f *FakeRangeWriter) Abort() error { return nil }
87+
88+
func NewFakeRangeWriter(result *committed.WriteResult, err error) *FakeRangeWriter {
89+
return &FakeRangeWriter{
90+
closeResult: closeResult{result, err},
91+
}
92+
}
93+
1594
func TestBatchCloserSuccess(t *testing.T) {
1695
runSuccessScenario(t)
1796
}
1897

1998
func TestBatchWriterFailed(t *testing.T) {
20-
ctrl := gomock.NewController(t)
21-
defer ctrl.Finish()
22-
23-
writerSuccess := mock.NewMockRangeWriter(ctrl)
24-
writerSuccess.EXPECT().Close().Return(&committed.WriteResult{
25-
RangeID: committed.ID(strconv.Itoa(1)),
26-
First: committed.Key("row_1"),
27-
Last: committed.Key("row_2"),
28-
Count: 4321,
29-
}, nil).Times(1)
30-
writerFailure := mock.NewMockRangeWriter(ctrl)
99+
writerSuccess := NewFakeRangeWriter(
100+
&committed.WriteResult{
101+
RangeID: committed.ID(strconv.Itoa(1)),
102+
First: committed.Key("row_1"),
103+
Last: committed.Key("row_2"),
104+
Count: 4321,
105+
}, nil)
31106
expectedErr := errors.New("failure")
32-
writerFailure.EXPECT().Close().Return(nil, expectedErr).Times(1)
107+
writerFailure := NewFakeRangeWriter(nil, expectedErr)
33108

34-
sut := committed.NewBatchCloser()
35-
require.NoError(t, sut.CloseWriterAsync(writerSuccess))
36-
require.NoError(t, sut.CloseWriterAsync(writerFailure))
109+
sut := committed.NewBatchCloser(10)
110+
assert.NoError(t, sut.CloseWriterAsync(writerSuccess))
111+
assert.NoError(t, sut.CloseWriterAsync(writerFailure))
37112

38113
res, err := sut.Wait()
39-
require.Error(t, expectedErr, err)
40-
require.Nil(t, res)
114+
assert.Error(t, expectedErr, err)
115+
assert.Nil(t, res)
116+
117+
assert.NoError(t, writerSuccess.Err())
118+
assert.NoError(t, writerFailure.Err())
41119
}
42120

43121
func TestBatchCloserMultipleWaitCalls(t *testing.T) {
44-
sut, ctrl := runSuccessScenario(t)
45-
46-
writer := mock.NewMockRangeWriter(ctrl)
47-
writer.EXPECT().Close().Return(&committed.WriteResult{
122+
writer := NewFakeRangeWriter(&committed.WriteResult{
48123
RangeID: committed.ID("last"),
49124
First: committed.Key("row_1"),
50125
Last: committed.Key("row_2"),
51126
Count: 4321,
52-
}, nil).Times(1)
127+
}, nil)
53128

54-
require.Error(t, sut.CloseWriterAsync(writer), committed.ErrMultipleWaitCalls)
129+
sut := runSuccessScenario(t)
130+
131+
assert.Error(t, sut.CloseWriterAsync(writer), committed.ErrMultipleWaitCalls)
55132
res, err := sut.Wait()
56133
require.Nil(t, res)
57134
require.Error(t, err, committed.ErrMultipleWaitCalls)
58135
}
59136

60-
func runSuccessScenario(t *testing.T) (*committed.BatchCloser, *gomock.Controller) {
61-
ctrl := gomock.NewController(t)
62-
defer ctrl.Finish()
63-
137+
func runSuccessScenario(t *testing.T) *committed.BatchCloser {
64138
const writersCount = 10
65-
writers := make([]*mock.MockRangeWriter, writersCount)
139+
writers := make([]*FakeRangeWriter, writersCount)
66140
for i := 0; i < writersCount; i++ {
67-
writers[i] = mock.NewMockRangeWriter(ctrl)
68-
writers[i].EXPECT().Close().Return(&committed.WriteResult{
141+
writers[i] = NewFakeRangeWriter(&committed.WriteResult{
69142
RangeID: committed.ID(strconv.Itoa(i)),
70143
First: committed.Key(fmt.Sprintf("row_%d_1", i)),
71144
Last: committed.Key(fmt.Sprintf("row_%d_2", i)),
72145
Count: i,
73-
}, nil).Times(1)
146+
}, nil)
74147
}
75148

76-
sut := committed.NewBatchCloser()
149+
sut := committed.NewBatchCloser(writersCount)
77150

78151
for i := 0; i < writersCount; i++ {
79-
require.NoError(t, sut.CloseWriterAsync(writers[i]))
152+
assert.NoError(t, sut.CloseWriterAsync(writers[i]))
80153
}
81154

82155
res, err := sut.Wait()
83-
require.NoError(t, err)
84-
require.NotNil(t, res)
85-
require.Len(t, res, writersCount)
156+
assert.NoError(t, err)
157+
assert.NotNil(t, res)
158+
assert.Len(t, res, writersCount)
86159

87-
return sut, ctrl
160+
return sut
88161
}

0 commit comments

Comments (0)