Skip to content

Commit bd1f903

Browse files
authored
Fix race while updating the branch returned from ref manager branch update (treeverse#9360)
* Fix race while updating the branch returned from ref manager branch update. Fetching branch information using the batch executor returns a shared result. This result is then used to update the branch information, which can lead to a race condition. This commit fixes the race by cloning the branch information before returning it. * test race condition
1 parent c959ff3 commit bd1f903

File tree

3 files changed

+82
-1
lines changed

3 files changed

+82
-1
lines changed

pkg/graveler/graveler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"slices"
89
"strconv"
910
"strings"
1011
"time"
@@ -503,6 +504,18 @@ type Branch struct {
503504
Hidden bool
504505
}
505506

507+
// Clone returns a newly allocated Branch populated from b.
// SealedTokens is duplicated with slices.Clone, so the returned Branch does
// not share the slice's backing array with b; every other field is copied by
// plain assignment. Callers can therefore modify the result without touching b.
// NOTE(review): b is dereferenced unconditionally, so Clone must not be called
// on a nil *Branch.
508+
func (b *Branch) Clone() *Branch {
509+
clone := &Branch{
510+
CommitID: b.CommitID,
511+
StagingToken: b.StagingToken,
512+
SealedTokens: slices.Clone(b.SealedTokens),
513+
CompactedBaseMetaRangeID: b.CompactedBaseMetaRangeID,
514+
Hidden: b.Hidden,
515+
}
516+
return clone
517+
}
518+
506519
// BranchRecord holds BranchID with the associated Branch data
507520
type BranchRecord struct {
508521
BranchID BranchID `db:"id"`

pkg/graveler/ref/manager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,6 @@ func (m *Manager) BranchUpdate(ctx context.Context, repository *graveler.Reposit
456456
release, err := m.branchOwnership.Own(ctx, requestID, string(branchID))
457457
if err != nil {
458458
logging.FromContext(ctx).
459-
WithFields(logging.Fields{}).
460459
WithError(err).
461460
Warn("Failed to get ownership on branch; continuing but may be slow")
462461
} else {
@@ -467,6 +466,10 @@ func (m *Manager) BranchUpdate(ctx context.Context, repository *graveler.Reposit
467466
if err != nil {
468467
return err
469468
}
469+
470+
// clone the branch information to avoid mutating the shared result returned by batch executor
471+
b = b.Clone()
472+
470473
newBranch, err := f(b)
471474
// return on error or nothing to update
472475
if err != nil || newBranch == nil {

pkg/graveler/ref/manager_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sort"
1111
"strconv"
1212
"strings"
13+
"sync"
1314
"testing"
1415
"time"
1516

@@ -23,8 +24,11 @@ import (
2324
"github.com/treeverse/lakefs/pkg/graveler/ref"
2425
"github.com/treeverse/lakefs/pkg/ident"
2526
"github.com/treeverse/lakefs/pkg/kv"
27+
"github.com/treeverse/lakefs/pkg/kv/kvtest"
2628
"github.com/treeverse/lakefs/pkg/kv/mock"
29+
"github.com/treeverse/lakefs/pkg/logging"
2730
"github.com/treeverse/lakefs/pkg/testutil"
31+
"go.uber.org/ratelimit"
2832
"google.golang.org/protobuf/proto"
2933
"google.golang.org/protobuf/types/known/timestamppb"
3034
)
@@ -547,6 +551,67 @@ func TestManager_BranchUpdate(t *testing.T) {
547551
}
548552
}
549553

554+
// TestManager_BranchUpdateRaceCondition starts several goroutines that call
// BranchUpdate on the same branch at the same time, each one modifying the
// *graveler.Branch handed to its update callback. The ref manager is built
// with a running batch executor. After all goroutines finish, the test only
// checks that the branch can still be read back without error.
// NOTE(review): the assertions do not inspect the final branch contents, so
// this test presumably relies on the race detector (go test -race) to catch
// concurrent writes to a shared Branch value - confirm it runs with -race in CI.
func TestManager_BranchUpdateRaceCondition(t *testing.T) {
555+
ctx := context.Background()
556+
kvStore := kvtest.GetStore(ctx, t)
557+
executor := batch.NewExecutor(logging.Dummy())
558+
cfg := ref.ManagerConfig{
559+
Executor: executor,
560+
KVStore: kvStore,
561+
KVStoreLimited: kv.NewStoreLimiter(kvStore, ratelimit.NewUnlimited()),
562+
AddressProvider: ident.NewHexAddressProvider(),
563+
RepositoryCacheConfig: testRepoCacheConfig,
564+
CommitCacheConfig: testCommitCacheConfig,
565+
MaxBatchDelay: 10 * time.Millisecond,
566+
}
567+
// Run the executor in the background; the deferred cancel stops it when the test returns.
cancelCtx, cancel := context.WithCancel(ctx)
568+
defer cancel()
569+
go executor.Run(cancelCtx)
570+
r := ref.NewRefManager(cfg, NewStorageConfigMock(config.SingleBlockstoreID))
571+
572+
const (
573+
repoID = "repo1"
574+
branchID = "race-branch"
575+
)
576+
repository, err := r.CreateRepository(context.Background(), repoID, graveler.Repository{
577+
StorageID: "sid",
578+
StorageNamespace: "s3://test-bucket",
579+
CreationDate: time.Now(),
580+
DefaultBranchID: "main",
581+
})
582+
testutil.Must(t, err)
583+
584+
// Create the branch that every goroutine below will update.
585+
testutil.Must(t, r.SetBranch(ctx, repository, branchID, graveler.Branch{
586+
CommitID: "test-commit-id",
587+
StagingToken: "staging1",
588+
}))
589+
590+
// The WaitGroup lets the test block until every updater goroutine has returned.
591+
var wg sync.WaitGroup
592+
593+
// Start goroutines that will modify the branch concurrently.
594+
const concurrentGoroutines = 5
595+
wg.Add(concurrentGoroutines)
596+
for i := range concurrentGoroutines {
597+
go func() {
598+
defer wg.Done()
599+
600+
// NOTE(review): the error returned by BranchUpdate is discarded, so a failed
// update in any goroutine does not fail the test.
_ = r.BranchUpdate(ctx, repository, branchID, func(branch *graveler.Branch) (*graveler.Branch, error) {
601+
// Write to the branch passed to the callback; each goroutine uses values derived from its own i.
602+
branch.StagingToken = graveler.StagingToken("staging" + strconv.Itoa(i))
603+
branch.SealedTokens = slices.Repeat([]graveler.StagingToken{branch.StagingToken}, i)
604+
return branch, nil
605+
})
606+
}()
607+
}
608+
wg.Wait()
609+
610+
// Only readability of the branch is asserted here, not which update was applied.
branch, err := r.GetBranch(ctx, repository, branchID)
611+
require.NoError(t, err)
612+
require.NotNil(t, branch)
613+
}
614+
550615
func TestManager_DeleteBranch(t *testing.T) {
551616
r, _ := testRefManager(t)
552617
ctx := context.Background()

0 commit comments

Comments
 (0)