2
2
3
3
import com .arangodb .ArangoDB ;
4
4
import com .arangodb .ArangoDBException ;
5
- import com .arangodb .BaseTest ;
5
+ import com .arangodb .Protocol ;
6
+ import com .arangodb .entity .ServerRole ;
6
7
import com .arangodb .velocypack .VPackParser ;
7
8
import com .arangodb .velocystream .Request ;
8
9
import com .arangodb .velocystream .RequestType ;
9
10
import com .arangodb .velocystream .Response ;
11
+ import org .junit .After ;
12
+ import org .junit .Ignore ;
10
13
import org .junit .Test ;
11
14
import org .junit .runner .RunWith ;
12
15
import org .junit .runners .Parameterized ;
16
+ import org .slf4j .Logger ;
17
+ import org .slf4j .LoggerFactory ;
13
18
14
19
import java .util .List ;
15
20
import java .util .stream .IntStream ;
18
23
import static org .hamcrest .MatcherAssert .assertThat ;
19
24
import static org .hamcrest .Matchers .greaterThan ;
20
25
import static org .hamcrest .Matchers .is ;
26
+ import static org .junit .Assume .assumeTrue ;
21
27
28
+ /**
29
+ * Implementation of Overload Control QA.
30
+ * Test plan: https://arangodb.atlassian.net/wiki/spaces/AQA/pages/1753579579/Overload+control
31
+ */
32
+ @ Ignore ("Manual test only, remove arangodb.properties before running." )
22
33
@ RunWith (Parameterized .class )
23
- public class OverloadControlTest extends BaseTest {
34
+ public class OverloadControlTest {
35
+ private static final Logger LOGGER = LoggerFactory .getLogger (OverloadControlTest .class );
24
36
private static final VPackParser PARSER = new VPackParser .Builder ().build ();
25
37
private static final String QUEUE_TIME_HEADER = "X-Arango-Queue-Time-Seconds" ;
38
+ private final Protocol protocol ;
39
+ private final ArangoDB arangoDB1 ;
26
40
27
- public OverloadControlTest (final ArangoDB arangoDB ) {
28
- super (arangoDB );
41
+ @ Parameterized .Parameters
42
+ public static List <Protocol > builders () {
43
+ return List .of (
44
+ Protocol .VST ,
45
+ Protocol .HTTP_JSON ,
46
+ Protocol .HTTP_VPACK
47
+ );
48
+ }
49
+
50
+ public OverloadControlTest (final Protocol protocol ) {
51
+ this .protocol = protocol ;
52
+ arangoDB1 = new ArangoDB .Builder ()
53
+ .maxConnections (500 )
54
+ .host ("172.17.0.1" , 8529 )
55
+ .password ("test" )
56
+ .useProtocol (protocol )
57
+ .build ();
58
+ }
59
+
60
+ @ After
61
+ public void shutdown () {
62
+ arangoDB1 .shutdown ();
63
+ }
64
+
65
+ private boolean isCluster () {
66
+ return arangoDB1 .getRole () == ServerRole .COORDINATOR ;
29
67
}
30
68
31
69
private static Response createCursor (ArangoDB arangoDB , Double queueTime ) {
@@ -39,12 +77,20 @@ private static Response createCursor(ArangoDB arangoDB, Double queueTime) {
39
77
return arangoDB .execute (req );
40
78
}
41
79
42
- private static class CursorRequest implements Runnable {
80
+ private static Response readCursor (ArangoDB arangoDB , String cursorId , Double queueTime ) {
81
+ Request req = new Request ("_system" , RequestType .POST , "/_api/cursor/" + cursorId );
82
+ if (queueTime != null ) {
83
+ req .putHeaderParam (QUEUE_TIME_HEADER , String .valueOf (queueTime ));
84
+ }
85
+ return arangoDB .execute (req );
86
+ }
87
+
88
+ private static class CreateCursorRequest implements Runnable {
43
89
private final ArangoDB arangoDB ;
44
90
private final Double queueTime ;
45
91
private volatile Response response ;
46
92
47
- public CursorRequest (ArangoDB arangoDB , Double queueTime ) {
93
+ public CreateCursorRequest (ArangoDB arangoDB , Double queueTime ) {
48
94
this .arangoDB = arangoDB ;
49
95
this .queueTime = queueTime ;
50
96
}
@@ -67,21 +113,48 @@ public double getExecutionTime() {
67
113
}
68
114
}
69
115
116
+ private static class ReadCursorRequest implements Runnable {
117
+ private final ArangoDB arangoDB ;
118
+ private final String cursorId ;
119
+ private final Double queueTime ;
120
+ private volatile Response response ;
121
+
122
+ public ReadCursorRequest (ArangoDB arangoDB , String cursorId , Double queueTime ) {
123
+ this .arangoDB = arangoDB ;
124
+ this .cursorId = cursorId ;
125
+ this .queueTime = queueTime ;
126
+ }
127
+
128
+ @ Override
129
+ public void run () {
130
+ readCursor (arangoDB , cursorId , null );
131
+ response = readCursor (arangoDB , cursorId , queueTime );
132
+ }
133
+
134
+ public Response getResponse () {
135
+ return response ;
136
+ }
137
+
138
+ public double getExecutionTime () {
139
+ return Double .parseDouble (response .getMeta ().get (QUEUE_TIME_HEADER ));
140
+ }
141
+ }
142
+
70
143
@ Test
71
144
public void queueTimeValues () throws InterruptedException {
72
- List <CursorRequest > reqsWithNoQT = IntStream .range (0 , 50 )
73
- .mapToObj (__ -> new CursorRequest ( arangoDB , null ))
145
+ List <CreateCursorRequest > reqsWithNoQT = IntStream .range (0 , 50 )
146
+ .mapToObj (__ -> new CreateCursorRequest ( arangoDB1 , null ))
74
147
.toList ();
75
148
76
- List <CursorRequest > reqsWithHighQT = IntStream .range (0 , 10 )
77
- .mapToObj (__ -> new CursorRequest ( arangoDB , 20.0 ))
149
+ List <CreateCursorRequest > reqsWithHighQT = IntStream .range (0 , 10 )
150
+ .mapToObj (__ -> new CreateCursorRequest ( arangoDB1 , 20.0 ))
78
151
.toList ();
79
152
80
- List <CursorRequest > reqsWithLowQT = IntStream .range (0 , 10 )
81
- .mapToObj (__ -> new CursorRequest ( arangoDB , 1.0 ))
153
+ List <CreateCursorRequest > reqsWithLowQT = IntStream .range (0 , 10 )
154
+ .mapToObj (__ -> new CreateCursorRequest ( arangoDB1 , 1.0 ))
82
155
.toList ();
83
156
84
- List <CursorRequest > reqs = Stream .concat (reqsWithNoQT .stream (), reqsWithHighQT .stream ()).toList ();
157
+ List <CreateCursorRequest > reqs = Stream .concat (reqsWithNoQT .stream (), reqsWithHighQT .stream ()).toList ();
85
158
86
159
List <Thread > threads = reqs .stream ()
87
160
.map (Thread ::new )
@@ -92,7 +165,7 @@ public void queueTimeValues() throws InterruptedException {
92
165
}
93
166
94
167
int errorCount = 0 ;
95
- for (CursorRequest r : reqsWithLowQT ) {
168
+ for (CreateCursorRequest r : reqsWithLowQT ) {
96
169
try {
97
170
r .run ();
98
171
} catch (ArangoDBException e ) {
@@ -109,8 +182,80 @@ public void queueTimeValues() throws InterruptedException {
109
182
}
110
183
111
184
long nonZeroQTCount = reqs .stream ().filter (r -> r .getExecutionTime () > 0.0 ).count ();
112
- System .out .println ("response with queue time > 0: " + nonZeroQTCount + "/" + reqs .size ());
185
+ System .out .println ("response with queue time > 0: " + nonZeroQTCount + "/" + reqs .size ());
186
+
187
+ assertThat ((int ) nonZeroQTCount , is (greaterThan (0 )));
188
+ }
189
+
190
+ @ Test
191
+ public void queueTimeValuesCluster () throws InterruptedException {
192
+ assumeTrue (isCluster ());
113
193
194
+ ArangoDB arangoDB2 = new ArangoDB .Builder ()
195
+ .maxConnections (500 )
196
+ .host ("172.17.0.1" , 8539 )
197
+ .password ("test" )
198
+ .useProtocol (protocol )
199
+ .build ();
200
+
201
+ List <CreateCursorRequest > createCursorReqs = IntStream .range (0 , 50 )
202
+ .mapToObj (__ -> new CreateCursorRequest (arangoDB1 , null ))
203
+ .toList ();
204
+
205
+ List <Thread > threads = createCursorReqs .stream ()
206
+ .map (Thread ::new )
207
+ .toList ();
208
+
209
+ for (Thread t : threads ) {
210
+ t .start ();
211
+ }
212
+
213
+ for (Thread t : threads ) {
214
+ t .join ();
215
+ }
216
+
217
+ LOGGER .info ("starting ReadCursorRequests" );
218
+ List <ReadCursorRequest > reqsWithNoQT = createCursorReqs .stream ()
219
+ .limit (40 )
220
+ .map (CreateCursorRequest ::getCursorId )
221
+ .map (cid -> new ReadCursorRequest (arangoDB1 , cid , null ))
222
+ .toList ();
223
+
224
+ List <Thread > threads2 = reqsWithNoQT .stream ()
225
+ .map (Thread ::new )
226
+ .toList ();
227
+
228
+ for (Thread t : threads2 ) {
229
+ t .start ();
230
+ }
231
+
232
+ List <ReadCursorRequest > reqsWithLowQT = createCursorReqs .stream ()
233
+ .skip (40 )
234
+ .map (CreateCursorRequest ::getCursorId )
235
+ .map (cid -> new ReadCursorRequest (arangoDB2 , cid , 2.0 ))
236
+ .toList ();
237
+
238
+ int errorCount = 0 ;
239
+ for (ReadCursorRequest r : reqsWithLowQT ) {
240
+ try {
241
+ r .run ();
242
+ } catch (ArangoDBException e ) {
243
+ errorCount ++;
244
+ e .printStackTrace ();
245
+ }
246
+ }
247
+
248
+ for (Thread t : threads2 ) {
249
+ t .join ();
250
+ }
251
+
252
+ assertThat (errorCount , is (0 ));
253
+ LOGGER .info ("completed ReadCursorRequests" );
254
+
255
+ long nonZeroQTCount = reqsWithNoQT .stream ().filter (r -> r .getExecutionTime () > 0.0 ).count ();
256
+ System .out .println ("response with queue time > 0: " + nonZeroQTCount + "/" + reqsWithNoQT .size ());
114
257
assertThat ((int ) nonZeroQTCount , is (greaterThan (0 )));
258
+
115
259
}
260
+
116
261
}
0 commit comments