Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ha_tracker): Replicadesc implement mergeable interface #10020

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -60,6 +61,90 @@ func NewReplicaDesc() *ReplicaDesc {
return &ReplicaDesc{}
}

func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) {
return r.mergeWithTime(other)
}

func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}

other, ok := mergeable.(*ReplicaDesc)
if !ok {
return nil, fmt.Errorf("expected *distributor.ReplicaDesc, got %T", mergeable)
}

if other == nil {
return nil, nil
}

thisRDesc := r
otherRDesc := other

// Track changes
changed := false

if otherRDesc.Replica == thisRDesc.Replica {
// Keeping the one with the most recent receivedAt timestamp
if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt {
*r = *other
changed = true
} else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 {
*r = *other
changed = true
}
} else {
// keep the most recent Elected to reach consistency
if otherRDesc.ElectedAt > thisRDesc.ElectedAt {
*r = *other
changed = true
} else if otherRDesc.ElectedAt == thisRDesc.ElectedAt {
// if the timestamps are equal we compare receivedAt
if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt {
*r = *other
changed = true
}
}
}

// No changes - return
if !changed {
return nil, nil
}

out := NewReplicaDesc()
*out = *thisRDesc
return out, nil
}

// MergeContent noOp currently
func (r *ReplicaDesc) MergeContent() []string {
result := []string(nil)
if len(r.Replica) != 0 {
result = append(result, r.Replica)
}
return result
}

// RemoveTombstones noOp for now
func (r *ReplicaDesc) RemoveTombstones(limit time.Time) (total, removed int) {
if r.DeletedAt > 0 {
if limit.IsZero() || time.Unix(r.DeletedAt, 0).Before(limit) {
// need to implement the remove logic
removed = 1
} else {
total = 1
}
}
return
}

// Clone returns a deep copy of the ring state.
func (r *ReplicaDesc) Clone() memberlist.Mergeable {
return proto.Clone(r).(*ReplicaDesc)
}

// HATrackerConfig contains the configuration required to
// create an HA Tracker.
type HATrackerConfig struct {
Expand Down
221 changes: 221 additions & 0 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ package distributor
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
Expand All @@ -33,6 +37,33 @@ import (
utiltest "github.com/grafana/mimir/pkg/util/test"
)

var addrsOnce sync.Once
var localhostIP string

func getLocalhostAddrs() []string {
addrsOnce.Do(func() {
ip, err := net.ResolveIPAddr("ip4", "localhost")
if err != nil {
localhostIP = "127.0.0.1" // this is the most common answer, try it
}
localhostIP = ip.String()
})
return []string{localhostIP}
}

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(_ context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTracker, user, cluster, replica string, expected time.Time, elected time.Time) {
t.Helper()

Expand Down Expand Up @@ -70,6 +101,196 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra
})
}

func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) {
change, err := r1.Merge(r2, false)
if err != nil {
panic(err)
}

if change == nil {
return r1, nil
}

changeRDesc := change.(*ReplicaDesc)
return r1, changeRDesc
}

func TestReplicaDescMerge(t *testing.T) {
now := time.Now().Unix()

const (
replica1 = "r1"
replica2 = "r2"
replica3 = "r3"
)

firstReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

firstReplicaWithHigherReceivedAt := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now + 5,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

secondReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica2,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 5,
ElectedChanges: 2,
}
}

thirdReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica3,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 10,
ElectedChanges: 3,
}
}

expectedFirstAndFirstHigherReceivedAtMerge := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now + 5,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

expectedFirstSecondThirdAtMerge := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica3,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 10,
ElectedChanges: 3,
}
}

{
ours, ch := merge(firstReplica(), firstReplicaWithHigherReceivedAt())
assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours)
assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ch)
}

{ // idempotency: (no change after applying same Replica again)
ours, ch := merge(expectedFirstAndFirstHigherReceivedAtMerge(), firstReplica())
assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours)
assert.Equal(t, (*ReplicaDesc)(nil), ch)
}

{ // commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt)
our, ch := merge(firstReplicaWithHigherReceivedAt(), firstReplica())
assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), our)
// change is nil in this case, since the incoming ReplicaDesc has lower receivedAt timestamp
assert.Equal(t, (*ReplicaDesc)(nil), ch)
}

{ // associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))
ours1, _ := merge(firstReplica(), secondReplica())
ours1, _ = merge(ours1, thirdReplica())
assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours1)

ours2, _ := merge(secondReplica(), thirdReplica())
ours2, _ = merge(ours2, firstReplica())
assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours2)
}

}

func TestHaTrackerWithMemberList(t *testing.T) {
var config memberlist.KVConfig

const cluster = "cluster"
const replica1 = "r1"
const replica2 = "r2"

flagext.DefaultValues(&config)
ctx := context.Background()

config.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: getLocalhostAddrs(),
BindPort: 0,
}

config.Codecs = []codec.Codec{
GetReplicaDescCodec(),
}

memberListSvc := memberlist.NewKVInitService(
&config,
log.NewNopLogger(),
&dnsProviderMock{},
prometheus.NewPedanticRegistry(),
)

t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc))
})

c, err := newHaTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{
MemberlistKV: memberListSvc.GetMemberlistKV,
}},
UpdateTimeout: time.Millisecond * 100,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(ctx, c))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

now := time.Now()

// Write the first time.
err = c.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.NoError(t, err)

// Throw away a sample from replica2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Wait more than the overwrite timeout.
now = now.Add(1100 * time.Millisecond)

// Another sample from replica2 to update its timestamp.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Update KVStore - this should elect replica 2.
c.updateKVStoreAll(context.Background(), now)

checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica2, now, now)

// Now we should accept from replica 2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.NoError(t, err)

// We timed out accepting samples from replica 1 and should now reject them.
err = c.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.Error(t, err)
}

func TestHATrackerCacheSyncOnStart(t *testing.T) {
const cluster = "c1"
const replicaOne = "r1"
Expand Down
Loading