@@ -10,6 +10,7 @@ import (
10
10
"sort"
11
11
"strconv"
12
12
"strings"
13
+ "sync"
13
14
"testing"
14
15
"time"
15
16
@@ -23,8 +24,11 @@ import (
23
24
"github.com/treeverse/lakefs/pkg/graveler/ref"
24
25
"github.com/treeverse/lakefs/pkg/ident"
25
26
"github.com/treeverse/lakefs/pkg/kv"
27
+ "github.com/treeverse/lakefs/pkg/kv/kvtest"
26
28
"github.com/treeverse/lakefs/pkg/kv/mock"
29
+ "github.com/treeverse/lakefs/pkg/logging"
27
30
"github.com/treeverse/lakefs/pkg/testutil"
31
+ "go.uber.org/ratelimit"
28
32
"google.golang.org/protobuf/proto"
29
33
"google.golang.org/protobuf/types/known/timestamppb"
30
34
)
@@ -547,6 +551,67 @@ func TestManager_BranchUpdate(t *testing.T) {
547
551
}
548
552
}
549
553
554
+ func TestManager_BranchUpdateRaceCondition (t * testing.T ) {
555
+ ctx := context .Background ()
556
+ kvStore := kvtest .GetStore (ctx , t )
557
+ executor := batch .NewExecutor (logging .Dummy ())
558
+ cfg := ref.ManagerConfig {
559
+ Executor : executor ,
560
+ KVStore : kvStore ,
561
+ KVStoreLimited : kv .NewStoreLimiter (kvStore , ratelimit .NewUnlimited ()),
562
+ AddressProvider : ident .NewHexAddressProvider (),
563
+ RepositoryCacheConfig : testRepoCacheConfig ,
564
+ CommitCacheConfig : testCommitCacheConfig ,
565
+ MaxBatchDelay : 10 * time .Millisecond ,
566
+ }
567
+ cancelCtx , cancel := context .WithCancel (ctx )
568
+ defer cancel ()
569
+ go executor .Run (cancelCtx )
570
+ r := ref .NewRefManager (cfg , NewStorageConfigMock (config .SingleBlockstoreID ))
571
+
572
+ const (
573
+ repoID = "repo1"
574
+ branchID = "race-branch"
575
+ )
576
+ repository , err := r .CreateRepository (context .Background (), repoID , graveler.Repository {
577
+ StorageID : "sid" ,
578
+ StorageNamespace : "s3://test-bucket" ,
579
+ CreationDate : time .Now (),
580
+ DefaultBranchID : "main" ,
581
+ })
582
+ testutil .Must (t , err )
583
+
584
+ // Set initial branch state
585
+ testutil .Must (t , r .SetBranch (ctx , repository , branchID , graveler.Branch {
586
+ CommitID : "test-commit-id" ,
587
+ StagingToken : "staging1" ,
588
+ }))
589
+
590
+ // Use goroutines to create a proper race condition
591
+ var wg sync.WaitGroup
592
+
593
+ // Start goroutines that will modify the branch concurrently
594
+ const concurrentGoroutines = 5
595
+ wg .Add (concurrentGoroutines )
596
+ for i := range concurrentGoroutines {
597
+ go func () {
598
+ defer wg .Done ()
599
+
600
+ _ = r .BranchUpdate (ctx , repository , branchID , func (branch * graveler.Branch ) (* graveler.Branch , error ) {
601
+ // Modify the branch information
602
+ branch .StagingToken = graveler .StagingToken ("staging" + strconv .Itoa (i ))
603
+ branch .SealedTokens = slices .Repeat ([]graveler.StagingToken {branch .StagingToken }, i )
604
+ return branch , nil
605
+ })
606
+ }()
607
+ }
608
+ wg .Wait ()
609
+
610
+ branch , err := r .GetBranch (ctx , repository , branchID )
611
+ require .NoError (t , err )
612
+ require .NotNil (t , branch )
613
+ }
614
+
550
615
func TestManager_DeleteBranch (t * testing.T ) {
551
616
r , _ := testRefManager (t )
552
617
ctx := context .Background ()
0 commit comments