Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ha_tracker): Replicadesc implement mergeable interface #10020

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests: improve testcases for TestReplicaDescMerge
  • Loading branch information
NickAnge committed Nov 28, 2024
commit 2ceeadc8c9dcb16ebd9540b074022255be21c72f
51 changes: 22 additions & 29 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func NewReplicaDesc() *ReplicaDesc {
return &ReplicaDesc{}
}

// Merge merges other ReplicaDesc into this one.
// The decision is made based on the ReceivedAt timestamp, if the Replica name is the same and at the ElectedAt if the
// Replica name is different
func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) {
return r.mergeWithTime(other)
}
Expand All @@ -79,61 +82,51 @@ func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist.
return nil, nil
}

thisRDesc := r
otherRDesc := other

if otherRDesc.Replica == thisRDesc.Replica {
if other.Replica == r.Replica {
// Keeping the one with the most recent receivedAt timestamp
if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt {
*thisRDesc = *other
} else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 {
*thisRDesc = *other
if other.ReceivedAt > r.ReceivedAt {
*r = *other
} else if r.ReceivedAt == other.ReceivedAt && r.DeletedAt == 0 && other.DeletedAt != 0 {
*r = *other
}
} else {
// keep the most recent Elected to reach consistency
if otherRDesc.ElectedAt > thisRDesc.ElectedAt {
*thisRDesc = *other
} else if otherRDesc.ElectedAt == thisRDesc.ElectedAt {
if other.ElectedAt > r.ElectedAt {
*r = *other
} else if other.ElectedAt == r.ElectedAt {
// if the timestamps are equal we compare receivedAt
if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt {
*thisRDesc = *other
if other.ReceivedAt > r.ReceivedAt {
*r = *other
}
}
}

// No changes - return
if *thisRDesc != *otherRDesc {
// No changes
if *r != *other {
return nil, nil
}

out := NewReplicaDesc()
*out = *thisRDesc
*out = *r
return out, nil
}

// MergeContent noOp currently
// MergeContent describes content of this Mergeable.
// Given that ReplicaDesc can have only one instance at a time, it returns the ReplicaDesc it contains.
func (r *ReplicaDesc) MergeContent() []string {
result := []string(nil)
if len(r.Replica) != 0 {
result = append(result, r.Replica)
result = append(result, r.String())
}
return result
}

// RemoveTombstones noOp for now
func (r *ReplicaDesc) RemoveTombstones(limit time.Time) (total, removed int) {
if r.DeletedAt > 0 {
if limit.IsZero() || time.Unix(r.DeletedAt, 0).Before(limit) {
// need to implement the remove logic
removed = 1
} else {
total = 1
}
}
// RemoveTombstones noOp.
func (r *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) {
return
}

// Clone returns a deep copy of the ring state.
// Clone returns a deep copy of the ReplicaDesc.
func (r *ReplicaDesc) Clone() memberlist.Mergeable {
return proto.Clone(r).(*ReplicaDesc)
}
Expand Down
58 changes: 39 additions & 19 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,36 +173,56 @@ func TestReplicaDescMerge(t *testing.T) {
}
}

tests := []struct {
expectedFirstAndSecondMerge := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica2,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 5,
ElectedChanges: 2,
}
}

testsMerge := []struct {
name string
rDesc1 *ReplicaDesc
rDesc2 *ReplicaDesc
expectedOurs *ReplicaDesc
expectedRDesc *ReplicaDesc
expectedChange *ReplicaDesc
}{
{
name: "simple merge: firstReplica and firstReplicaWithHigherReceivedAt",
name: "Merge ReplicaDesc: Same replica name, different receivedAt should return ReplicaDesc with most recent receivedAt timestamp",
rDesc1: firstReplica(),
rDesc2: firstReplicaWithHigherReceivedAt(),
expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(),
expectedRDesc: expectedFirstAndFirstHigherReceivedAtMerge(),
expectedChange: expectedFirstAndFirstHigherReceivedAtMerge(),
},
{
name: "idempotency: no change after applying same Replica again",
rDesc1: expectedFirstAndFirstHigherReceivedAtMerge(),
name: "Merge ReplicaDesc: Different replica name, different electedAt should return ReplicaDesc with most recent electedAt timestamp",
rDesc1: firstReplica(),
rDesc2: secondReplica(),
expectedRDesc: expectedFirstAndSecondMerge(),
expectedChange: expectedFirstAndSecondMerge(),
},
{
name: "idempotency: no change after applying same ReplicaDesc again.",
rDesc1: func() *ReplicaDesc {
out, _ := merge(firstReplica(), secondReplica())
return out
}(),
rDesc2: firstReplica(),
expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(),
expectedRDesc: expectedFirstAndSecondMerge(),
expectedChange: nil,
},
{
name: "commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt)",
rDesc1: firstReplicaWithHigherReceivedAt(),
rDesc2: firstReplica(),
expectedOurs: func() *ReplicaDesc {
expected, _ := merge(firstReplica(), firstReplicaWithHigherReceivedAt())
name: "commutativity: Merge(first, second) == Merge(second, first)",
rDesc1: firstReplica(),
rDesc2: secondReplica(),
expectedRDesc: func() *ReplicaDesc {
expected, _ := merge(secondReplica(), firstReplica())
return expected
}(),
expectedChange: nil,
expectedChange: expectedFirstAndSecondMerge(),
},
{
name: "associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))",
Expand All @@ -212,19 +232,19 @@ func TestReplicaDescMerge(t *testing.T) {
return ours1
}(),
rDesc2: nil,
expectedOurs: func() *ReplicaDesc {
expectedRDesc: func() *ReplicaDesc {
ours2, _ := merge(secondReplica(), thirdReplica())
ours2, _ = merge(ours2, firstReplica())
ours2, _ = merge(firstReplica(), ours2)
return ours2
}(),
expectedChange: nil,
},
}

for _, tt := range tests {
for _, tt := range testsMerge {
t.Run(tt.name, func(t *testing.T) {
ours, ch := merge(tt.rDesc1, tt.rDesc2)
assert.Equal(t, tt.expectedOurs, ours)
rDesc, ch := merge(tt.rDesc1, tt.rDesc2)
assert.Equal(t, tt.expectedRDesc, rDesc)
assert.Equal(t, tt.expectedChange, ch)
})
}
Expand Down Expand Up @@ -291,7 +311,7 @@ func TestHaTrackerWithMemberList(t *testing.T) {
assert.Error(t, err)

// Wait more than the overwrite timeout.
now = now.Add(time.Millisecond * failoverTimeoutPlus100ms)
now = now.Add(failoverTimeoutPlus100ms)

// Another sample from replica2 to update its timestamp.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
Expand Down