Skip to content

Commit 45c1860

Browse files
authored
Merge pull request aol#341 from aol/reactiveAsync
Support async NIO requests via reactive-streams
2 parents 6ce6304 + 456a7c4 commit 45c1860

File tree

8 files changed

+253
-0
lines changed

8 files changed

+253
-0
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package app.publisher.binder.direct;
2+
3+
import com.aol.micro.server.MicroserverApp;
4+
import com.aol.micro.server.config.Microserver;
5+
import com.aol.micro.server.module.ConfigurableModule;
6+
import com.aol.micro.server.rest.jersey.AsyncBinder;
7+
import com.aol.micro.server.testing.RestAgent;
8+
import org.glassfish.jersey.server.ResourceConfig;
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import java.util.concurrent.ExecutionException;
14+
15+
import static org.hamcrest.CoreMatchers.is;
16+
import static org.junit.Assert.assertThat;
17+
import static org.junit.Assert.assertTrue;
18+
19+
@Microserver
20+
public class AsyncPublisherTest {
21+
RestAgent rest = new RestAgent();
22+
MicroserverApp server;
23+
24+
@Before
25+
public void startServer() {
26+
27+
server = new MicroserverApp(() -> "binder");
28+
server.start();
29+
30+
}
31+
32+
@After
33+
public void stopServer() {
34+
server.stop();
35+
}
36+
37+
@Test
38+
public void runAppAndBasicTest() throws InterruptedException, ExecutionException {
39+
assertThat(rest.get("http://localhost:8080/binder/test/myEndPoint"), is("hello world!"));
40+
}
41+
@Test
42+
public void runAppAndBasicTest2() throws InterruptedException, ExecutionException {
43+
assertThat(rest.get("http://localhost:8080/binder/test/async2"), is("hello"));
44+
}
45+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package app.publisher.binder.direct;
2+
3+
import com.aol.micro.server.auto.discovery.Rest;
4+
import cyclops.async.Future;
5+
import cyclops.stream.ReactiveSeq;
6+
import cyclops.stream.Spouts;
7+
8+
import javax.ws.rs.GET;
9+
import javax.ws.rs.Path;
10+
import java.util.concurrent.Executors;
11+
import java.util.stream.Stream;
12+
13+
14+
@Rest
15+
@Path("/test")
16+
public class AsyncResource {
17+
18+
private void sleep() {
19+
try {
20+
Thread.sleep(10000);
21+
} catch (InterruptedException e) {
22+
e.printStackTrace();
23+
}
24+
}
25+
26+
@GET
27+
@Path("myEndPoint")
28+
public Future<String> myEndPoint() {
29+
return Future.ofSupplier(() -> {
30+
sleep();
31+
return "hello world!";
32+
}, Executors.newFixedThreadPool(1));
33+
}
34+
35+
@GET
36+
@Path("async2")
37+
public ReactiveSeq<String> async2() {
38+
return Spouts.publishOn(Stream.of("hello"), Executors.newFixedThreadPool(1));
39+
}
40+
41+
42+
}

micro-jersey/readme.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,32 @@ Maven
2323
Gradle
2424
```groovy
2525
compile 'com.aol.microservices:micro-jersey:x.yz'
26+
```
27+
28+
## Baked in async NIO based REST
29+
30+
Return any reactive-streams Publisher from your REST end point to make them execute asynchronously automatically.
31+
32+
E.g. Using Future from [cyclops-react](cyclops-react.io)
33+
```java
34+
@GET
35+
public Future<String> myEndPoint(){
36+
return Future.ofSupplier(()->{
37+
sleep();
38+
return "hello world!";
39+
}, Executors.newFixedThreadPool(1));
40+
}
41+
```
42+
43+
Would be equivalent to the following code
44+
45+
```java
46+
@GET
47+
public void myEndPoint(@Suspended AsyncResponse asyncResponse){
48+
Future.ofSupplier(()->{
49+
sleep();
50+
asyncResponse.resume("hello world!");
51+
return 1;
52+
}, Executors.newFixedThreadPool(1));
53+
}
2654
```
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.aol.micro.server.rest.jersey;
2+
3+
import org.glassfish.hk2.utilities.binding.AbstractBinder;
4+
import org.glassfish.jersey.server.spi.internal.ResourceMethodDispatcher;
5+
import org.glassfish.jersey.server.spi.internal.ResourceMethodInvocationHandlerProvider;
6+
7+
import javax.inject.Singleton;
8+
9+
10+
public class AsyncBinder extends AbstractBinder {
11+
12+
@Override
13+
protected void configure() {
14+
bind(AsyncDispatcher.AsyncDispatcherProvider.class).to(
15+
ResourceMethodDispatcher.Provider.class).in(Singleton.class)
16+
.ranked(1);
17+
}
18+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.aol.micro.server.rest.jersey;
2+
3+
import cyclops.stream.ReactiveSeq;
4+
import cyclops.stream.Spouts;
5+
import lombok.AllArgsConstructor;
6+
import lombok.NoArgsConstructor;
7+
import org.glassfish.hk2.api.ServiceHandle;
8+
import org.glassfish.hk2.api.ServiceLocator;
9+
import org.glassfish.jersey.server.ContainerRequest;
10+
import org.glassfish.jersey.server.internal.LocalizationMessages;
11+
import org.glassfish.jersey.server.internal.inject.ConfiguredValidator;
12+
import org.glassfish.jersey.server.internal.process.AsyncContext;
13+
import javax.inject.Provider;
14+
15+
import org.glassfish.jersey.server.model.Invocable;
16+
import org.glassfish.jersey.server.spi.internal.ResourceMethodDispatcher;
17+
import org.reactivestreams.Publisher;
18+
19+
20+
import javax.ws.rs.ProcessingException;
21+
import javax.ws.rs.container.ContainerRequestContext;
22+
import javax.ws.rs.core.Context;
23+
import javax.ws.rs.core.Response;
24+
import java.lang.reflect.InvocationHandler;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
28+
public class AsyncDispatcher implements ResourceMethodDispatcher {
29+
30+
private final ResourceMethodDispatcher originalDispatcher;
31+
32+
@Context
33+
private javax.inject.Provider<AsyncContext> asyncContext;
34+
@Context
35+
private javax.inject.Provider<ContainerRequestContext> containerRequestContext;
36+
37+
38+
public AsyncDispatcher(ResourceMethodDispatcher originalDispatcher) {
39+
this.originalDispatcher = originalDispatcher;
40+
}
41+
42+
@AllArgsConstructor
43+
@NoArgsConstructor
44+
static class AsyncDispatcherProvider implements Provider{
45+
@Context
46+
private ServiceLocator serviceLocator;
47+
@Override
48+
public ResourceMethodDispatcher create(Invocable method, InvocationHandler handler, ConfiguredValidator validator) {
49+
final Class<?> returnType = method.getHandlingMethod().getReturnType();
50+
if(Publisher.class.isAssignableFrom(returnType)){
51+
Set<Provider> providers = serviceLocator.getAllServiceHandles(ResourceMethodDispatcher.Provider.class)
52+
.stream()
53+
.filter(h->!h.getActiveDescriptor()
54+
.getImplementationClass()
55+
.equals(AsyncDispatcherProvider.class))
56+
.map(ServiceHandle::getService)
57+
.collect(Collectors.toSet());
58+
59+
for (ResourceMethodDispatcher.Provider provider : providers) {
60+
ResourceMethodDispatcher dispatcher = provider.create(method, handler, validator);
61+
if (dispatcher != null) {
62+
AsyncDispatcher result = new AsyncDispatcher(dispatcher);
63+
serviceLocator.inject(result);
64+
return result;
65+
}
66+
}
67+
68+
}
69+
return null;
70+
}
71+
}
72+
@Override
73+
public Response dispatch(Object resource, ContainerRequest request) throws ProcessingException {
74+
final AsyncContext context = this.asyncContext.get();
75+
if(!context.suspend())
76+
throw new ProcessingException(LocalizationMessages.ERROR_SUSPENDING_ASYNC_REQUEST());
77+
final ContainerRequestContext requestContext = containerRequestContext.get();
78+
79+
Publisher pub = (Publisher)originalDispatcher.dispatch(resource, request)
80+
.getEntity();
81+
Spouts.from(pub).onEmptySwitch(()->Spouts.of(Response.noContent().build()))
82+
.forEach(1,context::resume, context::resume);
83+
84+
return null;
85+
}
86+
87+
88+
}

micro-jersey/src/main/java/com/aol/micro/server/rest/jersey/JerseyRestApplication.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public JerseyRestApplication(List<Object> allResources,List<String> packages, Li
5151
register(next.getClass());
5252
}
5353
}
54+
register(new AsyncBinder());
5455

5556
if (serverProperties.isEmpty()) {
5657
property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true);

micro-reactive/readme.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
The micro-reactive plugin integrates [cyclops-react](https://github.com/aol/cyclops-react) and [Pivotal Reactor](http://projectreactor.io/) to provide a very rich integrated reactive programming environment on top of Spring.
66

7+
*NB* Microserver's Jersey plugin already makes Publisher a valid return type, converts them to asynchronously executing REST End points
8+
79
Why?
810

911
cyclops-react offers a range of functional datatypes and datastructures, many of which act as reactive-streams Publishers /subscribers. Pivotal Reactor offer advanced / specialized processing capabilities for reactive-streams Publishers and subscribers.

readme.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,35 @@ See the response *hello world!*
4646

4747
Add plugins by adding them to your build file - rerun the app to get new end points, Spring beans and more!
4848

49+
## Easy to use async NIO based REST
50+
51+
Return any reactive-streams Publisher from your REST end point to make them execute asynchronously automatically.
52+
53+
E.g. Using Future from [cyclops-react](cyclops-react.io)
54+
55+
```java
56+
@GET
57+
public Future<String> myEndPoint(){
58+
return Future.ofSupplier(()->{
59+
sleep();
60+
return "hello world!";
61+
}, Executors.newFixedThreadPool(1));
62+
}
63+
```
64+
65+
Would be equivalent to the following code
66+
67+
```java
68+
@GET
69+
public void myEndPoint(@Suspended AsyncResponse asyncResponse){
70+
Future.ofSupplier(()->{
71+
sleep();
72+
asyncResponse.resume("hello world!");
73+
return 1;
74+
}, Executors.newFixedThreadPool(1));
75+
}
76+
```
77+
4978
# Why Microserver?
5079

5180
Microserver is a plugin engine for building Spring and Spring Boot based microservices. Microserver supports pure microservice and micro-monolith development styles. The micro-monolith style involves packaging multiple services into a single deployment - offering developers the productivity of microservice development without the operational risk. This can help teams adopt a Microservices architecture on projects that are currently monoliths.

0 commit comments

Comments
 (0)