Skip to content

Commit

Permalink
test: add test with Memberlist as a KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
NickAnge committed Nov 25, 2024
1 parent c44e5dd commit 1bd6fab
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ package distributor
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
Expand All @@ -33,6 +37,33 @@ import (
utiltest "github.com/grafana/mimir/pkg/util/test"
)

var addrsOnce sync.Once
var localhostIP string

func getLocalhostAddrs() []string {
addrsOnce.Do(func() {
ip, err := net.ResolveIPAddr("ip4", "localhost")
if err != nil {
localhostIP = "127.0.0.1" // this is the most common answer, try it
}
localhostIP = ip.String()
})
return []string{localhostIP}
}

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(_ context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTracker, user, cluster, replica string, expected time.Time, elected time.Time) {
t.Helper()

Expand Down Expand Up @@ -70,6 +101,82 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra
})
}

func TestHaTrackerWithMemberList(t *testing.T) {
var config memberlist.KVConfig

const cluster = "cluster"
const replica1 = "r1"
const replica2 = "r2"

flagext.DefaultValues(&config)
ctx := context.Background()

config.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: getLocalhostAddrs(),
BindPort: 0,
}

config.Codecs = []codec.Codec{
GetReplicaDescCodec(),
}

memberListSvc := memberlist.NewKVInitService(
&config,
log.NewNopLogger(),
&dnsProviderMock{},
prometheus.NewPedanticRegistry(),
)

t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc))
})

c, err := newHaTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{
MemberlistKV: memberListSvc.GetMemberlistKV,
}},
UpdateTimeout: time.Millisecond * 100,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(ctx, c))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

now := time.Now()

// Write the first time.
err = c.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.NoError(t, err)

// Throw away a sample from replica2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Wait more than the overwrite timeout.
now = now.Add(1100 * time.Millisecond)

// Another sample from replica2 to update its timestamp.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Update KVStore - this should elect replica 2.
c.updateKVStoreAll(context.Background(), now)

checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica2, now, now)

// Now we should accept from replica 2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.NoError(t, err)

// We timed out accepting samples from replica 1 and should now reject them.
err = c.checkReplica(context.Background(), "user", "test", replica1, now)
assert.Error(t, err)
}

func TestHATrackerCacheSyncOnStart(t *testing.T) {
const cluster = "c1"
const replicaOne = "r1"
Expand Down

0 comments on commit 1bd6fab

Please sign in to comment.