Skip to content

Commit ea97d12

Browse files
committed
Work around for eclipse-vertx/vert.x#2288
1 parent dfd231c commit ea97d12

File tree

9 files changed

+156
-48
lines changed

9 files changed

+156
-48
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,17 @@
6464
<dependency>
6565
<groupId>io.vertx</groupId>
6666
<artifactId>vertx-core</artifactId>
67-
<version>3.5.0</version>
67+
<version>3.5.1</version>
6868
</dependency>
6969
<dependency>
7070
<groupId>io.vertx</groupId>
7171
<artifactId>vertx-web</artifactId>
72-
<version>3.5.0</version>
72+
<version>3.5.1</version>
7373
</dependency>
7474
<dependency>
7575
<groupId>io.vertx</groupId>
7676
<artifactId>vertx-unit</artifactId>
77-
<version>3.5.0</version>
77+
<version>3.5.1</version>
7878
<scope>test</scope>
7979
</dependency>
8080
<dependency>

sfs-server/src/main/java/org/sfs/SfsServer.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import com.fasterxml.jackson.core.JsonFactory;
2020
import com.google.common.base.Preconditions;
21+
import com.google.common.net.HostAndPort;
2122
import io.vertx.core.Context;
2223
import io.vertx.core.DeploymentOptions;
2324
import io.vertx.core.Future;
2425
import io.vertx.core.Vertx;
2526
import io.vertx.core.http.HttpClient;
27+
import io.vertx.core.http.HttpClientRequest;
28+
import io.vertx.core.http.HttpClientResponse;
2629
import io.vertx.core.logging.Logger;
2730
import io.vertx.core.logging.LoggerFactory;
2831
import io.vertx.core.shareddata.Lock;
@@ -38,16 +41,21 @@
3841
import org.sfs.nodes.NodeStats;
3942
import org.sfs.nodes.Nodes;
4043
import org.sfs.rx.Defer;
44+
import org.sfs.rx.HttpClientResponseBodyBuffer;
4145
import org.sfs.rx.ObservableFuture;
4246
import org.sfs.rx.RxHelper;
4347
import org.sfs.rx.ToVoid;
48+
import rx.Observable;
4449
import rx.Subscriber;
50+
import rx.functions.Func0;
4551
import rx.plugins.RxJavaHooks;
4652
import rx.plugins.RxJavaSchedulersHook;
4753

4854
import java.util.concurrent.ExecutorService;
4955
import java.util.concurrent.atomic.AtomicBoolean;
5056

57+
import static java.lang.String.format;
58+
5159
public class SfsServer extends Server {
5260

5361
private static final Logger LOGGER = LoggerFactory.getLogger(SfsServer.class);
@@ -97,6 +105,25 @@ public void start(Future<Void> startedResult) {
97105
vertxContext = new VertxContext<>(_this);
98106
return delegate.initHttpListeners(vertxContext).map(new ToVoid<>());
99107
})
108+
.flatMap(aVoid -> {
109+
// make httpclient bind to correct context
110+
HostAndPort firstPublishedAddress = delegate.nodes().getHostAndPort();
111+
String url =
112+
format("http://%s/admin/001/healthcheck", firstPublishedAddress);
113+
Func0<Observable<Void>> func0 = () -> {
114+
LOGGER.debug("Trying to connect to health check {}", url);
115+
ObservableFuture<HttpClientResponse> handler = RxHelper.observableFuture();
116+
HttpClientRequest httpClientRequest = httpClient.getAbs(url, httpClientResponse -> {
117+
httpClientResponse.pause();
118+
handler.complete(httpClientResponse);
119+
}).exceptionHandler(handler::fail)
120+
.setTimeout(5000);
121+
httpClientRequest.end();
122+
return handler.flatMap(new HttpClientResponseBodyBuffer())
123+
.map(new ToVoid<>());
124+
};
125+
return RxHelper.onErrorResumeNext(20, func0);
126+
})
100127
.count()
101128
.map(new ToVoid<>())
102129
.subscribe(new Subscriber<Void>() {

sfs-server/src/main/java/org/sfs/SfsSingletonServer.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.vertx.core.Vertx;
2828
import io.vertx.core.http.HttpClient;
2929
import io.vertx.core.http.HttpClientOptions;
30+
import io.vertx.core.http.HttpClientRequest;
31+
import io.vertx.core.http.HttpClientResponse;
3032
import io.vertx.core.http.HttpServer;
3133
import io.vertx.core.http.HttpServerOptions;
3234
import io.vertx.core.json.JsonObject;
@@ -93,13 +95,16 @@
9395
import org.sfs.nodes.master.MasterNodeStopJob;
9496
import org.sfs.nodes.master.MasterNodeWaitForJob;
9597
import org.sfs.rx.Defer;
98+
import org.sfs.rx.HttpClientResponseBodyBuffer;
9699
import org.sfs.rx.ObservableFuture;
97100
import org.sfs.rx.RxHelper;
98101
import org.sfs.rx.Terminus;
102+
import org.sfs.rx.ToVoid;
99103
import org.sfs.thread.NamedCapacityFixedThreadPool;
100104
import org.sfs.util.ConfigHelper;
101105
import org.sfs.util.FileSystemLock;
102106
import rx.Observable;
107+
import rx.functions.Func0;
103108

104109
import java.net.HttpURLConnection;
105110
import java.nio.file.Path;
@@ -111,6 +116,8 @@
111116
import java.util.concurrent.TimeUnit;
112117
import java.util.concurrent.atomic.AtomicBoolean;
113118

119+
import static java.lang.String.format;
120+
114121
public class SfsSingletonServer extends Server implements Shareable {
115122

116123
private static final AtomicBoolean STARTED = new AtomicBoolean(false);
@@ -275,6 +282,26 @@ public void start(final Future<Void> startedResult) {
275282
.flatMap(aVoid -> masterKeys.start(vertxContext))
276283
.flatMap(aVoid -> containerKeys.start(vertxContext))
277284
.flatMap(aVoid -> jobs.open(vertxContext, config))
285+
.flatMap(aVoid -> initHttpListeners(vertxContext))
286+
.flatMap(aVoid -> {
287+
// make httpclient bind to correct context
288+
HostAndPort firstPublishedAddress = nodes.getHostAndPort();
289+
String url =
290+
format("http://%s/admin/001/healthcheck", firstPublishedAddress);
291+
Func0<Observable<Void>> func0 = () -> {
292+
LOGGER.debug("Trying to connect to health check {}", url);
293+
ObservableFuture<HttpClientResponse> handler = RxHelper.observableFuture();
294+
HttpClientRequest httpClientRequest = httpClient.getAbs(url, httpClientResponse -> {
295+
httpClientResponse.pause();
296+
handler.complete(httpClientResponse);
297+
}).exceptionHandler(handler::fail)
298+
.setTimeout(5000);
299+
httpClientRequest.end();
300+
return handler.flatMap(new HttpClientResponseBodyBuffer())
301+
.map(new ToVoid<>());
302+
};
303+
return RxHelper.onErrorResumeNext(20, func0);
304+
})
278305
.subscribe(
279306
o -> {
280307
// do nothing
@@ -604,8 +631,6 @@ protected HttpClient createHttpClient(Vertx v, boolean https) {
604631

605632
HttpClient client = v.createHttpClient(httpClientOptions);
606633

607-
LOGGER.debug("Current Thread 1 is {}, http client is {}", Thread.currentThread(), client);
608-
609634
return client;
610635
}
611636

sfs-server/src/main/java/org/sfs/SfsVertxImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public SfsVertxImpl(Vertx vertx, ExecutorService backgroundPool, ExecutorService
6363
this.ioPool = ioPool;
6464
}
6565

66+
@Override
67+
public DnsClient createDnsClient() {
68+
return vertx.createDnsClient();
69+
}
70+
6671
@Override
6772
public DnsClient createDnsClient(DnsClientOptions dnsClientOptions) {
6873
return vertx.createDnsClient(dnsClientOptions);

sfs-server/src/main/java/org/sfs/nodes/Nodes.java

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.Collections;
4444
import java.util.List;
4545
import java.util.UUID;
46-
import java.util.concurrent.TimeUnit;
4746
import java.util.concurrent.atomic.AtomicReference;
4847

4948
import static com.google.common.base.Charsets.UTF_8;
@@ -259,56 +258,25 @@ public Observable<HttpClientRequestAndResponse> connectFirstAvailable(Vertx vert
259258
hostAndPorts,
260259
hostAndPort ->
261260
Defer.aVoid()
262-
.flatMap(aVoid -> supplier.call(hostAndPort))
263-
.onErrorResumeNext(throwable -> {
264-
errors.clear();
265-
LOGGER.debug("Retry delay 100ms");
266-
return Defer.aVoid()
267-
.delay(100, TimeUnit.MILLISECONDS)
268-
.flatMap(aVoid -> supplier.call(hostAndPort));
269-
})
270-
.onErrorResumeNext(throwable -> {
271-
errors.clear();
272-
LOGGER.debug("Retry delay 100ms");
273-
return Defer.aVoid()
274-
.delay(100, TimeUnit.MILLISECONDS)
275-
.flatMap(aVoid -> supplier.call(hostAndPort));
276-
})
277-
.onErrorResumeNext(throwable -> {
278-
errors.clear();
279-
LOGGER.debug("Retry delay 200ms");
280-
return Defer.aVoid()
281-
.delay(200, TimeUnit.MILLISECONDS)
282-
.flatMap(aVoid -> supplier.call(hostAndPort));
283-
})
284-
.onErrorResumeNext(throwable -> {
285-
errors.clear();
286-
LOGGER.debug("Retry delay 300ms");
287-
return Defer.aVoid()
288-
.delay(300, TimeUnit.MILLISECONDS)
289-
.flatMap(aVoid -> supplier.call(hostAndPort));
290-
})
291-
.onErrorResumeNext(throwable -> {
292-
errors.clear();
293-
LOGGER.debug("Retry delay 500ms");
294-
return Defer.aVoid()
295-
.delay(500, TimeUnit.MILLISECONDS)
296-
.flatMap(aVoid -> supplier.call(hostAndPort));
297-
})
261+
.flatMap(aVoid -> RxHelper.onErrorResumeNext(50, () -> supplier.call(hostAndPort)))
298262
.doOnNext(httpClientRequestAndResponseAction1 -> Preconditions.checkState(ref.compareAndSet(null, httpClientRequestAndResponseAction1), "Already set"))
299263
.map(httpClientRequestAndResponse -> false)
300264
.onErrorResumeNext(throwable -> {
301265
LOGGER.warn("Handling connect failure to " + hostAndPort, throwable);
302266
errors.add(throwable);
303267
return Defer.just(true);
304268
}))
305-
.map(_continue -> {
269+
.flatMap(_continue -> {
306270
HttpClientRequestAndResponse httpClientRequestAndResponse = ref.get();
307271
if (httpClientRequestAndResponse == null) {
308272
Preconditions.checkState(!errors.isEmpty(), "Errors cannot be empty");
309-
throw new CompositeException(errors);
273+
if (errors.size() == 1) {
274+
return Observable.error(errors.get(0));
275+
} else {
276+
return Observable.error(new CompositeException(errors));
277+
}
310278
} else {
311-
return httpClientRequestAndResponse;
279+
return Observable.just(httpClientRequestAndResponse);
312280
}
313281
});
314282
});

sfs-server/src/main/java/org/sfs/rx/RxHelper.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,24 @@
4141

4242
import java.util.List;
4343
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.TimeUnit;
4445

4546
import static java.util.Arrays.asList;
4647
import static rx.Observable.combineLatestDelayError;
4748
import static rx.functions.Functions.fromFunc;
4849

4950
public class RxHelper {
5051

52+
public static <T> Observable<T> onErrorResumeNext(int count, Func0<Observable<T>> func0) {
53+
Observable<T> base = func0.call();
54+
for (int i = 1; i <= count; i++) {
55+
base = base.onErrorResumeNext(throwable -> Defer.aVoid()
56+
.delay(100, TimeUnit.MILLISECONDS)
57+
.flatMap(aVoid -> func0.call()));
58+
}
59+
return base;
60+
}
61+
5162
@SuppressWarnings("unchecked")
5263
public static final <T1, T2, R> Observable<R> combineSinglesDelayError(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
5364
return combineSinglesDelayError(asList(o1.single(), o2.single()), fromFunc(combineFunction));

sfs-server/src/test/java/org/sfs/RunBootedTestOnContextRx.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.common.net.HostAndPort;
2121
import io.vertx.core.Context;
2222
import io.vertx.core.DeploymentOptions;
23-
import io.vertx.core.Handler;
2423
import io.vertx.core.Vertx;
2524
import io.vertx.core.VertxOptions;
2625
import io.vertx.core.buffer.Buffer;
@@ -33,6 +32,7 @@
3332
import org.junit.runner.Description;
3433
import org.junit.runners.model.Statement;
3534
import org.sfs.integration.java.func.ResetForTest;
35+
import org.sfs.integration.java.func.WaitForHealthCheck;
3636
import org.sfs.integration.java.help.AuthorizationFactory;
3737
import org.sfs.rx.ObservableFuture;
3838
import org.sfs.rx.RxHelper;
@@ -47,7 +47,6 @@
4747
import java.util.UUID;
4848
import java.util.concurrent.ConcurrentHashMap;
4949
import java.util.concurrent.ConcurrentMap;
50-
import java.util.concurrent.atomic.AtomicReference;
5150
import java.util.function.Supplier;
5251

5352
import static com.google.common.base.Preconditions.checkState;
@@ -172,6 +171,8 @@ public void evaluate() throws Throwable {
172171
vertxContext = sfsServer.vertxContext();
173172
checkState(vertxContext != null, "VertxContext was null on Verticle %s", sfsServer);
174173
})
174+
.flatMap(aVoid -> Observable.just((Void) null)
175+
.flatMap(new WaitForHealthCheck(getHttpClient(), vertx)))
175176
.flatMap(aVoid1 -> Observable.just((Void) null)
176177
.flatMap(new ResetForTest(getHttpClient(), authAdmin)))
177178
.map(new HttpClientResponseHeaderLogger())

sfs-server/src/test/java/org/sfs/integration/java/func/ResetForTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public ResetForTest(HttpClient httpClient, AuthorizationFactory.Producer auth) {
4444
public Observable<HttpClientResponse> call(Void aVoid) {
4545
return auth.toHttpAuthorization()
4646
.flatMap(s -> {
47-
LOGGER.debug("Current Thread is {}", Thread.currentThread());
4847
StringBuilder urlBuilder = new StringBuilder();
4948
urlBuilder = urlBuilder.append("/admin/001/resetfortest");
5049
ObservableFuture<HttpClientResponse> handler = RxHelper.observableFuture();
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2018 The Simple File Server Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.sfs.integration.java.func;
18+
19+
import io.vertx.core.Vertx;
20+
import io.vertx.core.http.HttpClient;
21+
import io.vertx.core.http.HttpClientRequest;
22+
import io.vertx.core.http.HttpClientResponse;
23+
import io.vertx.core.logging.Logger;
24+
import org.sfs.rx.HttpClientResponseBodyBuffer;
25+
import org.sfs.rx.ObservableFuture;
26+
import org.sfs.rx.RxHelper;
27+
import org.sfs.rx.ToVoid;
28+
import rx.Observable;
29+
import rx.exceptions.CompositeException;
30+
import rx.functions.Func0;
31+
import rx.functions.Func1;
32+
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
import static io.vertx.core.logging.LoggerFactory.getLogger;
37+
38+
public class WaitForHealthCheck implements Func1<Void, Observable<Void>> {
39+
40+
private static final Logger LOGGER = getLogger(WaitForHealthCheck.class);
41+
private HttpClient httpClient;
42+
private Vertx vertx;
43+
44+
public WaitForHealthCheck(HttpClient httpClient, Vertx vertx) {
45+
this.httpClient = httpClient;
46+
this.vertx = vertx;
47+
}
48+
49+
@Override
50+
public Observable<Void> call(Void aVoid) {
51+
Func0<Observable<Void>> func0 = () -> {
52+
StringBuilder urlBuilder = new StringBuilder();
53+
urlBuilder = urlBuilder.append("/admin/001/healthcheck");
54+
ObservableFuture<HttpClientResponse> httpHandler = RxHelper.observableFuture();
55+
HttpClientRequest httpClientRequest =
56+
httpClient.get(urlBuilder.toString(), httpHandler::complete)
57+
.exceptionHandler(httpHandler::fail)
58+
.setTimeout(5000);
59+
httpClientRequest.end();
60+
return httpHandler.flatMap(new HttpClientResponseBodyBuffer())
61+
.map(new ToVoid<>());
62+
};
63+
List<Throwable> errors = new ArrayList<>();
64+
return Observable.just((Void) null)
65+
.flatMap(aVoid1 -> RxHelper.onErrorResumeNext(20, func0))
66+
.onErrorResumeNext(throwable -> {
67+
LOGGER.warn("Handling connect failure to health check", throwable);
68+
errors.add(throwable);
69+
return Observable.error(new CompositeException(errors));
70+
});
71+
}
72+
}

0 commit comments

Comments
 (0)