Skip to content

Commit 096e44a

Browse files
author
zhourenjian
committed
Fix bug that stopping a compound pipe session and starting another compound pipe session may result in failing to setup the second compound pipe session.
Reason: Pipe C with session A, A is closing while session B is connecting, C will close A and clean up B. B will not get connected. Solution: Pipe C wait for some seconds before clean up all session (including B). So B will get connected and mark pipe C as active without being closed.
1 parent 2f07967 commit 096e44a

File tree

9 files changed

+200
-36
lines changed

9 files changed

+200
-36
lines changed

sources/net.sf.j2s.ajax/ajaxcore/net/sf/j2s/ajax/HttpRequest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ protected static interface IXHRReceiving {
159159
private OutputStream activeOS;
160160
private InputStream activeIS;
161161

162+
private boolean isCometConnection = false;
163+
162164
/**
163165
* Return read state of XMLHttpRequest.
164166
* @return int ready state
@@ -400,7 +402,11 @@ private boolean checkAbort() {
400402
private void request() {
401403
try {
402404
connection = (HttpURLConnection) new URL(url).openConnection();
403-
connection.setReadTimeout(30000); // 30 seconds
405+
if (isCometConnection) {
406+
connection.setReadTimeout(0); // 0 infinite
407+
} else {
408+
connection.setReadTimeout(30000); // 30s
409+
}
404410
connection.setDoInput(true);
405411
connection.setRequestMethod(method);
406412
connection.setRequestProperty("User-Agent",
@@ -561,4 +567,13 @@ private void request() {
561567
}
562568
}
563569

570+
/**
571+
* Enabling Comet mode for HTTP request connection.
572+
* Comet connection is used on Java level to provide SimplePipe connection.
573+
* @param isCometConnection
574+
*/
575+
void setCometConnection(boolean isCometConnection) {
576+
this.isCometConnection = isCometConnection;
577+
}
578+
564579
}

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/CompoundPipeRunnable.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ protected CompoundPipeSession getSession(String session) {
3434
}
3535

3636
@Override
37-
public void pipeDestroy() {
37+
public boolean pipeDestroy() {
38+
if (!super.pipeDestroy()) return false;
39+
3840
for (int i = 0; i < pipes.length; i++) {
3941
if (pipes[i] != null) {
4042
pipes[i].pipeDestroy();
41-
pipes[i] = null;
4243
}
4344
}
45+
return true;
4446
}
4547

4648
@Override
@@ -69,7 +71,12 @@ public boolean isPipeLive() {
6971
public void pipeClosed() {
7072
for (int i = 0; i < pipes.length; i++) {
7173
if (pipes[i] != null) {
72-
pipes[i].pipeClosed();
74+
if (pipes[i].closer != null) {
75+
pipes[i].closer.helpClosing(pipes[i]);
76+
} else {
77+
pipes[i].pipeClosed();
78+
}
79+
pipes[i] = null;
7380
}
7481
}
7582
}
@@ -79,6 +86,7 @@ public void pipeLost() {
7986
for (int i = 0; i < pipes.length; i++) {
8087
if (pipes[i] != null) {
8188
pipes[i].pipeLost();
89+
pipes[i] = null;
8290
}
8391
}
8492
}

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/CompoundPipeSession.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
public abstract class CompoundPipeSession extends SimplePipeRunnable {
44

5-
public static class PipeSessionClosedEvent extends CompoundSerializable {
5+
private static class PipeSessionClosedEvent extends CompoundSerializable {
66

77
}
88

@@ -21,6 +21,8 @@ public void ajaxRun() {
2121

2222
@Override
2323
public void pipeCreated() {
24+
super.pipeCreated();
25+
2426
SimplePipeRunnable pipe = SimplePipeHelper.getPipe(pipeKey);
2527
if (pipe instanceof CompoundPipeRunnable) {
2628
CompoundPipeRunnable cp = (CompoundPipeRunnable) pipe;
@@ -32,7 +34,9 @@ public void pipeCreated() {
3234
}
3335

3436
@Override
35-
public void pipeDestroy() {
37+
public boolean pipeDestroy() {
38+
if (!super.pipeDestroy()) return false;
39+
3640
PipeSessionClosedEvent evt = new PipeSessionClosedEvent();
3741
evt.session = session;
3842
pipeThrough(evt);
@@ -42,6 +46,7 @@ public void pipeDestroy() {
4246
CompoundPipeRunnable cp = (CompoundPipeRunnable) pipe;
4347
cp.unweave(this);
4448
}
49+
return true;
4550
}
4651

4752
@Override
@@ -97,6 +102,7 @@ public boolean deal(SimpleSerializable ss) {
97102

98103
public boolean deal(PipeSessionClosedEvent evt) {
99104
if (SimplePipeRequest.getRequstMode() == SimplePipeRequest.MODE_LOCAL_JAVA_THREAD) {
105+
this.pipeClosed();
100106
return true;
101107
}
102108
this.updateStatus(false);

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/SimplePipeHelper.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ static interface IPipeThrough {
2727
public void helpThrough(SimplePipeRunnable pipe, SimpleSerializable[] objs);
2828
}
2929

30+
static interface IPipeClosing {
31+
public void helpClosing(SimplePipeRunnable pipe);
32+
}
33+
3034
private static Map<String, Vector<SimpleSerializable>> pipeMap = null;
3135

3236
private SimplePipeHelper() {
@@ -219,5 +223,28 @@ static boolean notifyPipeStatus(String key, boolean live) {
219223
}
220224
return false;
221225
}
226+
227+
/**
228+
* Wait some more seconds to check whether the pipe is to be closed or not.
229+
* @param runnable
230+
* @return whether the pipe is to be closed or not.
231+
*/
232+
static boolean waitAMomentForClosing(final SimplePipeRunnable runnable) {
233+
long extra = runnable.pipeWaitClosingInterval();
234+
long interval = runnable.pipeMonitoringInterval();
235+
if (interval <= 0) {
236+
interval = 1000; // default as 1s
237+
}
238+
239+
while (!runnable.isPipeLive() && extra > 0) {
240+
try {
241+
Thread.sleep(interval);
242+
extra -= interval;
243+
} catch (InterruptedException e) {
244+
//e.printStackTrace();
245+
}
246+
}
247+
return !runnable.isPipeLive();
248+
}
222249

223250
}

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/SimplePipeHttpServlet.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,14 @@ protected void doPipe(final HttpServletResponse resp, String key, String type)
146146
long beforeLoop = new Date().getTime();
147147
Vector<SimpleSerializable> vector = null;
148148
while ((vector = SimplePipeHelper.getPipeVector(key)) != null
149-
&& SimplePipeHelper.isPipeLive(key) // check it!
149+
/* && SimplePipeHelper.isPipeLive(key) */ // check it!
150150
&& !writer.checkError()) {
151+
if (!SimplePipeHelper.isPipeLive(key)) {
152+
boolean okToClose = SimplePipeHelper.waitAMomentForClosing(SimplePipeHelper.getPipe(key));
153+
if (okToClose) {
154+
break;
155+
}
156+
}
151157
int size = vector.size();
152158
if (size > 0) {
153159
for (int i = size - 1; i >= 0; i--) {

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/SimplePipeRequest.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ public void run() {
204204
break;
205205
}
206206
} else {
207-
if (SimplePipeHelper.getPipe(runnable.pipeKey) == null) {
207+
SimplePipeRunnable pipeRunnable = SimplePipeHelper.getPipe(runnable.pipeKey);
208+
if (pipeRunnable == null || !pipeRunnable.isPipeLive()) {
208209
break;
209210
}
210211
}
@@ -215,8 +216,12 @@ public void run() {
215216
if (pipeLive) {
216217
runnable.keepPipeLive();
217218
} else {
218-
runnable.pipeClosed(); //?
219-
break;
219+
boolean okToClose = SimplePipeHelper.waitAMomentForClosing(runnable);
220+
if (okToClose) {
221+
runnable.pipeDestroy(); // Pipe's server side destroying
222+
runnable.pipeClosed(); // Pipe's client side closing
223+
break;
224+
}
220225
}
221226
} else {
222227
SimplePipeRunnable r = SimplePipeHelper.getPipe(runnable.pipeKey);
@@ -229,7 +234,7 @@ public void run() {
229234
String pipeRequestData = constructRequest(pipeKey, PIPE_TYPE_NOTIFY, false);
230235
sendRequest(request, pipeMethod, pipeURL, pipeRequestData, false);
231236
String response = request.getResponseText();
232-
if (response != null && response.indexOf(PIPE_STATUS_LOST) != -1) {
237+
if (response != null && response.indexOf("\"" + PIPE_STATUS_LOST + "\"") != -1) {
233238
runnable.pipeAlive = false;
234239
runnable.pipeLost();
235240
SimplePipeHelper.removePipe(pipeKey);
@@ -245,7 +250,7 @@ public void run() {
245250

246251
}, "Pipe Live Notifier Thread").start();
247252
}
248-
253+
249254
private static void pipeRequest(final SimplePipeRunnable runnable) {
250255
String url = runnable.getHttpURL();
251256
String method = runnable.getHttpMethod();
@@ -463,6 +468,7 @@ static void pipeNotify(SimplePipeRunnable runnable) { // notifier
463468
}
464469

465470
static void pipeNotifyCallBack(String key, String result) {
471+
//System.out.println(key + "::" + result);
466472
if (PIPE_STATUS_LOST.equals(result)) {
467473
SimplePipeRunnable pipe = SimplePipeHelper.getPipe(key);
468474
if (pipe != null) {
@@ -625,8 +631,18 @@ public boolean receiving(ByteArrayOutputStream baos, byte b[], int off, int len)
625631
public void onReceiving() {
626632
keepPipeLive(runnable);
627633
}
634+
635+
@Override
636+
public void onLoaded() { // on case that no destroy event is sent to client
637+
if (SimplePipeHelper.getPipe(runnable.pipeKey) != null) {
638+
runnable.pipeClosed();
639+
SimplePipeHelper.removePipe(runnable.pipeKey);
640+
}
641+
}
628642

629643
});
644+
pipeRequest.setCometConnection(true);
645+
630646
String pipeKey = runnable.pipeKey;
631647
String pipeMethod = runnable.getPipeMethod();
632648
String pipeURL = runnable.getPipeURL();
@@ -657,6 +673,7 @@ public void onReceiving() {
657673
* being parsed.
658674
*/
659675
public static String parseReceived(final String string) {
676+
//System.out.println(string);
660677
SimpleSerializable ss = null;
661678
int start = 0;
662679
while (string.length() > start + PIPE_KEY_LENGTH) { // should be bigger than 48 ( 32 + 6 + 1 + 8 + 1)

sources/net.sf.j2s.ajax/ajaxpipe/net/sf/j2s/ajax/SimplePipeRunnable.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public abstract class SimplePipeRunnable extends SimpleRPCRunnable {
3030

3131
SimplePipeHelper.IPipeThrough helper;
3232

33+
SimplePipeHelper.IPipeClosing closer;
34+
35+
private boolean destroyed;
36+
3337
/**
3438
*
3539
* @param helper
@@ -39,6 +43,15 @@ void setPipeHelper(SimplePipeHelper.IPipeThrough helper) {
3943
this.helper = helper;
4044
}
4145

46+
/**
47+
*
48+
* @param closer
49+
* @j2sIgnore
50+
*/
51+
void setPipeCloser(SimplePipeHelper.IPipeClosing closer) {
52+
this.closer = closer;
53+
}
54+
4255
public String getPipeURL() {
4356
return "simplepipe"; // url is relative to the servlet!
4457
}
@@ -86,7 +99,13 @@ public void ajaxOut() {
8699
* Destroy the pipe and remove listeners.
87100
* After pipe is destroyed, {@link #isPipeLive()} must be false
88101
*/
89-
public abstract void pipeDestroy();
102+
public boolean pipeDestroy() {
103+
if (destroyed) {
104+
return false; // already destroyed, no further destroy actions
105+
}
106+
destroyed = true;
107+
return true;
108+
}
90109

91110
/**
92111
* To initialize pipe with given parameters.
@@ -99,8 +118,8 @@ public void pipeInit() {
99118
* Success to create a pipe.
100119
*/
101120
public void pipeCreated() {
102-
// to be override
103121
// notify pipe is created
122+
destroyed = false;
104123
}
105124

106125
/**
@@ -140,40 +159,54 @@ public boolean isPipeLive() {
140159

141160
/**
142161
* Notify that the pipe is still alive.
162+
*
163+
* This method is run on server side
143164
*/
144165
public void keepPipeLive() {
145166
// to be override
146167
}
147168

148169
/**
149170
* Start pipe monitor to monitor the pipe status. If pipe is non-active,
150-
* try to destroy pipe by calling {@link #pipeDestroy()}.
171+
* try to destroy pipe by calling {@link #pipeDestroy()} and then close
172+
* pipe by calling {@link #pipeClosed()}.
173+
*
151174
* User may override this method to use its own monitoring method.
175+
*
176+
* This method is run on server side
152177
*/
153178
protected void pipeMonitoring() {
154179
new Thread(new Runnable() {
155180

156181
public void run() {
182+
long interval = pipeMonitoringInterval();
183+
if (interval <= 0) {
184+
interval = 1000;
185+
}
157186
while (true) {
158187
try {
159-
long interval = pipeMonitoringInterval();
160-
if (interval <= 0) {
161-
interval = 1000;
162-
}
163188
Thread.sleep(interval);
164189
} catch (InterruptedException e) {
165190
//e.printStackTrace();
166191
}
167192
if (!isPipeLive()) {
168-
pipeDestroy();
169-
break;
193+
boolean okToClose = SimplePipeHelper.waitAMomentForClosing(SimplePipeRunnable.this);
194+
if (okToClose) {
195+
pipeDestroy();
196+
if (closer != null) {
197+
closer.helpClosing(SimplePipeRunnable.this);
198+
} else {
199+
pipeClosed();
200+
}
201+
break;
202+
}
170203
}
171204
}
172205
}
173206

174207
}, "Pipe Monitor").start();
175208
}
176-
209+
177210
/**
178211
* Return interval time between two pipe status checking by monitor.
179212
* If return interval is less than or equals to 0, the interval time will
@@ -183,6 +216,15 @@ public void run() {
183216
protected long pipeMonitoringInterval() {
184217
return 1000;
185218
}
219+
220+
/**
221+
* Return interval time before a pipe is closed.
222+
* For compound pipe, two pipe session may have some milliseconds interval.
223+
* @return time interval in millisecond.
224+
*/
225+
protected long pipeWaitClosingInterval() {
226+
return 5000;
227+
}
186228

187229
/**
188230
* Update pipe's live status.

0 commit comments

Comments
 (0)