Skip to content

Commit ca3ae64

Browse files
imsodinAudriusButkevicius
authored andcommitted
lib/db: Flush batch based on size and refactor (fixes syncthing#5531) (syncthing#5536)
Flush the batch when exceeding a certain size, instead of when reaching a number of batched operations. Move batch to lowlevel to be able to use it in NamespacedKV. Increase the leveldb memory buffer from 4 to 16 MiB.
1 parent e2204d0 commit ca3ae64

File tree

6 files changed

+55
-57
lines changed

6 files changed

+55
-57
lines changed

lib/db/blockmap_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func addToBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
7373
name := []byte(f.Name)
7474
for i, block := range f.Blocks {
7575
binary.BigEndian.PutUint32(blockBuf, uint32(i))
76-
keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
76+
keyBuf = t.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
7777
t.Put(keyBuf, blockBuf)
7878
}
7979
}
@@ -89,7 +89,7 @@ func discardFromBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
8989
if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
9090
name := []byte(ef.Name)
9191
for _, block := range ef.Blocks {
92-
keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
92+
keyBuf = t.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
9393
t.Delete(keyBuf)
9494
}
9595
}

lib/db/instance.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
8080
if ok {
8181
if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
8282
for _, block := range ef.Blocks {
83-
keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
83+
keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
8484
t.Delete(keyBuf)
8585
}
8686
}
@@ -100,7 +100,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
100100
l.Debugf("insert (local); folder=%q %v", folder, f)
101101
t.Put(dk, mustMarshal(&f))
102102

103-
gk = t.db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
103+
gk = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
104104
keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
105105

106106
keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
@@ -110,7 +110,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
110110
if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
111111
for i, block := range f.Blocks {
112112
binary.BigEndian.PutUint32(blockBuf, uint32(i))
113-
keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
113+
keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
114114
t.Put(keyBuf, blockBuf)
115115
}
116116
}

lib/db/lowlevel.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import (
1919

2020
const (
2121
dbMaxOpenFiles = 100
22-
dbWriteBuffer = 4 << 20
22+
dbWriteBuffer = 16 << 20
23+
dbFlushBatch = dbWriteBuffer / 4 // Some leeway for any leveldb in-memory optimizations
2324
)
2425

2526
// Lowlevel is the lowest level database interface. It has a very simple
@@ -127,3 +128,29 @@ func leveldbIsCorrupted(err error) bool {
127128

128129
return false
129130
}
131+
132+
type batch struct {
133+
*leveldb.Batch
134+
db *Lowlevel
135+
}
136+
137+
func (db *Lowlevel) newBatch() *batch {
138+
return &batch{
139+
Batch: new(leveldb.Batch),
140+
db: db,
141+
}
142+
}
143+
144+
// checkFlush flushes and resets the batch if its size exceeds dbFlushBatch.
145+
func (b *batch) checkFlush() {
146+
if len(b.Dump()) > dbFlushBatch {
147+
b.flush()
148+
b.Reset()
149+
}
150+
}
151+
152+
func (b *batch) flush() {
153+
if err := b.db.Write(b.Batch, nil); err != nil {
154+
panic(err)
155+
}
156+
}

lib/db/namespaced.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"encoding/binary"
1111
"time"
1212

13-
"github.com/syndtr/goleveldb/leveldb"
1413
"github.com/syndtr/goleveldb/leveldb/util"
1514
)
1615

@@ -39,21 +38,12 @@ func NewNamespacedKV(db *Lowlevel, prefix string) *NamespacedKV {
3938
func (n *NamespacedKV) Reset() {
4039
it := n.db.NewIterator(util.BytesPrefix(n.prefix), nil)
4140
defer it.Release()
42-
batch := new(leveldb.Batch)
41+
batch := n.db.newBatch()
4342
for it.Next() {
4443
batch.Delete(it.Key())
45-
if batch.Len() > batchFlushSize {
46-
if err := n.db.Write(batch, nil); err != nil {
47-
panic(err)
48-
}
49-
batch.Reset()
50-
}
51-
}
52-
if batch.Len() > 0 {
53-
if err := n.db.Write(batch, nil); err != nil {
54-
panic(err)
55-
}
44+
batch.checkFlush()
5645
}
46+
batch.flush()
5747
}
5848

5949
// PutInt64 stores a new int64. Any existing value (even if of another type)

lib/db/schemaupdater.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,18 @@ func (db *schemaUpdater) updateSchema0to1() {
104104
var gk, buf []byte
105105

106106
for dbi.Next() {
107+
t.checkFlush()
108+
107109
folder, ok := db.keyer.FolderFromDeviceFileKey(dbi.Key())
108110
if !ok {
109111
// not having the folder in the index is bad; delete and continue
110112
t.Delete(dbi.Key())
111-
t.checkFlush()
112113
continue
113114
}
114115
device, ok := db.keyer.DeviceFromDeviceFileKey(dbi.Key())
115116
if !ok {
116117
// not having the device in the index is bad; delete and continue
117118
t.Delete(dbi.Key())
118-
t.checkFlush()
119119
continue
120120
}
121121
name := db.keyer.NameFromDeviceFileKey(dbi.Key())
@@ -128,7 +128,6 @@ func (db *schemaUpdater) updateSchema0to1() {
128128
gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
129129
buf = t.removeFromGlobal(gk, buf, folder, device, nil, nil)
130130
t.Delete(dbi.Key())
131-
t.checkFlush()
132131
continue
133132
}
134133

@@ -149,7 +148,6 @@ func (db *schemaUpdater) updateSchema0to1() {
149148
panic("can't happen: " + err.Error())
150149
}
151150
t.Put(dbi.Key(), bs)
152-
t.checkFlush()
153151
symlinkConv++
154152
}
155153

@@ -210,7 +208,7 @@ func (db *schemaUpdater) updateSchema2to3() {
210208
if !need(f, ok, v) {
211209
return true
212210
}
213-
nk = t.db.keyer.GenerateNeedFileKey(nk, folder, []byte(f.FileName()))
211+
nk = t.keyer.GenerateNeedFileKey(nk, folder, []byte(f.FileName()))
214212
t.Put(nk, nil)
215213
t.checkFlush()
216214
return true
@@ -282,7 +280,7 @@ func (db *schemaUpdater) updateSchema6to7() {
282280
svl, err := t.Get(gk, nil)
283281
if err != nil {
284282
// If there is no global list, we hardly need it.
285-
t.Delete(t.db.keyer.GenerateNeedFileKey(nk, folder, name))
283+
t.Delete(t.keyer.GenerateNeedFileKey(nk, folder, name))
286284
return true
287285
}
288286
var fl VersionList
@@ -293,7 +291,7 @@ func (db *schemaUpdater) updateSchema6to7() {
293291
return true
294292
}
295293
if localFV, haveLocalFV := fl.Get(protocol.LocalDeviceID[:]); !need(global, haveLocalFV, localFV.Version) {
296-
t.Delete(t.db.keyer.GenerateNeedFileKey(nk, folder, name))
294+
t.Delete(t.keyer.GenerateNeedFileKey(nk, folder, name))
297295
}
298296
return true
299297
})

lib/db/transactions.go

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@ import (
1212
"github.com/syndtr/goleveldb/leveldb/util"
1313
)
1414

15-
// Flush batches to disk when they contain this many records.
16-
const batchFlushSize = 64
17-
1815
// A readOnlyTransaction represents a database snapshot.
1916
type readOnlyTransaction struct {
2017
*leveldb.Snapshot
21-
db *instance
18+
keyer keyer
2219
}
2320

2421
func (db *instance) newReadOnlyTransaction() readOnlyTransaction {
@@ -28,7 +25,7 @@ func (db *instance) newReadOnlyTransaction() readOnlyTransaction {
2825
}
2926
return readOnlyTransaction{
3027
Snapshot: snap,
31-
db: db,
28+
keyer: db.keyer,
3229
}
3330
}
3431

@@ -37,7 +34,7 @@ func (t readOnlyTransaction) close() {
3734
}
3835

3936
func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
40-
return t.getFileByKey(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file))
37+
return t.getFileByKey(t.keyer.GenerateDeviceFileKey(nil, folder, device, file))
4138
}
4239

4340
func (t readOnlyTransaction) getFileByKey(key []byte) (protocol.FileInfo, bool) {
@@ -65,7 +62,7 @@ func (t readOnlyTransaction) getFileTrunc(key []byte, trunc bool) (FileIntf, boo
6562
}
6663

6764
func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate bool) ([]byte, FileIntf, bool) {
68-
keyBuf = t.db.keyer.GenerateGlobalVersionKey(keyBuf, folder, file)
65+
keyBuf = t.keyer.GenerateGlobalVersionKey(keyBuf, folder, file)
6966

7067
bs, err := t.Get(keyBuf, nil)
7168
if err != nil {
@@ -77,7 +74,7 @@ func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate boo
7774
return keyBuf, nil, false
7875
}
7976

80-
keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, vl.Versions[0].Device, file)
77+
keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, vl.Versions[0].Device, file)
8178
if fi, ok := t.getFileTrunc(keyBuf, truncate); ok {
8279
return keyBuf, fi, true
8380
}
@@ -90,14 +87,13 @@ func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate boo
9087
// batch size.
9188
type readWriteTransaction struct {
9289
readOnlyTransaction
93-
*leveldb.Batch
90+
*batch
9491
}
9592

9693
func (db *instance) newReadWriteTransaction() readWriteTransaction {
97-
t := db.newReadOnlyTransaction()
9894
return readWriteTransaction{
99-
readOnlyTransaction: t,
100-
Batch: new(leveldb.Batch),
95+
readOnlyTransaction: db.newReadOnlyTransaction(),
96+
batch: db.newBatch(),
10197
}
10298
}
10399

@@ -106,19 +102,6 @@ func (t readWriteTransaction) close() {
106102
t.readOnlyTransaction.close()
107103
}
108104

109-
func (t readWriteTransaction) checkFlush() {
110-
if t.Batch.Len() > batchFlushSize {
111-
t.flush()
112-
t.Batch.Reset()
113-
}
114-
}
115-
116-
func (t readWriteTransaction) flush() {
117-
if err := t.db.Write(t.Batch, nil); err != nil {
118-
panic(err)
119-
}
120-
}
121-
122105
// updateGlobal adds this device+version to the version list for the given
123106
// file. If the device is already present in the list, the version is updated.
124107
// If the file does not have an entry in the global list, it is created.
@@ -142,7 +125,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi
142125
// Inserted a new newest version
143126
global = file
144127
} else {
145-
keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, name)
128+
keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, name)
146129
if new, ok := t.getFileByKey(keyBuf); ok {
147130
global = new
148131
} else {
@@ -167,7 +150,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi
167150
// The previous newest version is now at index 1
168151
oldGlobalFV = fl.Versions[1]
169152
}
170-
keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, oldGlobalFV.Device, name)
153+
keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, oldGlobalFV.Device, name)
171154
if oldFile, ok := t.getFileByKey(keyBuf); ok {
172155
// A failure to get the file here is surprising and our
173156
// global size data will be incorrect until a restart...
@@ -187,7 +170,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi
187170
// device according to the version list and global FileInfo given and updates
188171
// the db accordingly.
189172
func (t readWriteTransaction) updateLocalNeed(keyBuf, folder, name []byte, fl VersionList, global protocol.FileInfo) []byte {
190-
keyBuf = t.db.keyer.GenerateNeedFileKey(keyBuf, folder, name)
173+
keyBuf = t.keyer.GenerateNeedFileKey(keyBuf, folder, name)
191174
hasNeeded, _ := t.Has(keyBuf, nil)
192175
if localFV, haveLocalFV := fl.Get(protocol.LocalDeviceID[:]); need(global, haveLocalFV, localFV.Version) {
193176
if !hasNeeded {
@@ -246,21 +229,21 @@ func (t readWriteTransaction) removeFromGlobal(gk, keyBuf, folder, device []byte
246229
if removedAt == 0 {
247230
// A failure to get the file here is surprising and our
248231
// global size data will be incorrect until a restart...
249-
keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, device, file)
232+
keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, device, file)
250233
if f, ok := t.getFileByKey(keyBuf); ok {
251234
meta.removeFile(protocol.GlobalDeviceID, f)
252235
}
253236
}
254237

255238
if len(fl.Versions) == 0 {
256-
keyBuf = t.db.keyer.GenerateNeedFileKey(keyBuf, folder, file)
239+
keyBuf = t.keyer.GenerateNeedFileKey(keyBuf, folder, file)
257240
t.Delete(keyBuf)
258241
t.Delete(gk)
259242
return keyBuf
260243
}
261244

262245
if removedAt == 0 {
263-
keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, file)
246+
keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, file)
264247
global, ok := t.getFileByKey(keyBuf)
265248
if !ok {
266249
panic("This file must exist in the db")

0 commit comments

Comments
 (0)