Skip to content

Commit 0375b28

Browse files
committed
make body filter more safe under thread-pool mode
1 parent 7f7752a commit 0375b28

File tree

7 files changed

+381
-39
lines changed

7 files changed

+381
-39
lines changed

nginx-clojure-embed/test/java/nginx/clojure/embed/JavaHandlersTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public void testStartAndStop() throws ParseException, ClientProtocolException, I
226226

227227
public static void main(String[] args) {
228228
NginxEmbedServer server = NginxEmbedServer.getServer();
229-
server.setWorkDir("test/work-dir");
229+
// server.setWorkDir("test/work-dir");
230230
Map<String, String> opts = ArrayMap.create("port", "8084",
231231
"http-user-defined", "shared_map mycounters hashmap?space=32k&entries=400;");
232232
int port = server.start(SimpleRouting.class.getName(), opts);

src/c/ngx_http_clojure_mem.c

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1536,6 +1536,13 @@ static ngx_int_t ngx_http_clojure_hijack_send(ngx_http_request_t *r, u_char *me
15361536
return NGX_OK;
15371537
}
15381538

1539+
if (r->connection->log->log_level & NGX_LOG_DEBUG_HTTP) {
1540+
ngx_str_t msg;
1541+
msg.data = message;
1542+
msg.len = len;
1543+
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "[%" PRIu64 "] hijack send : {%V}, len %d, flag : %d", (uintptr_t)r , &msg, len, flag);
1544+
}
1545+
15391546
page_size = ((ngx_http_clojure_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_clojure_module))->write_page_size;
15401547

15411548
if (flag & NGX_CLOJURE_BUF_IGNORE_FILTER_FLAG) {
@@ -2786,15 +2793,15 @@ static jlong JNICALL jni_ngx_http_hijack_send(JNIEnv *env, jclass cls, jlong req
27862793

27872794
ngx_int_t rc = ngx_http_clojure_hijack_send((ngx_http_request_t *) (uintptr_t) req,
27882795
ngx_http_clojure_abs_off_addr(obj, offset), len, flag);
2789-
if (rc != NGX_OK) {
2796+
if (rc != NGX_OK && rc != NGX_DONE) {
27902797
ngx_http_finalize_request((ngx_http_request_t *)(uintptr_t)req, rc);
27912798
}
27922799
return rc;
27932800
}
27942801

27952802
static jlong JNICALL jni_ngx_http_hijack_send_chain(JNIEnv *env, jclass cls, jlong req, jlong chain, jint flag) {
27962803
ngx_int_t rc = ngx_http_clojure_hijack_send_chain((ngx_http_request_t *)(uintptr_t)req, (ngx_chain_t *)(uintptr_t)chain, flag);
2797-
if (rc != NGX_OK) {
2804+
if (rc != NGX_OK && rc != NGX_DONE) {
27982805
ngx_http_finalize_request((ngx_http_request_t *)(uintptr_t)req, rc);
27992806
}
28002807
return rc;
@@ -3006,6 +3013,14 @@ static jlong JNICALL jni_ngx_http_clojure_add_listener(JNIEnv *env, jclass cls,
30063013

30073014
static void JNICALL jni_ngx_http_finalize_request (JNIEnv *env, jclass cls, jlong req , jlong rc) {
30083015
ngx_http_request_t *r = (ngx_http_request_t *)(uintptr_t)req;
3016+
ngx_http_clojure_module_ctx_t *ctx;
3017+
ngx_http_clojure_get_ctx(r, ctx);
3018+
3019+
if (!r || !ctx || !r->pool) {
3020+
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "nginx-clojure ctx is cleaned, r=%" PRIu64, (uintptr_t)r);
3021+
return;
3022+
}
3023+
30093024
if (!r->header_sent) {
30103025
(void)ngx_http_clojure_prepare_server_header(r);
30113026
}
@@ -3029,26 +3044,26 @@ ngx_int_t ngx_http_clojure_filter_continue_next_body_filter(ngx_http_request_t *
30293044
return ngx_http_clojure_next_body_filter(r, in);
30303045
}
30313046

3032-
static jlong JNICALL jni_ngx_http_filter_continue_next(JNIEnv *env, jclass cls, jlong req , jlong chain) {
3033-
ngx_http_request_t *r = (ngx_http_request_t*)(uintptr_t)req;
3034-
ngx_http_clojure_module_ctx_t *ctx;
3035-
ngx_chain_t *in = (ngx_chain_t *)(uintptr_t)chain;
3036-
ngx_int_t rc ;
3047+
static jlong JNICALL jni_ngx_http_filter_continue_next(JNIEnv *env, jclass cls, jlong req, jlong chain) {
3048+
ngx_http_request_t *r = (ngx_http_request_t*) (uintptr_t) req;
3049+
ngx_http_clojure_module_ctx_t *ctx;
3050+
ngx_chain_t *in = (ngx_chain_t *) (uintptr_t) chain;
3051+
ngx_int_t rc;
30373052

3038-
ngx_http_clojure_get_ctx(r, ctx);
3053+
ngx_http_clojure_get_ctx(r, ctx);
30393054

3040-
ngx_http_clojure_try_unset_reload_delay_timer(ctx, "jni_ngx_http_filter_continue_next");
3055+
ngx_http_clojure_try_unset_reload_delay_timer(ctx, "jni_ngx_http_filter_continue_next");
30413056

3042-
if (chain < 0) { /*header filter*/
3043-
rc = ngx_http_clojure_next_header_filter( r);
3044-
ctx->wait_for_header_filter = 0;
3045-
if (ctx->pending_body_filter) {
3046-
rc = ngx_http_clojure_next_body_filter(r, ctx->pending);
3047-
}
3048-
return rc;
3049-
}else {
3050-
return ngx_http_clojure_filter_continue_next_body_filter(r, in);
3051-
}
3057+
if (chain < 0) { /*header filter*/
3058+
rc = ngx_http_clojure_next_header_filter(r);
3059+
ctx->wait_for_header_filter = 0;
3060+
if (ctx->pending_body_filter) {
3061+
rc = ngx_http_clojure_next_body_filter(r, ctx->pending);
3062+
}
3063+
return rc;
3064+
} else {
3065+
return ngx_http_clojure_filter_continue_next_body_filter(r, in);
3066+
}
30523067
}
30533068

30543069
static jlong JNICALL jni_ngx_http_clojure_mem_init_ngx_buf(JNIEnv *env, jclass cls, jlong buf, jobject obj, jlong offset, jlong len, jint last_buf) {
@@ -3654,6 +3669,7 @@ static jlong JNICALL jni_ngx_http_clojure_mem_inc_req_count(JNIEnv *env, jclass
36543669
ctx->hijacked_or_async = 1;
36553670
}
36563671
r->main->count = n;
3672+
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "jni_ngx_http_clojure_mem_inc_req_count, old : %d, new : %d", old, n);
36573673
return old;
36583674
}
36593675
return -1;

src/java/nginx/clojure/NginxClojureRT.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public static String formatVer(long ver) {
296296
public static final class WorkerResponseContext {
297297
public final NginxResponse response;
298298
public final NginxRequest request;
299-
public final long chain;
299+
public long chain;
300300

301301
public WorkerResponseContext(NginxResponse resp, NginxRequest req) {
302302
super();
@@ -310,7 +310,8 @@ public WorkerResponseContext(NginxResponse resp, NginxRequest req) {
310310
}
311311
}else {
312312
if (resp.type() == NginxResponse.TYPE_FAKE_BODY_FILTER_TAG) {
313-
chain = req.handler().buildOutputChain(resp);
313+
// chain = req.handler().buildOutputChain(resp);
314+
chain = 0;
314315
}else {
315316
chain = 0;
316317
}
@@ -1408,6 +1409,7 @@ public static int handlePostedResponse(long r) {
14081409
ngx_http_finalize_request(r, rc);
14091410
return NGX_OK;
14101411
}else if (ctx.request.phase() == NGX_HTTP_BODY_FILTER_PHASE) {
1412+
ctx.chain = req.handler().buildOutputChain(resp);
14111413
rc = ngx_http_filter_continue_next(r, ctx.chain);
14121414
if (resp.isLast()) {
14131415
ngx_http_finalize_request(r, rc);
@@ -1417,6 +1419,7 @@ public static int handlePostedResponse(long r) {
14171419
ngx_http_clojure_mem_continue_current_phase(r, NGX_DECLINED);
14181420
return NGX_OK;
14191421
}else if (ctx.request.phase() == NGX_HTTP_BODY_FILTER_PHASE) {
1422+
ctx.chain = req.handler().buildOutputChain(resp);
14201423
rc = ngx_http_filter_continue_next(r, ctx.chain);
14211424
if (resp.isLast()) {
14221425
ngx_http_finalize_request(r, rc);

src/java/nginx/clojure/clj/LazyFilterRequestMap.java

Lines changed: 159 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66
import static nginx.clojure.NginxClojureRT.pushNGXInt;
77

88
import java.io.IOException;
9+
import java.util.Iterator;
910
import java.util.Map;
1011
import java.util.concurrent.ConcurrentHashMap;
1112

13+
import clojure.lang.IMapEntry;
14+
import clojure.lang.IPersistentCollection;
15+
import clojure.lang.IPersistentMap;
16+
import clojure.lang.ISeq;
1217
import nginx.clojure.ChannelCloseAdapter;
1318
import nginx.clojure.NginxChainWrappedInputStream;
1419
import nginx.clojure.NginxFilterRequest;
1520
import nginx.clojure.NginxHandler;
16-
import nginx.clojure.Stack;
1721

1822
public class LazyFilterRequestMap extends LazyRequestMap implements NginxFilterRequest, Cloneable {
1923

@@ -31,6 +35,8 @@ public class LazyFilterRequestMap extends LazyRequestMap implements NginxFilterR
3135

3236
protected LazyHeaderMap responseHeaders;
3337

38+
protected LazyFilterRequestMap origin;
39+
3440
protected final static Map<Long, LazyFilterRequestMap> bodyFilterRequests = new ConcurrentHashMap<Long, LazyFilterRequestMap>();
3541

3642
protected final static ChannelCloseAdapter<Long> bodyFilterRequestsCleaner = new ChannelCloseAdapter<Long>() {
@@ -47,6 +53,8 @@ public static LazyFilterRequestMap cloneExisted(long r, long c) {
4753
if (req != null) {
4854
try {
4955
creq = (LazyFilterRequestMap) req.clone();
56+
creq.array = null;
57+
creq.origin = req;
5058
creq.c = c;
5159
if (c > 0) {
5260
creq.body = new NginxChainWrappedInputStream(creq, c);
@@ -78,6 +86,17 @@ public LazyFilterRequestMap(NginxHandler handler, long r, long c) {
7886
}
7987
}
8088

89+
/* (non-Javadoc)
90+
* @see nginx.clojure.clj.LazyRequestMap#reset(long, nginx.clojure.clj.NginxClojureHandler)
91+
*/
92+
@Override
93+
public void reset(long r, NginxClojureHandler handler) {
94+
if (origin == null) {
95+
super.reset(r, handler);
96+
} else {
97+
throw new UnsupportedOperationException("cloned filter request should not be reset!");
98+
}
99+
}
81100

82101
@Override
83102
public int responseStatus() {
@@ -99,7 +118,10 @@ public Map<String, Object> responseHeaders() {
99118
*/
100119
@Override
101120
public void prefetchAll() {
102-
super.prefetchAll();
121+
if (origin == null) {
122+
super.prefetchAll();
123+
}
124+
103125
if (body != null) {
104126
try {
105127
body.prefetchNativeData();
@@ -109,18 +131,148 @@ public void prefetchAll() {
109131
}
110132
}
111133

134+
/* (non-Javadoc)
135+
* @see nginx.clojure.clj.LazyRequestMap#index(java.lang.Object)
136+
*/
137+
@Override
138+
protected int index(Object key) {
139+
if (origin == null) {
140+
return super.index(key);
141+
}
142+
return origin.index(key);
143+
}
144+
145+
/* (non-Javadoc)
146+
* @see nginx.clojure.clj.LazyRequestMap#iterator()
147+
*/
148+
@Override
149+
public Iterator iterator() {
150+
if (origin == null) {
151+
return super.iterator();
152+
}
153+
return origin.iterator();
154+
}
155+
156+
@Override
157+
public IMapEntry entryAt(Object key) {
158+
if (origin == null) {
159+
return super.entryAt(key);
160+
}
161+
return origin.entryAt(key);
162+
}
163+
164+
@Override
165+
public int count() {
166+
if (origin == null) {
167+
return super.count();
168+
}
169+
return origin.count();
170+
}
171+
172+
/* (non-Javadoc)
173+
* @see nginx.clojure.clj.LazyRequestMap#cons(java.lang.Object)
174+
*/
175+
@Override
176+
public IPersistentCollection cons(Object o) {
177+
if (origin == null) {
178+
return super.cons(o);
179+
}
180+
return origin.cons(o);
181+
}
182+
183+
/* (non-Javadoc)
184+
* @see nginx.clojure.clj.LazyRequestMap#seq()
185+
*/
186+
@Override
187+
public ISeq seq() {
188+
if (origin == null) {
189+
return super.seq();
190+
}
191+
return origin.seq();
192+
}
193+
194+
/* (non-Javadoc)
195+
* @see nginx.clojure.clj.LazyRequestMap#element(int)
196+
*/
197+
@Override
198+
protected Object element(int i) {
199+
if (origin == null) {
200+
return super.element(i);
201+
}
202+
return origin.element(i);
203+
}
204+
205+
/* (non-Javadoc)
206+
* @see nginx.clojure.clj.LazyRequestMap#valAt(java.lang.Object)
207+
*/
208+
@Override
209+
public Object valAt(Object key) {
210+
if (origin == null) {
211+
return super.valAt(key);
212+
}
213+
return origin.valAt(key);
214+
}
215+
216+
/* (non-Javadoc)
217+
* @see nginx.clojure.clj.LazyRequestMap#valAt(java.lang.Object, java.lang.Object)
218+
*/
219+
@Override
220+
public Object valAt(Object key, Object notFound) {
221+
if (origin == null) {
222+
return super.valAt(key, notFound);
223+
}
224+
return origin.valAt(key, notFound);
225+
}
226+
227+
/* (non-Javadoc)
228+
* @see nginx.clojure.clj.LazyRequestMap#assoc(java.lang.Object, java.lang.Object)
229+
*/
230+
@Override
231+
public IPersistentMap assoc(Object key, Object val) {
232+
if (origin == null) {
233+
return super.assoc(key, val);
234+
}
235+
return origin.assoc(key, val);
236+
}
237+
238+
239+
/* (non-Javadoc)
240+
* @see nginx.clojure.clj.LazyRequestMap#assocEx(java.lang.Object, java.lang.Object)
241+
*/
242+
@Override
243+
public IPersistentMap assocEx(Object key, Object val) {
244+
if (origin == null) {
245+
return super.assocEx(key, val);
246+
}
247+
return origin.assocEx(key, val);
248+
}
249+
250+
/* (non-Javadoc)
251+
* @see nginx.clojure.clj.LazyRequestMap#without(java.lang.Object)
252+
*/
253+
@Override
254+
public IPersistentMap without(Object key) {
255+
if (origin == null) {
256+
return super.without(key);
257+
}
258+
return origin.without(key);
259+
}
260+
112261
@Override
113262
public void tagReleased() {
114263
this.released = true;
115264
this.channel = null;
116-
System.arraycopy(default_request_array, 0, array, 0, default_request_array.length);
117-
validLen = default_request_array.length;
118-
if (array.length > validLen) {
119-
Stack.fillNull(array, validLen, array.length - validLen);
120-
}
121265
if (listeners != null) {
122266
listeners.clear();
123267
}
268+
269+
// if (origin == null) {
270+
// System.arraycopy(default_request_array, 0, array, 0, default_request_array.length);
271+
// validLen = default_request_array.length;
272+
// if (array.length > validLen) {
273+
// Stack.fillNull(array, validLen, array.length - validLen);
274+
// }
275+
// }
124276
// ((NginxClojureHandler)handler).returnToRequestPool(this);
125277
}
126278
}

0 commit comments

Comments
 (0)