Skip to content

Commit 099f5a2

Browse files
Stephane Maldinirstoyanchev
Stephane Maldini
authored andcommitted
Upgrade to reactor-netty/-ipc to 0.6 snapshots
1 parent 57130b2 commit 099f5a2

File tree

9 files changed

+82
-75
lines changed

9 files changed

+82
-75
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ configure(allprojects) { project ->
7878
ext.reactivestreamsVersion = "1.0.0"
7979
ext.reactorVersion = "2.0.8.RELEASE"
8080
ext.reactorCoreVersion = '3.0.3.RELEASE'
81-
ext.reactorNettyVersion = '0.5.2.RELEASE'
81+
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
8282
ext.romeVersion = "1.7.0"
8383
ext.rxjavaVersion = '1.2.2'
8484
ext.rxjavaAdapterVersion = '1.2.1'
@@ -171,6 +171,7 @@ configure(allprojects) { project ->
171171
repositories {
172172
maven { url "https://repo.spring.io/libs-release" }
173173
maven { url "https://repo.spring.io/milestone" }
174+
maven { url "https://repo.spring.io/snapshot" }
174175
}
175176

176177
dependencies {

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#Mon Aug 15 21:03:22 CEST 2016
1+
#Fri Nov 04 16:30:57 GMT 2016
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.net.URI;
20+
import java.util.function.Consumer;
2021
import java.util.function.Function;
2122

2223
import reactor.core.publisher.Mono;
23-
import reactor.ipc.netty.config.ClientOptions;
24-
import reactor.ipc.netty.http.HttpClient;
25-
import reactor.ipc.netty.http.HttpException;
26-
import reactor.ipc.netty.http.HttpInbound;
24+
import reactor.ipc.netty.http.client.HttpClientOptions;
25+
import reactor.ipc.netty.options.ClientOptions;
26+
import reactor.ipc.netty.http.client.HttpClient;
27+
import reactor.ipc.netty.http.client.HttpClientException;
2728

2829
import org.springframework.http.HttpMethod;
2930

@@ -44,13 +45,13 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
4445
* and SSL support enabled.
4546
*/
4647
public ReactorClientHttpConnector() {
47-
this(ClientOptions.create().sslSupport());
48+
this.httpClient = HttpClient.create();
4849
}
4950

5051
/**
5152
* Create a Reactor Netty {@link ClientHttpConnector} with the given {@link ClientOptions}
5253
*/
53-
public ReactorClientHttpConnector(ClientOptions clientOptions) {
54+
public ReactorClientHttpConnector(Consumer<? super HttpClientOptions> clientOptions) {
5455
this.httpClient = HttpClient.create(clientOptions);
5556
}
5657

@@ -64,8 +65,7 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
6465
uri.toString(),
6566
httpClientRequest -> requestCallback
6667
.apply(new ReactorClientHttpRequest(method, uri, httpClientRequest)))
67-
.cast(HttpInbound.class)
68-
.otherwise(HttpException.class, exc -> Mono.just(exc.getChannel()))
68+
.otherwise(HttpClientException.class, exc -> Mono.just(exc.getResponse()))
6969
.map(ReactorClientHttpResponse::new);
7070
}
7171

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.reactivestreams.Publisher;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
27-
import reactor.ipc.netty.http.HttpClientRequest;
27+
import reactor.ipc.netty.http.client.HttpClientRequest;
2828

2929
import org.springframework.core.io.buffer.DataBuffer;
3030
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -36,7 +36,7 @@
3636
*
3737
* @author Brian Clozel
3838
* @since 5.0
39-
* @see reactor.ipc.netty.http.HttpClient
39+
* @see reactor.ipc.netty.http.client.HttpClient
4040
*/
4141
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
4242

@@ -54,7 +54,7 @@ public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
5454
this.httpMethod = httpMethod;
5555
this.uri = uri;
5656
this.httpRequest = httpRequest;
57-
this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc());
57+
this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc());
5858
}
5959

6060

@@ -84,7 +84,7 @@ public Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body) {
8484
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
8585
map(ReactorClientHttpRequest::toByteBufs);
8686
return applyBeforeCommit().then(this.httpRequest
87-
.sendAndFlush(byteBufs));
87+
.sendGroups(byteBufs));
8888
}
8989

9090
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {
@@ -100,7 +100,7 @@ public Mono<Void> setComplete() {
100100
@Override
101101
protected void writeHeaders() {
102102
getHeaders().entrySet()
103-
.forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue()));
103+
.forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue()));
104104
}
105105

106106
@Override

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.Collection;
2020

2121
import reactor.core.publisher.Flux;
22-
import reactor.ipc.netty.http.HttpInbound;
22+
import reactor.ipc.netty.http.client.HttpClientResponse;
2323

2424
import org.springframework.core.io.buffer.DataBuffer;
2525
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@@ -34,19 +34,19 @@
3434
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
3535
*
3636
* @author Brian Clozel
37-
* @see reactor.ipc.netty.http.HttpClient
37+
* @see reactor.ipc.netty.http.client.HttpClient
3838
* @since 5.0
3939
*/
4040
public class ReactorClientHttpResponse implements ClientHttpResponse {
4141

4242
private final NettyDataBufferFactory dataBufferFactory;
4343

44-
private final HttpInbound response;
44+
private final HttpClientResponse response;
4545

4646

47-
public ReactorClientHttpResponse(HttpInbound response) {
47+
public ReactorClientHttpResponse(HttpClientResponse response) {
4848
this.response = response;
49-
this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc());
49+
this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc());
5050
}
5151

5252

@@ -62,7 +62,9 @@ public Flux<DataBuffer> getBody() {
6262
@Override
6363
public HttpHeaders getHeaders() {
6464
HttpHeaders headers = new HttpHeaders();
65-
this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
65+
this.response.responseHeaders()
66+
.entries()
67+
.forEach(e -> headers.add(e.getKey(), e.getValue()));
6668
return headers;
6769
}
6870

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
package org.springframework.http.server.reactive;
1818

1919
import java.util.Map;
20-
import java.util.function.Function;
20+
import java.util.function.BiFunction;
2121

2222
import io.netty.handler.codec.http.HttpResponseStatus;
2323
import reactor.core.publisher.Mono;
24-
import reactor.ipc.netty.http.HttpChannel;
24+
import reactor.ipc.netty.http.server.HttpServerRequest;
25+
import reactor.ipc.netty.http.server.HttpServerResponse;
2526

2627
import org.springframework.core.io.buffer.NettyDataBufferFactory;
2728

@@ -32,7 +33,7 @@
3233
* @since 5.0
3334
*/
3435
public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
35-
implements Function<HttpChannel, Mono<Void>> {
36+
implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
3637

3738

3839
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
@@ -45,16 +46,18 @@ public ReactorHttpHandlerAdapter(Map<String, HttpHandler> handlerMap) {
4546

4647

4748
@Override
48-
public Mono<Void> apply(HttpChannel channel) {
49+
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {
4950

50-
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.delegate().alloc());
51-
ReactorServerHttpRequest request = new ReactorServerHttpRequest(channel, bufferFactory);
52-
ReactorServerHttpResponse response = new ReactorServerHttpResponse(channel, bufferFactory);
51+
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel()
52+
.alloc());
53+
ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory);
54+
ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response,
55+
bufferFactory);
5356

54-
return getHttpHandler().handle(request, response)
57+
return getHttpHandler().handle(req, resp)
5558
.otherwise(ex -> {
5659
logger.error("Could not complete request", ex);
57-
channel.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
60+
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
5861
return Mono.empty();
5962
})
6063
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import io.netty.handler.codec.http.cookie.Cookie;
2424
import reactor.core.publisher.Flux;
25-
import reactor.ipc.netty.http.HttpChannel;
25+
import reactor.ipc.netty.http.server.HttpServerRequest;
2626

2727
import org.springframework.core.io.buffer.DataBuffer;
2828
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@@ -34,27 +34,27 @@
3434
import org.springframework.util.MultiValueMap;
3535

3636
/**
37-
* Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}.
37+
* Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}.
3838
*
3939
* @author Stephane Maldini
4040
* @author Rossen Stoyanchev
4141
* @since 5.0
4242
*/
4343
public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
4444

45-
private final HttpChannel channel;
45+
private final HttpServerRequest request;
4646

4747
private final NettyDataBufferFactory bufferFactory;
4848

4949

50-
public ReactorServerHttpRequest(HttpChannel channel, NettyDataBufferFactory bufferFactory) {
51-
super(initUri(channel), initHeaders(channel));
50+
public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory) {
51+
super(initUri(request), initHeaders(request));
5252
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
53-
this.channel = channel;
53+
this.request = request;
5454
this.bufferFactory = bufferFactory;
5555
}
5656

57-
private static URI initUri(HttpChannel channel) {
57+
private static URI initUri(HttpServerRequest channel) {
5858
Assert.notNull("'channel' must not be null");
5959
try {
6060
URI uri = new URI(channel.uri());
@@ -73,29 +73,29 @@ private static URI initUri(HttpChannel channel) {
7373
}
7474
}
7575

76-
private static HttpHeaders initHeaders(HttpChannel channel) {
76+
private static HttpHeaders initHeaders(HttpServerRequest channel) {
7777
HttpHeaders headers = new HttpHeaders();
78-
for (String name : channel.headers().names()) {
79-
headers.put(name, channel.headers().getAll(name));
78+
for (String name : channel.requestHeaders().names()) {
79+
headers.put(name, channel.requestHeaders().getAll(name));
8080
}
8181
return headers;
8282
}
8383

8484

85-
public HttpChannel getReactorChannel() {
86-
return this.channel;
85+
public HttpServerRequest getReactorRequest() {
86+
return this.request;
8787
}
8888

8989
@Override
9090
public HttpMethod getMethod() {
91-
return HttpMethod.valueOf(this.channel.method().name());
91+
return HttpMethod.valueOf(this.request.method().name());
9292
}
9393

9494
@Override
9595
protected MultiValueMap<String, HttpCookie> initCookies() {
9696
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
97-
for (CharSequence name : this.channel.cookies().keySet()) {
98-
for (Cookie cookie : this.channel.cookies().get(name)) {
97+
for (CharSequence name : this.request.cookies().keySet()) {
98+
for (Cookie cookie : this.request.cookies().get(name)) {
9999
HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value());
100100
cookies.add(name.toString(), httpCookie);
101101
}
@@ -105,7 +105,7 @@ protected MultiValueMap<String, HttpCookie> initCookies() {
105105

106106
@Override
107107
public Flux<DataBuffer> getBody() {
108-
return this.channel.receive().retain().map(this.bufferFactory::wrap);
108+
return this.request.receive().retain().map(this.bufferFactory::wrap);
109109
}
110110

111111
}

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.reactivestreams.Publisher;
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
28-
import reactor.ipc.netty.http.HttpChannel;
28+
import reactor.ipc.netty.http.server.HttpServerResponse;
2929

3030
import org.springframework.core.io.buffer.DataBuffer;
3131
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -37,7 +37,7 @@
3737
import org.springframework.util.Assert;
3838

3939
/**
40-
* Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}.
40+
* Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}.
4141
*
4242
* @author Stephane Maldini
4343
* @author Rossen Stoyanchev
@@ -46,51 +46,51 @@
4646
public class ReactorServerHttpResponse extends AbstractServerHttpResponse
4747
implements ZeroCopyHttpOutputMessage {
4848

49-
private final HttpChannel channel;
49+
private final HttpServerResponse response;
5050

5151

52-
public ReactorServerHttpResponse(HttpChannel response, DataBufferFactory bufferFactory) {
52+
public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
5353
super(bufferFactory);
5454
Assert.notNull("'response' must not be null.");
55-
this.channel = response;
55+
this.response = response;
5656
}
5757

5858

59-
public HttpChannel getReactorChannel() {
60-
return this.channel;
59+
public HttpServerResponse getReactorResponse() {
60+
return this.response;
6161
}
6262

6363

6464
@Override
6565
protected void applyStatusCode() {
6666
HttpStatus statusCode = this.getStatusCode();
6767
if (statusCode != null) {
68-
getReactorChannel().status(HttpResponseStatus.valueOf(statusCode.value()));
68+
getReactorResponse().status(HttpResponseStatus.valueOf(statusCode.value()));
6969
}
7070
}
7171

7272
@Override
7373
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
7474
Publisher<ByteBuf> body = toByteBufs(publisher);
75-
return this.channel.send(body);
75+
return this.response.send(body);
7676
}
7777

7878
@Override
7979
protected Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> publisher) {
8080
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
8181
.map(ReactorServerHttpResponse::toByteBufs);
82-
return this.channel.sendAndFlush(body);
82+
return this.response.sendGroups(body);
8383
}
8484

8585
@Override
8686
protected void applyHeaders() {
8787
// TODO: temporarily, see https://github.com/reactor/reactor-netty/issues/2
8888
if(getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)){
89-
this.channel.responseTransfer(false);
89+
this.response.disableChunkedTransfer();
9090
}
9191
for (String name : getHeaders().keySet()) {
9292
for (String value : getHeaders().get(name)) {
93-
this.channel.responseHeaders().add(name, value);
93+
this.response.responseHeaders().add(name, value);
9494
}
9595
}
9696
}
@@ -107,14 +107,14 @@ protected void applyCookies() {
107107
httpCookie.getPath().ifPresent(cookie::setPath);
108108
cookie.setSecure(httpCookie.isSecure());
109109
cookie.setHttpOnly(httpCookie.isHttpOnly());
110-
this.channel.addResponseCookie(cookie);
110+
this.response.addCookie(cookie);
111111
}
112112
}
113113
}
114114

115115
@Override
116116
public Mono<Void> writeWith(File file, long position, long count) {
117-
return doCommit(() -> this.channel.sendFile(file, position, count));
117+
return doCommit(() -> this.response.sendFile(file, position, count));
118118
}
119119

120120
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {

0 commit comments

Comments
 (0)