Skip to content

Commit 05abd0c

Browse files
committed
More work on shuffle sharding utils
Changes following up on PR #807810 . Made the validation checking function return a slice of error messages rather than just a bit. Replaced all the `int32` with `int` because this is intended for more than just the priority-and-faireness feature and so should not be a slave to its configuration datatypes. Introduced ShuffleAndDealIntoHand, to make memory allocation the caller's problem/privilege. Made the hand uniformity tester avoid reflection, evaluate the histogram against the expected range of counts, and run multiple test cases, including one in which the number of hash values is a power of two with four extra bits (as the validation check requires) and one in which the deck size is not a power of two. Updated the fairqueuing implementation to use the shuffle sharding utility package.
1 parent 639be91 commit 05abd0c

File tree

5 files changed

+165
-101
lines changed

5 files changed

+165
-101
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
visibility = ["//visibility:public"],
1717
deps = [
1818
"//staging/src/k8s.io/apiserver/pkg/util/clock:go_default_library",
19+
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
1920
"//vendor/k8s.io/klog:go_default_library",
2021
],
2122
)

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/fairqueuing.go

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"k8s.io/apiserver/pkg/util/clock"
25+
"k8s.io/apiserver/pkg/util/shufflesharding"
2526
"k8s.io/klog"
2627
)
2728

@@ -222,30 +223,6 @@ func (qs *queueSetImpl) GetRequestsExecuting() int {
222223
return total
223224
}
224225

225-
func shuffleDealAndPick(v, nq uint64,
226-
lengthOfQueue func(int) int,
227-
mr func(int /*in [0, nq-1]*/) int, /*in [0, numQueues-1] and excluding previously determined members of I*/
228-
nRem, minLen, bestIdx int) int {
229-
if nRem < 1 {
230-
return bestIdx
231-
}
232-
vNext := v / nq
233-
ai := int(v - nq*vNext)
234-
ii := mr(ai)
235-
mrNext := func(a int /*in [0, nq-2]*/) int /*in [0, numQueues-1] and excluding I[0], I[1], ... ii*/ {
236-
if a < ai {
237-
return mr(a)
238-
}
239-
return mr(a + 1)
240-
}
241-
lenI := lengthOfQueue(ii)
242-
if lenI < minLen {
243-
minLen = lenI
244-
bestIdx = ii
245-
}
246-
return shuffleDealAndPick(vNext, nq-1, lengthOfQueue, mrNext, nRem-1, minLen, bestIdx)
247-
}
248-
249226
// ChooseQueueIdx uses shuffle sharding to select an queue index
250227
// using a 'hashValue'. The 'hashValue' derives a hand from a set range of
251228
// indexes (range 'desiredNumQueues') and returns the queue with the least queued packets
@@ -254,10 +231,16 @@ func (qs *queueSetImpl) ChooseQueueIdx(hashValue uint64, handSize int) int {
254231
// TODO(aaron-prindle) currently a lock is held for this in a larger anonymous function
255232
// verify that makes sense...
256233

234+
bestQueueIdx := -1
235+
bestQueueLen := int(math.MaxInt32)
257236
// desiredNumQueues is used here instead of numQueues to omit quiesce queues
258-
return shuffleDealAndPick(hashValue, uint64(qs.desiredNumQueues),
259-
func(idx int) int { return len(qs.queues[idx].Requests) },
260-
func(i int) int { return i }, handSize, math.MaxInt32, -1)
237+
shufflesharding.ShuffleAndDeal(hashValue, qs.desiredNumQueues, handSize, func(queueIdx int) {
238+
thisLen := len(qs.queues[queueIdx].Requests)
239+
if thisLen < bestQueueLen {
240+
bestQueueIdx, bestQueueLen = queueIdx, thisLen
241+
}
242+
})
243+
return bestQueueIdx
261244
}
262245

263246
// rejectOrEnqueue rejects or enqueues the newly arrived request if

staging/src/k8s.io/apiserver/pkg/util/shufflesharding/shufflesharding.go

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,51 @@ package shufflesharding
1818

1919
import (
2020
"errors"
21+
"fmt"
2122
"math"
23+
"strings"
2224
)
2325

2426
const maxHashBits = 60
2527

26-
// ValidateParameters can validate parameters for shuffle sharding
27-
// in a fast but approximate way, including deckSize and handSize
28-
// Algorithm: maxHashBits >= bits(deckSize^handSize)
29-
func ValidateParameters(deckSize, handSize int32) bool {
30-
if handSize <= 0 || deckSize <= 0 || handSize > deckSize {
31-
return false
28+
// ValidateParameters finds errors in the parameters for shuffle
29+
// sharding. Returns a slice for which `len()` is 0 if and only if
30+
// there are no errors. The entropy requirement is evaluated in a
31+
// fast but approximate way: bits(deckSize^handSize).
32+
func ValidateParameters(deckSize, handSize int) (errs []string) {
33+
if handSize <= 0 {
34+
errs = append(errs, "handSize is not positive")
3235
}
33-
34-
return math.Log2(float64(deckSize))*float64(handSize) <= maxHashBits
36+
if deckSize <= 0 {
37+
errs = append(errs, "deckSize is not positive")
38+
}
39+
if len(errs) > 0 {
40+
return
41+
}
42+
if handSize > deckSize {
43+
return []string{"handSize is greater than deckSize"}
44+
}
45+
if math.Log2(float64(deckSize))*float64(handSize) > maxHashBits {
46+
return []string{fmt.Sprintf("more than %d bits of entropy required", maxHashBits)}
47+
}
48+
return
3549
}
3650

3751
// ShuffleAndDeal can shuffle a hash value to handSize-quantity and non-redundant
3852
// indices of decks, with the pick function, we can get the optimal deck index
3953
// Eg. From deckSize=128, handSize=8, we can get an index array [12 14 73 18 119 51 117 26],
4054
// then pick function will choose the optimal index from these
4155
// Algorithm: https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#queue-assignment-proof-of-concept
42-
func ShuffleAndDeal(hashValue uint64, deckSize, handSize int32, pick func(int32)) {
43-
remainders := make([]int32, handSize)
56+
func ShuffleAndDeal(hashValue uint64, deckSize, handSize int, pick func(int)) {
57+
remainders := make([]int, handSize)
4458

45-
for i := int32(0); i < handSize; i++ {
59+
for i := 0; i < handSize; i++ {
4660
hashValueNext := hashValue / uint64(deckSize-i)
47-
remainders[i] = int32(hashValue - uint64(deckSize-i)*hashValueNext)
61+
remainders[i] = int(hashValue - uint64(deckSize-i)*hashValueNext)
4862
hashValue = hashValueNext
4963
}
5064

51-
for i := int32(0); i < handSize; i++ {
65+
for i := 0; i < handSize; i++ {
5266
candidate := remainders[i]
5367
for j := i; j > 0; j-- {
5468
if candidate >= remainders[j-1] {
@@ -60,9 +74,9 @@ func ShuffleAndDeal(hashValue uint64, deckSize, handSize int32, pick func(int32)
6074
}
6175

6276
// ShuffleAndDealWithValidation will do validation before ShuffleAndDeal
63-
func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int32, pick func(int32)) error {
64-
if !ValidateParameters(deckSize, handSize) {
65-
return errors.New("bad parameters")
77+
func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int, pick func(int)) error {
78+
if errs := ValidateParameters(deckSize, handSize); len(errs) > 0 {
79+
return errors.New(strings.Join(errs, ";"))
6680
}
6781

6882
ShuffleAndDeal(hashValue, deckSize, handSize, pick)
@@ -71,14 +85,14 @@ func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int32, pi
7185

7286
// ShuffleAndDealToSlice will use specific pick function to return slices of indices
7387
// after ShuffleAndDeal
74-
func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int32) []int32 {
88+
func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int) []int {
7589
var (
76-
candidates = make([]int32, handSize)
90+
candidates = make([]int, handSize)
7791
idx = 0
7892
)
7993

80-
pickToSlices := func(can int32) {
81-
candidates[idx] = can
94+
pickToSlices := func(can int) {
95+
candidates[idx] = int(can)
8296
idx++
8397
}
8498

@@ -87,11 +101,33 @@ func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int32) []int32 {
87101
return candidates
88102
}
89103

104+
// ShuffleAndDealIntoHand shuffles a deck of the given size by the
105+
// given hash value and deals cards into the given slice. The virtue
106+
// of this function compared to ShuffleAndDealToSlice is that the
107+
// caller provides the storage for the hand.
108+
func ShuffleAndDealIntoHand(hashValue uint64, deckSize int, hand []int) {
109+
handSize := len(hand)
110+
var idx int
111+
ShuffleAndDeal(hashValue, deckSize, handSize, func(card int) {
112+
hand[idx] = int(card)
113+
idx++
114+
})
115+
}
116+
90117
// ShuffleAndDealToSliceWithValidation will do validation before ShuffleAndDealToSlice
91-
func ShuffleAndDealToSliceWithValidation(hashValue uint64, deckSize, handSize int32) ([]int32, error) {
92-
if !ValidateParameters(deckSize, handSize) {
93-
return nil, errors.New("bad parameters")
118+
func ShuffleAndDealToSliceWithValidation(hashValue uint64, deckSize, handSize int) ([]int, error) {
119+
if errs := ValidateParameters(deckSize, handSize); len(errs) > 0 {
120+
return nil, errors.New(strings.Join(errs, ";"))
94121
}
95122

96123
return ShuffleAndDealToSlice(hashValue, deckSize, handSize), nil
97124
}
125+
126+
// ShuffleAndDealIntoHandWithValidation does validation and then ShuffleAndDealIntoHand
127+
func ShuffleAndDealIntoHandWithValidation(hashValue uint64, deckSize int, hand []int) error {
128+
if errs := ValidateParameters(deckSize, len(hand)); len(errs) > 0 {
129+
return errors.New(strings.Join(errs, ";"))
130+
}
131+
ShuffleAndDealIntoHand(hashValue, deckSize, hand)
132+
return nil
133+
}

0 commit comments

Comments
 (0)