@@ -34,6 +34,8 @@ public class SimplePipeHttpServlet extends HttpServlet {
34
34
35
35
protected long pipeScriptBreakout = 1200000 ; // 20 minutes
36
36
37
+ protected int pipeMaxItemsPerQuery = -1 ; // infinite
38
+
37
39
/*
38
40
* Example of web.xml:
39
41
<servlet>
@@ -47,6 +49,10 @@ public class SimplePipeHttpServlet extends HttpServlet {
47
49
<param-name>simple.pipe.script.breakout</param-name>
48
50
<param-value>1200000</param-value>
49
51
</init-param>
52
+ <init-param>
53
+ <param-name>simple.pipe.max.items.per.query</param-name>
54
+ <param-value>60</param-value>
55
+ </init-param>
50
56
</servlet>
51
57
<servlet-mapping>
52
58
<servlet-name>simplepipe</servlet-name>
@@ -83,6 +89,19 @@ public void init() throws ServletException {
83
89
e .printStackTrace ();
84
90
}
85
91
}
92
+ String perQueryStr = getInitParameter ("simple.pipe.max.items.per.query" );
93
+ if (perQueryStr != null ) {
94
+ try {
95
+ pipeMaxItemsPerQuery = Integer .parseInt (perQueryStr );
96
+ if (pipeMaxItemsPerQuery <= 0 ) {
97
+ pipeMaxItemsPerQuery = -1 ; // 0, -1 means infinite items
98
+ } else if (pipeMaxItemsPerQuery < 5 ) {
99
+ pipeMaxItemsPerQuery = 5 ; // hey, we think limiting for less than 5 items make no senses.
100
+ }
101
+ } catch (NumberFormatException e ) {
102
+ e .printStackTrace ();
103
+ }
104
+ }
86
105
super .init ();
87
106
}
88
107
@@ -193,6 +212,7 @@ protected void doPipe(final HttpServletResponse resp, String key, String type, S
193
212
if (pipe != null ) {
194
213
waitClosingInterval = pipe .pipeWaitClosingInterval ();
195
214
}
215
+ int items = 0 ;
196
216
while ((vector = SimplePipeHelper .getPipeVector (key )) != null
197
217
/* && SimplePipeHelper.isPipeLive(key) */ // check it!
198
218
&& !writer .checkError ()) {
@@ -224,6 +244,11 @@ protected void doPipe(final HttpServletResponse resp, String key, String type, S
224
244
}
225
245
if (ss == null ) break ; // terminating signal
226
246
output (writer , type , key , ss .serialize ());
247
+ items ++;
248
+ if (pipeMaxItemsPerQuery > 0 && items >= pipeMaxItemsPerQuery
249
+ && !SimplePipeRequest .PIPE_TYPE_CONTINUUM .equals (type )) {
250
+ break ;
251
+ }
227
252
lastPipeDataWritten = new Date ().getTime ();
228
253
writer .flush ();
229
254
if (ss instanceof ISimplePipePriority ) {
@@ -256,6 +281,8 @@ protected void doPipe(final HttpServletResponse resp, String key, String type, S
256
281
257
282
now = new Date ().getTime ();
258
283
if ((vector = SimplePipeHelper .getPipeVector (key )) != null // may be broken down already!!
284
+ && (pipeMaxItemsPerQuery <= 0 || items < pipeMaxItemsPerQuery
285
+ || SimplePipeRequest .PIPE_TYPE_CONTINUUM .equals (type ))
259
286
&& (SimplePipeRequest .PIPE_TYPE_CONTINUUM .equals (type )
260
287
|| (SimplePipeRequest .PIPE_TYPE_SCRIPT .equals (type )
261
288
&& now - beforeLoop < pipeScriptBreakout )
0 commit comments