Skip to content

Commit 50d714c

Browse files
authored
identify: rate limit id push protocol (libp2p#3266)
The rate limits id pushes from peers to one every five second with an allowed burst of 10 pushes. This should be enough for all but malfunctioning and malicious peers. We can use the exact same code for autonat, autonatv2, circuit v2, etc. Introducing limits to identify separately to get some feedback for libp2p#3265. For this PR, I'd like to ignore issues regarding where should this piece of code go, and focus on how specifically it should behave. See the long comment in rateLimiter.allow for example. Part of: libp2p#3265
1 parent effdc65 commit 50d714c

File tree

7 files changed

+611
-6
lines changed

7 files changed

+611
-6
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ require (
6363
golang.org/x/crypto v0.35.0
6464
golang.org/x/sync v0.11.0
6565
golang.org/x/sys v0.30.0
66+
golang.org/x/time v0.11.0
6667
golang.org/x/tools v0.30.0
6768
google.golang.org/protobuf v1.36.5
6869
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,8 @@ golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
429429
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
430430
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
431431
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
432-
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
433-
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
432+
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
433+
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
434434
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
435435
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
436436
golang.org/x/tools v0.0.0-20181030000716-a0a13e073c7b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

p2p/internal/rate/limiter.go

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Package rate provides rate limiting functionality at a global, network, and subnet level.
2+
package rate
3+
4+
import (
5+
"container/heap"
6+
"net/netip"
7+
"slices"
8+
"sync"
9+
"time"
10+
11+
"github.com/libp2p/go-libp2p/core/network"
12+
ma "github.com/multiformats/go-multiaddr"
13+
manet "github.com/multiformats/go-multiaddr/net"
14+
"golang.org/x/time/rate"
15+
)
16+
17+
// Limit is the configuration for a token bucket rate limiter.
18+
// The bucket has a capacity of Burst, and is refilled at a rate of RPS tokens per second.
19+
// Initially, buckets are completley full, i.e. tokens in the bucket is equal to `Burst`.
20+
// In any given time interval T seconds, maximum events allowed will be `T*RPS + Burst`.
21+
type Limit struct {
22+
// RPS is the rate of requests per second in steady state.
23+
RPS float64
24+
// Burst is the number of requests allowed over the RPS.
25+
Burst int
26+
}
27+
28+
// PrefixLimit is a rate limit configuration that applies to a specific network prefix.
29+
type PrefixLimit struct {
30+
Prefix netip.Prefix
31+
Limit
32+
}
33+
34+
// SubnetLimit is a rate limit configuration that applies to a specific subnet.
35+
type SubnetLimit struct {
36+
PrefixLength int
37+
Limit
38+
}
39+
40+
// Limiter rate limits new streams for a service. It allows setting NetworkPrefix specific,
41+
// global, and subnet specific limits. Use 0 for no rate limiting.
42+
// The limiter maintains state that must be periodically cleaned up using Cleanup
43+
type Limiter struct {
44+
// NetworkPrefixLimits are limits for streams with peer IPs belonging to specific subnets.
45+
// It can be used to increase the limit for trusted networks and decrease the limit for specific networks.
46+
NetworkPrefixLimits []PrefixLimit
47+
// GlobalLimit is the limit for all streams where the peer IP doesn't fall within any
48+
// of the `NetworkPrefixLimits`
49+
GlobalLimit Limit
50+
// SubnetRateLimiter is a rate limiter for subnets.
51+
SubnetRateLimiter SubnetLimiter
52+
53+
initOnce sync.Once
54+
globalBucket *rate.Limiter
55+
networkPrefixBuckets []*rate.Limiter // ith element ratelimits ith NetworkPrefixLimits
56+
}
57+
58+
func (r *Limiter) init() {
59+
r.initOnce.Do(func() {
60+
if r.GlobalLimit.RPS == 0 {
61+
r.globalBucket = rate.NewLimiter(rate.Inf, 0)
62+
} else {
63+
r.globalBucket = rate.NewLimiter(rate.Limit(r.GlobalLimit.RPS), r.GlobalLimit.Burst)
64+
}
65+
// sort such that the widest prefix (smallest bit count) is last.
66+
slices.SortFunc(r.NetworkPrefixLimits, func(a, b PrefixLimit) int { return b.Prefix.Bits() - a.Prefix.Bits() })
67+
r.networkPrefixBuckets = make([]*rate.Limiter, 0, len(r.NetworkPrefixLimits))
68+
for _, limit := range r.NetworkPrefixLimits {
69+
if limit.RPS == 0 {
70+
r.networkPrefixBuckets = append(r.networkPrefixBuckets, rate.NewLimiter(rate.Inf, 0))
71+
} else {
72+
r.networkPrefixBuckets = append(r.networkPrefixBuckets, rate.NewLimiter(rate.Limit(limit.RPS), limit.Burst))
73+
}
74+
}
75+
})
76+
}
77+
78+
// Limit rate limits a StreamHandler function.
79+
func (r *Limiter) Limit(f func(s network.Stream)) func(s network.Stream) {
80+
r.init()
81+
return func(s network.Stream) {
82+
if !r.allow(s.Conn().RemoteMultiaddr()) {
83+
_ = s.ResetWithError(network.StreamRateLimited)
84+
return
85+
}
86+
f(s)
87+
}
88+
}
89+
90+
func (r *Limiter) allow(addr ma.Multiaddr) bool {
91+
r.init()
92+
// Check buckets from the most specific to the least.
93+
//
94+
// This ensures that a single peer cannot take up all the tokens in the global
95+
// rate limiting bucket. We *MUST* follow this order because the rate limiter
96+
// implementation doesn't have a `ReturnToken` method. If we checked the global
97+
// bucket before the specific bucket, and the specific bucket rejected the
98+
// request, there's no way to return the token to the global bucket. So all
99+
// rejected requests from the specific bucket would take up tokens from the global bucket.
100+
ip, err := manet.ToIP(addr)
101+
if err != nil {
102+
return r.globalBucket.Allow()
103+
}
104+
ipAddr, ok := netip.AddrFromSlice(ip)
105+
if !ok {
106+
return r.globalBucket.Allow()
107+
}
108+
109+
// prefixs have been sorted from most to least specific so rejected requests for more
110+
// specific prefixes don't take up tokens from the less specific prefixes.
111+
isWithinNetworkPrefix := false
112+
for i, limit := range r.NetworkPrefixLimits {
113+
if limit.Prefix.Contains(ipAddr) {
114+
if !r.networkPrefixBuckets[i].Allow() {
115+
return false
116+
}
117+
isWithinNetworkPrefix = true
118+
}
119+
}
120+
if isWithinNetworkPrefix {
121+
return true
122+
}
123+
124+
if !r.SubnetRateLimiter.Allow(ipAddr, time.Now()) {
125+
return false
126+
}
127+
return r.globalBucket.Allow()
128+
}
129+
130+
// SubnetLimiter rate limits requests per ip subnet.
131+
type SubnetLimiter struct {
132+
// IPv4SubnetLimits are the per subnet limits for streams with IPv4 Peers.
133+
IPv4SubnetLimits []SubnetLimit
134+
// IPv6SubnetLimits are the per subnet limits for streams with IPv6 Peers.
135+
IPv6SubnetLimits []SubnetLimit
136+
// GracePeriod is the time to wait to remove a full capacity bucket.
137+
// Keeping a bucket around helps prevent allocations
138+
GracePeriod time.Duration
139+
140+
initOnce sync.Once
141+
mx sync.Mutex
142+
ipv4Heaps []*bucketHeap
143+
ipv6Heaps []*bucketHeap
144+
}
145+
146+
func (s *SubnetLimiter) init() {
147+
s.initOnce.Do(func() {
148+
// smaller prefix length, i.e. largest subnet, last
149+
slices.SortFunc(s.IPv4SubnetLimits, func(a, b SubnetLimit) int { return b.PrefixLength - a.PrefixLength })
150+
slices.SortFunc(s.IPv6SubnetLimits, func(a, b SubnetLimit) int { return b.PrefixLength - a.PrefixLength })
151+
152+
s.ipv4Heaps = make([]*bucketHeap, len(s.IPv4SubnetLimits))
153+
for i := range s.IPv4SubnetLimits {
154+
s.ipv4Heaps[i] = &bucketHeap{
155+
prefixBucket: make([]prefixBucketWithExpiry, 0),
156+
prefixToIndex: make(map[netip.Prefix]int),
157+
}
158+
heap.Init(s.ipv4Heaps[i])
159+
}
160+
161+
s.ipv6Heaps = make([]*bucketHeap, len(s.IPv6SubnetLimits))
162+
for i := range s.IPv6SubnetLimits {
163+
s.ipv6Heaps[i] = &bucketHeap{
164+
prefixBucket: make([]prefixBucketWithExpiry, 0),
165+
prefixToIndex: make(map[netip.Prefix]int),
166+
}
167+
heap.Init(s.ipv6Heaps[i])
168+
}
169+
})
170+
}
171+
172+
// Allow returns true if requests for `ipAddr` are within specified rate limits
173+
func (s *SubnetLimiter) Allow(ipAddr netip.Addr, now time.Time) bool {
174+
s.init()
175+
s.mx.Lock()
176+
defer s.mx.Unlock()
177+
178+
s.cleanUp(now)
179+
180+
var subNetLimits []SubnetLimit
181+
var heaps []*bucketHeap
182+
if ipAddr.Is4() {
183+
subNetLimits = s.IPv4SubnetLimits
184+
heaps = s.ipv4Heaps
185+
} else {
186+
subNetLimits = s.IPv6SubnetLimits
187+
heaps = s.ipv6Heaps
188+
}
189+
190+
for i, limit := range subNetLimits {
191+
prefix, err := ipAddr.Prefix(limit.PrefixLength)
192+
if err != nil {
193+
return false // we have a ipaddr this shouldn't happen
194+
}
195+
196+
bucket := heaps[i].Get(prefix)
197+
if bucket == (prefixBucketWithExpiry{}) {
198+
bucket = prefixBucketWithExpiry{
199+
Prefix: prefix,
200+
tokenBucket: tokenBucket{rate.NewLimiter(rate.Limit(limit.RPS), limit.Burst)},
201+
Expiry: now,
202+
}
203+
}
204+
205+
if !bucket.Allow() {
206+
// bucket is empty, its expiry would have been set correctly the last time
207+
// it allowed a request.
208+
return false
209+
}
210+
bucket.Expiry = bucket.FullAt(now).Add(s.GracePeriod)
211+
heaps[i].Upsert(bucket)
212+
}
213+
return true
214+
}
215+
216+
// cleanUp removes limiters that have expired by now.
217+
func (s *SubnetLimiter) cleanUp(now time.Time) {
218+
for _, h := range s.ipv4Heaps {
219+
h.Expire(now)
220+
}
221+
for _, h := range s.ipv6Heaps {
222+
h.Expire(now)
223+
}
224+
}
225+
226+
// tokenBucket is a *rate.Limiter with a `FullAt` method.
227+
type tokenBucket struct {
228+
*rate.Limiter
229+
}
230+
231+
// FullAt returns the instant at which the bucket will be full.
232+
func (b *tokenBucket) FullAt(now time.Time) time.Time {
233+
tokensNeeded := float64(b.Burst()) - b.TokensAt(now)
234+
refillRate := float64(b.Limit())
235+
eta := time.Duration((tokensNeeded / refillRate) * float64(time.Second))
236+
return now.Add(eta)
237+
}
238+
239+
// prefixBucketWithExpiry is a token bucket with a prefix and Expiry. The expiry is when the bucket
240+
// will be full with tokens.
241+
type prefixBucketWithExpiry struct {
242+
tokenBucket
243+
Prefix netip.Prefix
244+
Expiry time.Time
245+
}
246+
247+
// bucketHeap is a heap of buckets ordered by their Expiry. At expiry, the bucket
248+
// is removed from the heap as a full bucket is indistinguishable from a new bucket.
249+
type bucketHeap struct {
250+
prefixBucket []prefixBucketWithExpiry
251+
prefixToIndex map[netip.Prefix]int
252+
}
253+
254+
var _ heap.Interface = (*bucketHeap)(nil)
255+
256+
// Upsert replaces the bucket with prefix `b.Prefix` with the provided bucket, `b`, or
257+
// inserts `b` if no bucket with prefix `b.Prefix` exists.
258+
func (h *bucketHeap) Upsert(b prefixBucketWithExpiry) {
259+
if i, ok := h.prefixToIndex[b.Prefix]; ok {
260+
h.prefixBucket[i] = b
261+
heap.Fix(h, i)
262+
return
263+
}
264+
heap.Push(h, b)
265+
}
266+
267+
// Get returns the limiter for a prefix
268+
func (h *bucketHeap) Get(prefix netip.Prefix) prefixBucketWithExpiry {
269+
if i, ok := h.prefixToIndex[prefix]; ok {
270+
return h.prefixBucket[i]
271+
}
272+
return prefixBucketWithExpiry{}
273+
}
274+
275+
// Expire removes elements with expiry before `expiry`
276+
func (h *bucketHeap) Expire(expiry time.Time) {
277+
for h.Len() > 0 {
278+
oldest := h.prefixBucket[0]
279+
if oldest.Expiry.After(expiry) {
280+
break
281+
}
282+
heap.Pop(h)
283+
}
284+
}
285+
286+
// Methods for the heap interface
287+
288+
// Len returns the length of the heap
289+
func (h *bucketHeap) Len() int {
290+
return len(h.prefixBucket)
291+
}
292+
293+
// Less compares two elements in the heap
294+
func (h *bucketHeap) Less(i, j int) bool {
295+
return h.prefixBucket[i].Expiry.Before(h.prefixBucket[j].Expiry)
296+
}
297+
298+
// Swap swaps two elements in the heap
299+
func (h *bucketHeap) Swap(i, j int) {
300+
h.prefixBucket[i], h.prefixBucket[j] = h.prefixBucket[j], h.prefixBucket[i]
301+
h.prefixToIndex[h.prefixBucket[i].Prefix] = i
302+
h.prefixToIndex[h.prefixBucket[j].Prefix] = j
303+
}
304+
305+
// Push adds a new element to the heap
306+
func (h *bucketHeap) Push(x any) {
307+
item := x.(prefixBucketWithExpiry)
308+
h.prefixBucket = append(h.prefixBucket, item)
309+
h.prefixToIndex[item.Prefix] = len(h.prefixBucket) - 1
310+
}
311+
312+
// Pop removes and returns the top element from the heap
313+
func (h *bucketHeap) Pop() any {
314+
n := len(h.prefixBucket)
315+
item := h.prefixBucket[n-1]
316+
h.prefixBucket = h.prefixBucket[0 : n-1]
317+
delete(h.prefixToIndex, item.Prefix)
318+
return item
319+
}

0 commit comments

Comments
 (0)