Skip to content

Commit 0d10f13

Browse files
authored
Migrate MVCC database information into new entry catalog format (treeverse#1229)
1 parent 53e4c0a commit 0d10f13

22 files changed

+1052
-139
lines changed

api/api_controller.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,6 @@ import (
1010
"strings"
1111
"time"
1212

13-
"github.com/treeverse/lakefs/graveler"
14-
"github.com/treeverse/lakefs/parade"
15-
16-
"github.com/treeverse/lakefs/export"
17-
1813
"github.com/aws/aws-sdk-go/aws"
1914
"github.com/go-openapi/runtime"
2015
"github.com/go-openapi/runtime/middleware"
@@ -40,8 +35,11 @@ import (
4035
"github.com/treeverse/lakefs/catalog"
4136
"github.com/treeverse/lakefs/db"
4237
"github.com/treeverse/lakefs/dedup"
38+
"github.com/treeverse/lakefs/export"
39+
"github.com/treeverse/lakefs/graveler"
4340
"github.com/treeverse/lakefs/httputil"
4441
"github.com/treeverse/lakefs/logging"
42+
"github.com/treeverse/lakefs/parade"
4543
"github.com/treeverse/lakefs/permissions"
4644
"github.com/treeverse/lakefs/retention"
4745
"github.com/treeverse/lakefs/stats"

catalog/migrate/iterator.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package migrate
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/treeverse/lakefs/catalog"
8+
"github.com/treeverse/lakefs/catalog/mvcc"
9+
"github.com/treeverse/lakefs/catalog/rocks"
10+
"github.com/treeverse/lakefs/db"
11+
)
12+
13+
// iteratorState describes where an Iterator currently is in its
// fetch lifecycle.
type iteratorState int

const (
	// iteratorStateInit is the zero value: no fetch has been issued since
	// the iterator was created or last repositioned with SeekGE.
	iteratorStateInit = iteratorState(iota)
	// iteratorStateQuerying means at least one fetch has been issued and
	// further fetches may still follow.
	iteratorStateQuerying
	// iteratorStateDone means no further fetches will be issued.
	iteratorStateDone
)
20+
21+
type Iterator struct {
22+
state iteratorState
23+
ctx context.Context
24+
db db.Database
25+
branchID int64
26+
commitID int64
27+
buf []*rocks.EntryRecord
28+
err error
29+
offset string
30+
fetchSize int
31+
value *rocks.EntryRecord
32+
}
33+
34+
// ErrIteratorClosed is the error an Iterator reports from Err after Close
// has been called on it.
var ErrIteratorClosed = errors.New("iterator closed")
35+
36+
// NewIterator returns an iterator over an mvcc branch/commit, giving entries as EntryCatalog.
37+
func NewIterator(ctx context.Context, db db.Database, branchID int64, commitID int64, fetchSize int) *Iterator {
38+
return &Iterator{
39+
ctx: ctx,
40+
db: db,
41+
branchID: branchID,
42+
commitID: commitID,
43+
buf: make([]*rocks.EntryRecord, 0, fetchSize),
44+
fetchSize: fetchSize,
45+
}
46+
}
47+
48+
func (it *Iterator) Next() bool {
49+
if it.err != nil {
50+
return false
51+
}
52+
53+
it.maybeFetch()
54+
55+
// stage a value and increment offset
56+
if len(it.buf) == 0 {
57+
return false
58+
}
59+
it.value = it.buf[0]
60+
it.buf = it.buf[1:]
61+
it.offset = string(it.value.Path)
62+
return true
63+
}
64+
65+
func (it *Iterator) SeekGE(id rocks.Path) {
66+
if errors.Is(it.err, ErrIteratorClosed) {
67+
return
68+
}
69+
it.state = iteratorStateInit
70+
it.offset = id.String()
71+
it.buf = it.buf[:0]
72+
it.value = nil
73+
it.err = nil
74+
}
75+
76+
func (it *Iterator) Value() *rocks.EntryRecord {
77+
if it.err != nil {
78+
return nil
79+
}
80+
return it.value
81+
}
82+
83+
func (it *Iterator) Err() error {
84+
return it.err
85+
}
86+
87+
func (it *Iterator) Close() {
88+
it.buf = nil
89+
it.state = iteratorStateDone
90+
it.err = ErrIteratorClosed
91+
}
92+
93+
func (it *Iterator) maybeFetch() {
94+
if it.state == iteratorStateDone {
95+
return
96+
}
97+
if len(it.buf) > 0 {
98+
return
99+
}
100+
if it.state == iteratorStateInit {
101+
it.state = iteratorStateQuerying
102+
}
103+
104+
var res interface{}
105+
res, it.err = it.db.Transact(func(tx db.Tx) (interface{}, error) {
106+
return mvcc.ListEntriesTx(tx, it.branchID, mvcc.CommitID(it.commitID), "", it.offset, it.fetchSize)
107+
}, db.WithContext(it.ctx), db.ReadOnly())
108+
if it.err != nil {
109+
return
110+
}
111+
entries := res.([]*catalog.Entry)
112+
for _, entry := range entries {
113+
rec := &rocks.EntryRecord{
114+
Path: rocks.Path(entry.Path),
115+
Entry: rocks.EntryFromCatalogEntry(*entry),
116+
}
117+
it.buf = append(it.buf, rec)
118+
}
119+
if len(it.buf) < it.fetchSize {
120+
it.state = iteratorStateDone
121+
} else if len(it.buf) > 0 {
122+
it.offset = it.buf[len(it.buf)-1].Path.String()
123+
}
124+
}
125+
126+
// emptyIterator is an entry iterator that never yields a record; every
// method is a no-op or returns the zero result.
type emptyIterator struct{}
127+
128+
func (e *emptyIterator) Next() bool {
129+
return false
130+
}
131+
132+
func (e *emptyIterator) SeekGE(rocks.Path) {
133+
}
134+
135+
func (e *emptyIterator) Value() *rocks.EntryRecord {
136+
return nil
137+
}
138+
139+
func (e *emptyIterator) Err() error {
140+
return nil
141+
}
142+
143+
func (e *emptyIterator) Close() {
144+
}
145+
146+
func newEmptyIterator() rocks.EntryIterator {
147+
return &emptyIterator{}
148+
}

catalog/migrate/main_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package migrate
2+
3+
import (
4+
"flag"
5+
"log"
6+
"os"
7+
"testing"
8+
9+
"github.com/ory/dockertest/v3"
10+
"github.com/sirupsen/logrus"
11+
"github.com/treeverse/lakefs/testutil"
12+
)
13+
14+
var (
15+
pool *dockertest.Pool
16+
databaseURI string
17+
)
18+
19+
func TestMain(m *testing.M) {
20+
flag.Parse()
21+
if !testing.Verbose() {
22+
// keep the log level calm
23+
logrus.SetLevel(logrus.PanicLevel)
24+
}
25+
26+
// postgres container
27+
var err error
28+
pool, err = dockertest.NewPool("")
29+
if err != nil {
30+
log.Fatalf("Could not connect to Docker: %s", err)
31+
}
32+
var closer func()
33+
databaseURI, closer = testutil.GetDBInstance(pool)
34+
code := m.Run()
35+
closer() // cleanup
36+
os.Exit(code)
37+
}

0 commit comments

Comments
 (0)