Skip to content

Commit ddae951

Browse files
committed
RateLimit, for rate-limiting elements iterated from an Iterable
1 parent ef3edfd commit ddae951

File tree

8 files changed

+396
-0
lines changed

8 files changed

+396
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
1818
- `Fn0`, a function from `Unit` to some value
1919
- `Fn1#thunk`, producing an `Fn0`
2020
- `Absent`, a monoid over `Maybe` that is absence biased
21+
- `RateLimit`, a function that iterates elements from an `Iterable` according to some rate limit
2122

2223
### Changed
2324
- `Tuple2-8` now implement `Product2-8`
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.jnape.palatable.lambda.functions.builtin.fn4;
2+
3+
import com.jnape.palatable.lambda.functions.Fn1;
4+
import com.jnape.palatable.lambda.functions.Fn2;
5+
import com.jnape.palatable.lambda.functions.Fn3;
6+
import com.jnape.palatable.lambda.functions.Fn4;
7+
import com.jnape.palatable.lambda.iteration.IterationInterruptedException;
8+
import com.jnape.palatable.lambda.iteration.RateLimitingIterable;
9+
10+
import java.time.Duration;
11+
import java.time.Instant;
12+
import java.util.function.Supplier;
13+
14+
import static com.jnape.palatable.lambda.adt.hlist.HList.tuple;
15+
import static java.util.Collections.singleton;
16+
17+
/**
18+
* Given a {@link Supplier} of {@link Instant Instants} (presumably backed by a clock), a <code>limit</code>, a {@link
19+
* Duration}, and an {@link Iterable} <code>as</code>, return an {@link Iterable} that iterates <code>as</code>
20+
* according to the threshold specified by the limit per duration, using the {@link Supplier} to advance time.
21+
* <p>
22+
* As an example, the following will print at most 10 elements per second:
23+
* <pre><code>
24+
* rateLimit(Clock.systemUTC()::instant, 10L, Duration.ofSeconds(1), iterate(x -> x + 1, 1)).forEach(System.out::println);
25+
* </code></pre>
26+
* Currying allows different rate limits to be combined naturally:
27+
* <pre><code>
28+
* Iterable<Integer> elements = iterate(x -> x + 1, 1);
29+
*
30+
* Supplier<Instant> instantSupplier = Clock.systemUTC()::instant;
31+
* Fn1<Iterable<Integer>, Iterable<Integer>> tenPerSecond = rateLimit(instantSupplier, 10L, Duration.ofSeconds(1));
32+
* Fn1<Iterable<Integer>, Iterable<Integer>> oneHundredEveryTwoMinutes = rateLimit(instantSupplier, 100L, Duration.ofMinutes(2));
33+
*
34+
* tenPerSecond.fmap(oneHundredEveryTwoMinutes).apply(elements).forEach(System.out::println);
35+
* </code></pre>
36+
* In the preceding example, the elements will be printed at most 10 elements per second and 100 elements per 120
37+
* seconds.
38+
* <p>
39+
* If the host {@link Thread} is {@link Thread#interrupt() interrupted} while the returned {@link Iterable} is waiting
40+
* for the next available time slice, an {@link IterationInterruptedException} will immediately be thrown.
41+
* <p>
42+
* Note that the returned {@link Iterable} will never iterate faster than the specified rate limit, but the earliest
43+
* the next element is available will be dependent on the precision of the underlying instant supplier as well as any
44+
* overhead involved in producing the element from the original {@link Iterable}.
45+
*
46+
* @param <A> the {@link Iterable} element type
47+
*/
48+
public final class RateLimit<A> implements Fn4<Supplier<Instant>, Long, Duration, Iterable<A>, Iterable<A>> {
49+
50+
private static final RateLimit INSTANCE = new RateLimit();
51+
52+
private RateLimit() {
53+
}
54+
55+
@Override
56+
public Iterable<A> apply(Supplier<Instant> instantSupplier, Long limit, Duration duration, Iterable<A> as) {
57+
if (limit < 1)
58+
throw new IllegalArgumentException("Limit must be greater than 0: " + limit);
59+
60+
return new RateLimitingIterable<>(as, singleton(tuple(limit, duration, instantSupplier)));
61+
}
62+
63+
@SuppressWarnings("unchecked")
64+
public static <A> RateLimit<A> rateLimit() {
65+
return INSTANCE;
66+
}
67+
68+
public static <A> Fn3<Long, Duration, Iterable<A>, Iterable<A>> rateLimit(Supplier<Instant> instantSupplier) {
69+
return RateLimit.<A>rateLimit().apply(instantSupplier);
70+
}
71+
72+
public static <A> Fn2<Duration, Iterable<A>, Iterable<A>> rateLimit(Supplier<Instant> instantSupplier, Long limit) {
73+
return RateLimit.<A>rateLimit(instantSupplier).apply(limit);
74+
}
75+
76+
public static <A> Fn1<Iterable<A>, Iterable<A>> rateLimit(Supplier<Instant> instantSupplier, Long limit,
77+
Duration duration) {
78+
return RateLimit.<A>rateLimit(instantSupplier, limit).apply(duration);
79+
}
80+
81+
public static <A> Iterable<A> rateLimit(Supplier<Instant> instantSupplier, Long limit, Duration duration,
82+
Iterable<A> as) {
83+
return RateLimit.<A>rateLimit(instantSupplier, limit, duration).apply(as);
84+
}
85+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.jnape.palatable.lambda.iteration;
2+
3+
import com.jnape.palatable.lambda.functions.builtin.fn4.RateLimit;
4+
5+
import java.util.Iterator;
6+
7+
/**
8+
* An exception thrown when a thread is interrupted while an {@link Iterator} was blocked.
9+
*
10+
* @see RateLimit
11+
*/
12+
public final class IterationInterruptedException extends RuntimeException {
13+
14+
public IterationInterruptedException(InterruptedException cause) {
15+
super(cause);
16+
}
17+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.jnape.palatable.lambda.iteration;
2+
3+
import com.jnape.palatable.lambda.adt.hlist.Tuple3;
4+
5+
import java.time.Duration;
6+
import java.time.Instant;
7+
import java.util.HashSet;
8+
import java.util.Iterator;
9+
import java.util.Set;
10+
import java.util.function.Supplier;
11+
12+
public final class RateLimitingIterable<A> implements Iterable<A> {
13+
private final Iterable<A> as;
14+
private final Set<Tuple3<Long, Duration, Supplier<Instant>>> rateLimits;
15+
16+
public RateLimitingIterable(Iterable<A> as, Set<Tuple3<Long, Duration, Supplier<Instant>>> rateLimits) {
17+
Set<Tuple3<Long, Duration, Supplier<Instant>>> combinedRateLimits = new HashSet<>(rateLimits);
18+
if (as instanceof RateLimitingIterable) {
19+
RateLimitingIterable<A> inner = (RateLimitingIterable<A>) as;
20+
combinedRateLimits.addAll(inner.rateLimits);
21+
as = inner.as;
22+
}
23+
this.rateLimits = combinedRateLimits;
24+
this.as = as;
25+
}
26+
27+
@Override
28+
public Iterator<A> iterator() {
29+
return new RateLimitingIterator<>(as.iterator(), rateLimits);
30+
}
31+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.jnape.palatable.lambda.iteration;
2+
3+
import com.jnape.palatable.lambda.adt.hlist.Tuple3;
4+
5+
import java.time.Duration;
6+
import java.time.Instant;
7+
import java.util.ArrayList;
8+
import java.util.HashMap;
9+
import java.util.Iterator;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.NoSuchElementException;
13+
import java.util.Set;
14+
import java.util.function.Supplier;
15+
16+
import static com.jnape.palatable.lambda.adt.Try.trying;
17+
import static com.jnape.palatable.lambda.functions.builtin.fn1.Size.size;
18+
import static com.jnape.palatable.lambda.functions.builtin.fn2.Filter.filter;
19+
import static com.jnape.palatable.lambda.functions.builtin.fn2.GT.gt;
20+
import static com.jnape.palatable.lambda.functions.builtin.fn2.GTE.gte;
21+
import static com.jnape.palatable.lambda.functions.builtin.fn2.LTE.lte;
22+
import static com.jnape.palatable.lambda.semigroup.builtin.Max.max;
23+
import static java.lang.Thread.sleep;
24+
import static java.util.Collections.emptyList;
25+
26+
public final class RateLimitingIterator<A> implements Iterator<A> {
27+
private final Iterator<A> asIterator;
28+
private final Set<Tuple3<Long, Duration, Supplier<Instant>>> rateLimits;
29+
private final Map<Tuple3<Long, Duration, Supplier<Instant>>, List<Instant>> timeSlicesByRateLimit;
30+
31+
public RateLimitingIterator(Iterator<A> asIterator, Set<Tuple3<Long, Duration, Supplier<Instant>>> rateLimits) {
32+
this.asIterator = asIterator;
33+
this.rateLimits = rateLimits;
34+
timeSlicesByRateLimit = new HashMap<>();
35+
}
36+
37+
@Override
38+
public boolean hasNext() {
39+
return asIterator.hasNext();
40+
}
41+
42+
@Override
43+
public A next() {
44+
if (!hasNext())
45+
throw new NoSuchElementException();
46+
awaitNextTimeSlice();
47+
return asIterator.next();
48+
}
49+
50+
private void awaitNextTimeSlice() {
51+
rateLimits.forEach(rateLimit -> {
52+
awaitNextTimeSliceForRateLimit(rateLimit);
53+
List<Instant> timeSlicesForRateLimit = timeSlicesByRateLimit.getOrDefault(rateLimit, new ArrayList<>());
54+
timeSlicesForRateLimit.add(rateLimit._3().get());
55+
timeSlicesByRateLimit.put(rateLimit, timeSlicesForRateLimit);
56+
});
57+
}
58+
59+
private void awaitNextTimeSliceForRateLimit(Tuple3<Long, Duration, Supplier<Instant>> rateLimit) {
60+
while (rateLimitExhaustedInTimeSlice(rateLimit)) {
61+
trying(() -> sleep(0)).biMapL(IterationInterruptedException::new).orThrow();
62+
}
63+
}
64+
65+
private boolean rateLimitExhaustedInTimeSlice(Tuple3<Long, Duration, Supplier<Instant>> rateLimit) {
66+
List<Instant> timeSlicesForRateLimit = timeSlicesByRateLimit.getOrDefault(rateLimit, emptyList());
67+
return rateLimit.into((limit, duration, instantSupplier) -> {
68+
Instant timeSliceEnd = instantSupplier.get();
69+
Instant previousTimeSliceEnd = timeSliceEnd.minus(duration);
70+
timeSlicesForRateLimit.removeIf(gt(previousTimeSliceEnd));
71+
return max(0L, limit - size(filter(mark -> gte(mark, previousTimeSliceEnd) && lte(mark, timeSliceEnd), timeSlicesForRateLimit))) == 0;
72+
});
73+
}
74+
75+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.jnape.palatable.lambda.functions.builtin.fn4;
2+
3+
import com.jnape.palatable.lambda.adt.Try;
4+
import com.jnape.palatable.lambda.functions.Fn1;
5+
import com.jnape.palatable.lambda.iteration.IterationInterruptedException;
6+
import com.jnape.palatable.traitor.annotations.TestTraits;
7+
import com.jnape.palatable.traitor.runners.Traits;
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
import org.junit.runner.RunWith;
11+
import testsupport.time.InstantRecordingClock;
12+
import testsupport.traits.Deforesting;
13+
import testsupport.traits.EmptyIterableSupport;
14+
import testsupport.traits.InfiniteIterableSupport;
15+
import testsupport.traits.Laziness;
16+
17+
import java.time.Duration;
18+
import java.util.concurrent.CountDownLatch;
19+
20+
import static com.jnape.palatable.lambda.functions.builtin.fn1.Repeat.repeat;
21+
import static com.jnape.palatable.lambda.functions.builtin.fn4.RateLimit.rateLimit;
22+
import static java.time.Clock.systemUTC;
23+
import static java.time.Duration.ZERO;
24+
import static java.util.Arrays.asList;
25+
import static java.util.Collections.emptyList;
26+
import static org.junit.Assert.assertThat;
27+
import static testsupport.matchers.IterableMatcher.iterates;
28+
import static testsupport.matchers.RateLimitedIterationMatcher.iteratesAccordingToRateLimit;
29+
30+
@RunWith(Traits.class)
31+
public class RateLimitTest {
32+
33+
private InstantRecordingClock clock;
34+
35+
@Before
36+
public void setUp() throws Exception {
37+
clock = new InstantRecordingClock(systemUTC());
38+
}
39+
40+
@TestTraits({Laziness.class, InfiniteIterableSupport.class, EmptyIterableSupport.class, Deforesting.class})
41+
public Fn1<Iterable<Object>, Iterable<Object>> testSubject() {
42+
return rateLimit(systemUTC()::instant, 1L, ZERO);
43+
}
44+
45+
@Test(expected = IllegalArgumentException.class)
46+
public void lessThanOneLimitIsInvalid() {
47+
rateLimit(clock::instant, 0L, ZERO, emptyList());
48+
}
49+
50+
@Test
51+
public void zeroDurationJustIteratesElements() {
52+
assertThat(rateLimit(clock::instant, 1L, ZERO, asList(1, 2, 3)), iterates(1, 2, 3));
53+
}
54+
55+
@Test
56+
public void limitPerDurationIsHonoredAccordingToClock() {
57+
Duration duration = Duration.ofMillis(10);
58+
long limit = 2L;
59+
assertThat(rateLimit(clock::instant, limit, duration, asList(1, 2, 3, 4)),
60+
iteratesAccordingToRateLimit(limit, duration, asList(1, 2, 3, 4), clock));
61+
}
62+
63+
@Test(timeout = 100, expected = IterationInterruptedException.class)
64+
public void rateLimitingDelayIsInterruptible() throws InterruptedException {
65+
Thread testThread = Thread.currentThread();
66+
CountDownLatch latch = new CountDownLatch(1);
67+
new Thread(() -> {
68+
Try.<InterruptedException>trying(latch::await).biMapL(AssertionError::new)
69+
.orThrow();
70+
testThread.interrupt();
71+
}) {{
72+
start();
73+
}};
74+
75+
rateLimit(clock::instant, 1L, Duration.ofSeconds(1), repeat(1)).forEach(xs -> latch.countDown());
76+
}
77+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package testsupport.matchers;
2+
3+
import com.jnape.palatable.lambda.functions.builtin.fn2.Eq;
4+
import org.hamcrest.Description;
5+
import org.hamcrest.TypeSafeMatcher;
6+
import testsupport.time.InstantRecordingClock;
7+
8+
import java.time.Duration;
9+
import java.time.Instant;
10+
import java.util.ArrayList;
11+
import java.util.Iterator;
12+
13+
import static com.jnape.palatable.lambda.functions.builtin.fn2.All.all;
14+
import static com.jnape.palatable.lambda.functions.builtin.fn2.InGroupsOf.inGroupsOf;
15+
import static com.jnape.palatable.lambda.functions.builtin.fn2.Map.map;
16+
import static com.jnape.palatable.lambda.functions.builtin.fn2.Slide.slide;
17+
import static com.jnape.palatable.lambda.functions.builtin.fn2.ToCollection.toCollection;
18+
import static com.jnape.palatable.lambda.functions.builtin.fn2.Zip.zip;
19+
import static java.time.Duration.between;
20+
21+
public final class RateLimitedIterationMatcher<A> extends TypeSafeMatcher<Iterable<A>> {
22+
private final Iterable<A> elements;
23+
private final Duration delay;
24+
private final InstantRecordingClock clock;
25+
private final Long limit;
26+
27+
public RateLimitedIterationMatcher(Long limit, Duration delay, Iterable<A> elements, InstantRecordingClock clock) {
28+
this.elements = elements;
29+
this.delay = delay;
30+
this.clock = clock;
31+
this.limit = limit;
32+
}
33+
34+
@Override
35+
protected boolean matchesSafely(Iterable<A> xs) {
36+
xs.forEach(__ -> clock.saveLastInstant());
37+
38+
Boolean enoughDelay = all(d -> d.toNanos() > delay.toNanos(), map(boundaries -> {
39+
Iterator<Instant> it = boundaries.iterator();
40+
Instant first = it.next();
41+
Instant second = it.next();
42+
return between(first, second);
43+
}, slide(2, map(instants -> instants.iterator().next(), inGroupsOf(limit.intValue(), clock.instants())))));
44+
45+
Boolean sameElements = all(Eq.<A>eq().uncurry(), zip(elements, xs));
46+
47+
return enoughDelay && sameElements;
48+
}
49+
50+
@Override
51+
public void describeTo(Description description) {
52+
description.appendText("Iterated elements " + toCollection(ArrayList::new, elements) + " with at least " + delay.toMillis() + "ms between groups of " + limit);
53+
}
54+
55+
@Override
56+
protected void describeMismatchSafely(Iterable<A> item, Description mismatchDescription) {
57+
mismatchDescription.appendText("Iterated elements "
58+
+ toCollection(ArrayList::new, item)
59+
+ " with the following delays between groups: "
60+
+ toCollection(ArrayList::new, map(instants -> instants.iterator().next(), inGroupsOf(limit.intValue(), clock.instants()))));
61+
}
62+
63+
public static <A> RateLimitedIterationMatcher<A> iteratesAccordingToRateLimit(Long limit, Duration duration,
64+
Iterable<A> elements,
65+
InstantRecordingClock clock) {
66+
return new RateLimitedIterationMatcher<>(limit, duration, elements, clock);
67+
}
68+
}

0 commit comments

Comments
 (0)