Skip to content

Commit 9173e24

Browse files
committed
queueTimeValues cluster test
1 parent 6b43f92 commit 9173e24

File tree

1 file changed

+160
-15
lines changed

1 file changed

+160
-15
lines changed

src/test/java/qa/overload/OverloadControlTest.java

Lines changed: 160 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
import com.arangodb.ArangoDB;
44
import com.arangodb.ArangoDBException;
5-
import com.arangodb.BaseTest;
5+
import com.arangodb.Protocol;
6+
import com.arangodb.entity.ServerRole;
67
import com.arangodb.velocypack.VPackParser;
78
import com.arangodb.velocystream.Request;
89
import com.arangodb.velocystream.RequestType;
910
import com.arangodb.velocystream.Response;
11+
import org.junit.After;
12+
import org.junit.Ignore;
1013
import org.junit.Test;
1114
import org.junit.runner.RunWith;
1215
import org.junit.runners.Parameterized;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1318

1419
import java.util.List;
1520
import java.util.stream.IntStream;
@@ -18,14 +23,47 @@
1823
import static org.hamcrest.MatcherAssert.assertThat;
1924
import static org.hamcrest.Matchers.greaterThan;
2025
import static org.hamcrest.Matchers.is;
26+
import static org.junit.Assume.assumeTrue;
2127

28+
/**
29+
* Implementation of Overload Control QA.
30+
* Test plan: https://arangodb.atlassian.net/wiki/spaces/AQA/pages/1753579579/Overload+control
31+
*/
32+
@Ignore("Manual test only, remove arangodb.properties before running.")
2233
@RunWith(Parameterized.class)
23-
public class OverloadControlTest extends BaseTest {
34+
public class OverloadControlTest {
35+
private static final Logger LOGGER = LoggerFactory.getLogger(OverloadControlTest.class);
2436
private static final VPackParser PARSER = new VPackParser.Builder().build();
2537
private static final String QUEUE_TIME_HEADER = "X-Arango-Queue-Time-Seconds";
38+
private final Protocol protocol;
39+
private final ArangoDB arangoDB1;
2640

27-
public OverloadControlTest(final ArangoDB arangoDB) {
28-
super(arangoDB);
41+
@Parameterized.Parameters
42+
public static List<Protocol> builders() {
43+
return List.of(
44+
Protocol.VST,
45+
Protocol.HTTP_JSON,
46+
Protocol.HTTP_VPACK
47+
);
48+
}
49+
50+
public OverloadControlTest(final Protocol protocol) {
51+
this.protocol = protocol;
52+
arangoDB1 = new ArangoDB.Builder()
53+
.maxConnections(500)
54+
.host("172.17.0.1", 8529)
55+
.password("test")
56+
.useProtocol(protocol)
57+
.build();
58+
}
59+
60+
@After
61+
public void shutdown() {
62+
arangoDB1.shutdown();
63+
}
64+
65+
private boolean isCluster() {
66+
return arangoDB1.getRole() == ServerRole.COORDINATOR;
2967
}
3068

3169
private static Response createCursor(ArangoDB arangoDB, Double queueTime) {
@@ -39,12 +77,20 @@ private static Response createCursor(ArangoDB arangoDB, Double queueTime) {
3977
return arangoDB.execute(req);
4078
}
4179

42-
private static class CursorRequest implements Runnable {
80+
private static Response readCursor(ArangoDB arangoDB, String cursorId, Double queueTime) {
81+
Request req = new Request("_system", RequestType.POST, "/_api/cursor/" + cursorId);
82+
if (queueTime != null) {
83+
req.putHeaderParam(QUEUE_TIME_HEADER, String.valueOf(queueTime));
84+
}
85+
return arangoDB.execute(req);
86+
}
87+
88+
private static class CreateCursorRequest implements Runnable {
4389
private final ArangoDB arangoDB;
4490
private final Double queueTime;
4591
private volatile Response response;
4692

47-
public CursorRequest(ArangoDB arangoDB, Double queueTime) {
93+
public CreateCursorRequest(ArangoDB arangoDB, Double queueTime) {
4894
this.arangoDB = arangoDB;
4995
this.queueTime = queueTime;
5096
}
@@ -67,21 +113,48 @@ public double getExecutionTime() {
67113
}
68114
}
69115

116+
private static class ReadCursorRequest implements Runnable {
117+
private final ArangoDB arangoDB;
118+
private final String cursorId;
119+
private final Double queueTime;
120+
private volatile Response response;
121+
122+
public ReadCursorRequest(ArangoDB arangoDB, String cursorId, Double queueTime) {
123+
this.arangoDB = arangoDB;
124+
this.cursorId = cursorId;
125+
this.queueTime = queueTime;
126+
}
127+
128+
@Override
129+
public void run() {
130+
readCursor(arangoDB, cursorId, null);
131+
response = readCursor(arangoDB, cursorId, queueTime);
132+
}
133+
134+
public Response getResponse() {
135+
return response;
136+
}
137+
138+
public double getExecutionTime() {
139+
return Double.parseDouble(response.getMeta().get(QUEUE_TIME_HEADER));
140+
}
141+
}
142+
70143
@Test
71144
public void queueTimeValues() throws InterruptedException {
72-
List<CursorRequest> reqsWithNoQT = IntStream.range(0, 50)
73-
.mapToObj(__ -> new CursorRequest(arangoDB, null))
145+
List<CreateCursorRequest> reqsWithNoQT = IntStream.range(0, 50)
146+
.mapToObj(__ -> new CreateCursorRequest(arangoDB1, null))
74147
.toList();
75148

76-
List<CursorRequest> reqsWithHighQT = IntStream.range(0, 10)
77-
.mapToObj(__ -> new CursorRequest(arangoDB, 20.0))
149+
List<CreateCursorRequest> reqsWithHighQT = IntStream.range(0, 10)
150+
.mapToObj(__ -> new CreateCursorRequest(arangoDB1, 20.0))
78151
.toList();
79152

80-
List<CursorRequest> reqsWithLowQT = IntStream.range(0, 10)
81-
.mapToObj(__ -> new CursorRequest(arangoDB, 1.0))
153+
List<CreateCursorRequest> reqsWithLowQT = IntStream.range(0, 10)
154+
.mapToObj(__ -> new CreateCursorRequest(arangoDB1, 1.0))
82155
.toList();
83156

84-
List<CursorRequest> reqs = Stream.concat(reqsWithNoQT.stream(), reqsWithHighQT.stream()).toList();
157+
List<CreateCursorRequest> reqs = Stream.concat(reqsWithNoQT.stream(), reqsWithHighQT.stream()).toList();
85158

86159
List<Thread> threads = reqs.stream()
87160
.map(Thread::new)
@@ -92,7 +165,7 @@ public void queueTimeValues() throws InterruptedException {
92165
}
93166

94167
int errorCount = 0;
95-
for (CursorRequest r : reqsWithLowQT) {
168+
for (CreateCursorRequest r : reqsWithLowQT) {
96169
try {
97170
r.run();
98171
} catch (ArangoDBException e) {
@@ -109,8 +182,80 @@ public void queueTimeValues() throws InterruptedException {
109182
}
110183

111184
long nonZeroQTCount = reqs.stream().filter(r -> r.getExecutionTime() > 0.0).count();
112-
System.out.println("response with queue time > 0: " + nonZeroQTCount + "/"+ reqs.size());
185+
System.out.println("response with queue time > 0: " + nonZeroQTCount + "/" + reqs.size());
186+
187+
assertThat((int) nonZeroQTCount, is(greaterThan(0)));
188+
}
189+
190+
@Test
191+
public void queueTimeValuesCluster() throws InterruptedException {
192+
assumeTrue(isCluster());
113193

194+
ArangoDB arangoDB2 = new ArangoDB.Builder()
195+
.maxConnections(500)
196+
.host("172.17.0.1", 8539)
197+
.password("test")
198+
.useProtocol(protocol)
199+
.build();
200+
201+
List<CreateCursorRequest> createCursorReqs = IntStream.range(0, 50)
202+
.mapToObj(__ -> new CreateCursorRequest(arangoDB1, null))
203+
.toList();
204+
205+
List<Thread> threads = createCursorReqs.stream()
206+
.map(Thread::new)
207+
.toList();
208+
209+
for (Thread t : threads) {
210+
t.start();
211+
}
212+
213+
for (Thread t : threads) {
214+
t.join();
215+
}
216+
217+
LOGGER.info("starting ReadCursorRequests");
218+
List<ReadCursorRequest> reqsWithNoQT = createCursorReqs.stream()
219+
.limit(40)
220+
.map(CreateCursorRequest::getCursorId)
221+
.map(cid -> new ReadCursorRequest(arangoDB1, cid, null))
222+
.toList();
223+
224+
List<Thread> threads2 = reqsWithNoQT.stream()
225+
.map(Thread::new)
226+
.toList();
227+
228+
for (Thread t : threads2) {
229+
t.start();
230+
}
231+
232+
List<ReadCursorRequest> reqsWithLowQT = createCursorReqs.stream()
233+
.skip(40)
234+
.map(CreateCursorRequest::getCursorId)
235+
.map(cid -> new ReadCursorRequest(arangoDB2, cid, 2.0))
236+
.toList();
237+
238+
int errorCount = 0;
239+
for (ReadCursorRequest r : reqsWithLowQT) {
240+
try {
241+
r.run();
242+
} catch (ArangoDBException e) {
243+
errorCount++;
244+
e.printStackTrace();
245+
}
246+
}
247+
248+
for (Thread t : threads2) {
249+
t.join();
250+
}
251+
252+
assertThat(errorCount, is(0));
253+
LOGGER.info("completed ReadCursorRequests");
254+
255+
long nonZeroQTCount = reqsWithNoQT.stream().filter(r -> r.getExecutionTime() > 0.0).count();
256+
System.out.println("response with queue time > 0: " + nonZeroQTCount + "/" + reqsWithNoQT.size());
114257
assertThat((int) nonZeroQTCount, is(greaterThan(0)));
258+
115259
}
260+
116261
}

0 commit comments

Comments
 (0)