Skip to content

Commit 53c6871

Browse files
authored
Bug/read wrong commits (treeverse#395)
Former-commit-id: a3aad38ab2e4930d3e802eadd1ffd080fcb677dc
1 parent 3a404f0 commit 53c6871

File tree

3 files changed

+57
-20
lines changed

3 files changed

+57
-20
lines changed

catalog/cataloger_list_entries.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ type readPramsType struct {
113113
tx db.Tx
114114
prefix string
115115
branchBatchSize int
116-
lineage []lineageCommit
117116
lowestCommitID, topCommitID CommitID
118117
branchID int64
118+
branchQueryMap map[int64]sq.SelectBuilder
119119
}
120120

121121
func loopByLevel(tx db.Tx, prefix, after, delimiter string, limit, branchBatchSize int, branchID int64, requestedCommit CommitID, lineage []lineageCommit) ([]string, error) {
@@ -136,7 +136,7 @@ func loopByLevel(tx db.Tx, prefix, after, delimiter string, limit, branchBatchSi
136136
branchPriorityMap[l.BranchID] = i + 1
137137
}
138138
limit += 1 // increase limit to get indication of more rows to come
139-
unionQueryParts := buildBaseLevelQuery(branchID, lineage, branchBatchSize, lowestCommitID, topCommitID, len(prefix))
139+
unionQueryParts, branchQueryMap := buildBaseLevelQuery(branchID, lineage, branchBatchSize, lowestCommitID, topCommitID, len(prefix))
140140
endOfPrefixRange := prefix + DirectoryTermination
141141

142142
var exactFirst bool
@@ -156,10 +156,10 @@ func loopByLevel(tx db.Tx, prefix, after, delimiter string, limit, branchBatchSi
156156
tx: tx,
157157
prefix: prefix,
158158
branchBatchSize: branchBatchSize,
159-
lineage: lineage,
160159
lowestCommitID: lowestCommitID,
161160
topCommitID: topCommitID,
162161
branchID: branchID,
162+
branchQueryMap: branchQueryMap,
163163
}
164164

165165
for {
@@ -264,30 +264,18 @@ func getBranchResultRowsForPath(path string, branch int64, branchRanges map[int6
264264
}
265265

266266
func getMoreRows(path string, branch int64, branchRanges map[int64][]resultRow, readParams readPramsType) error {
267-
var topCommitID CommitID
268-
if branch == readParams.branchID { // it is the base branch
269-
topCommitID = readParams.topCommitID
270-
} else {
271-
for _, l := range readParams.lineage {
272-
if branch == l.BranchID {
273-
topCommitID = l.CommitID
274-
break
275-
}
276-
}
277-
}
278267
// have to re-read the last entry, because otherwise the expression becomes complex and the optimizer gets NUTS
279268
//so read size must be the batch size + whatever results were left from the last entry
280269
// If readBuf is not extended - there will be an endless loop if number of results is bigger than batch size.
281270
requiredBufferSize := readParams.branchBatchSize + len(branchRanges[branch])
282271
readBuf := make([]resultRow, 0, requiredBufferSize)
283-
singleSelect := selectSingleBranch(branch, branch == readParams.branchID, requiredBufferSize, readParams.lowestCommitID, topCommitID, len(readParams.prefix))
272+
singleSelect := readParams.branchQueryMap[branch]
284273
requestedPath := readParams.prefix + path
285-
singleSelect = singleSelect.Where("path >= ? and path < ?", requestedPath, readParams.prefix+DirectoryTermination)
274+
singleSelect = singleSelect.Where("path >= ? and path < ?", requestedPath, readParams.prefix+DirectoryTermination).Limit(uint64(requiredBufferSize))
286275
s, args, err := singleSelect.PlaceholderFormat(sq.Dollar).ToSql()
287276
if err != nil {
288277
return err
289278
}
290-
291279
err = readParams.tx.Select(&readBuf, s, args...)
292280
if len(branchRanges[branch]) == len(readBuf) {
293281
err = sql.ErrNoRows
@@ -333,13 +321,17 @@ func checkPathNotDeleted(pathResults []resultRow) bool {
333321
}
334322
}
335323

336-
func buildBaseLevelQuery(baseBranchID int64, lineage []lineageCommit, branchEntryLimit int, lowestCommitID, topCommitID CommitID, prefixLen int) []sq.SelectBuilder {
324+
func buildBaseLevelQuery(baseBranchID int64, lineage []lineageCommit, branchEntryLimit int,
325+
lowestCommitID, topCommitID CommitID, prefixLen int) ([]sq.SelectBuilder, map[int64]sq.SelectBuilder) {
337326
unionParts := make([]sq.SelectBuilder, len(lineage)+1)
327+
unionMap := make(map[int64]sq.SelectBuilder)
338328
unionParts[0] = selectSingleBranch(baseBranchID, true, branchEntryLimit, lowestCommitID, topCommitID, prefixLen)
329+
unionMap[baseBranchID] = unionParts[0]
339330
for i, l := range lineage {
340331
unionParts[i+1] = selectSingleBranch(l.BranchID, false, branchEntryLimit, 1, l.CommitID, prefixLen)
332+
unionMap[l.BranchID] = unionParts[i+1]
341333
}
342-
return unionParts
334+
return unionParts, unionMap
343335
}
344336

345337
func selectSingleBranch(branchID int64, isBaseBranch bool, branchBatchSize int, lowestCommitID, topCommitID CommitID, prefixLen int) sq.SelectBuilder {

catalog/cataloger_list_entries_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,3 +647,48 @@ func TestCataloger_ListEntries_Uncommitted(t *testing.T) {
647647
t.Fatal("ListEntries", diff)
648648
}
649649
}
650+
651+
func TestCataloger_ListEntries_ReadingUncommittedFromLineage(t *testing.T) {
652+
ctx := context.Background()
653+
c := testCataloger(t)
654+
repo := testCatalogerRepo(t, ctx, c, "repo", "master")
655+
for i := 0; i < 10; i++ {
656+
z := fmt.Sprintf("%03d", i)
657+
path := "my_entry" + z
658+
testCatalogerCreateEntry(t, ctx, c, repo, "master", path, nil, "abcd"+z)
659+
}
660+
_, err := c.Commit(ctx, repo, "master", "commit first 10 in master", "tester", nil)
661+
testutil.MustDo(t, "commit first 10 in master", err)
662+
663+
// deletion that will not be seen by br_1, because it is not committed for br_1. so br_1 still sees this
664+
testutil.MustDo(t, "delete the first committed file",
665+
c.DeleteEntry(ctx, repo, "master", "my_entry001"))
666+
testCatalogerBranch(t, ctx, c, repo, "br_1", "master")
667+
for i := 10; i < 20; i++ {
668+
z := fmt.Sprintf("%03d", i)
669+
path := "my_entry/sub-" + z
670+
testCatalogerCreateEntry(t, ctx, c, repo, "br_1", path, nil, "abcd"+z)
671+
}
672+
673+
// create unreadable in ancestor
674+
// committed
675+
for i := 20; i < 50; i++ {
676+
z := fmt.Sprintf("%03d", i)
677+
path := "my_entry" + z
678+
testCatalogerCreateEntry(t, ctx, c, repo, "master", path, nil, "abcd"+z)
679+
}
680+
_, err = c.Commit(ctx, repo, "master", "commit 20-50 in master", "tester", nil)
681+
testutil.MustDo(t, "commit 20-50 in master", err)
682+
683+
// uncommitted
684+
for i := 50; i < 70; i++ {
685+
z := fmt.Sprintf("%03d", i)
686+
path := "my_entry" + z
687+
testCatalogerCreateEntry(t, ctx, c, repo, "master", path, nil, "abcd"+z)
688+
}
689+
got, _, err := c.ListEntries(ctx, repo, "br_1", "", "", DefaultPathDelimiter, -1)
690+
testutil.Must(t, err)
691+
if len(got) != 11 {
692+
t.Fatalf("expected 10 entries, read %d", len(got))
693+
}
694+
}

catalog/views.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func sqDiffFromSonV(fatherID, sonID int64, fatherEffectiveCommit, sonEffectiveCo
126126
(l.commit_id >= f.min_commit AND
127127
(l.commit_id > f.max_commit OR NOT f.is_deleted))
128128
)))
129-
AS DifferenceTypeConflict `, fatherID, fatherEffectiveCommit, fatherEffectiveCommit, fatherID).
129+
AS DifferenceTypeConflict`, fatherID, fatherEffectiveCommit, fatherEffectiveCommit, fatherID).
130130
FromSelect(sqEntriesV(CommittedID).Distinct().
131131
Options(" on (branch_id,path)").
132132
OrderBy("branch_id", "path", "min_commit desc").

0 commit comments

Comments
 (0)