Skip to content

Commit 43ece38

Browse files
authored
Merge pull request mrdear#2 from nl101531/gRPC-dev
增加拦截器相关代码
2 parents 97526ca + 5e2394f commit 43ece38

13 files changed

+342
-5
lines changed

README.MD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ SpringDataJPA+QueryDSL使用,需要配合博客博文http://www.jianshu.com/p/e3
4141

4242
9.gRPC-Demo
4343
--------------
44-
参考官方案例的gRPC的Demo,里面有Hello World和 RouteGuide案例,适合入门.相关博文还在整理,适时加入
44+
参考官方案例的gRPC的Demo,里面有Hello World和 RouteGuide案例,适合入门.相关博文参考 http://www.jianshu.com/nb/9491747
4545

4646
10.待添加
4747
--------------
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package cn.mrdear.grpc.clientProcess;
2+
3+
import io.grpc.CallOptions;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientCall;
6+
import io.grpc.ClientInterceptor;
7+
import io.grpc.ForwardingClientCall;
8+
import io.grpc.ForwardingClientCallListener;
9+
import io.grpc.Metadata;
10+
import io.grpc.MethodDescriptor;
11+
import io.grpc.Status;
12+
13+
/**
14+
* 客户端拦截器
15+
* @author Niu Li
16+
* @date 2017/2/4
17+
*/
18+
//ClientInterceptor接口是针对ClientCall的创建进行拦截
19+
public class ClientInterruptImpl implements ClientInterceptor {
20+
@Override
21+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
22+
CallOptions callOptions, Channel next) {
23+
//创建client
24+
System.out.println("创建client1");
25+
ClientCall<ReqT,RespT> clientCall = next.newCall(method,callOptions);
26+
return new ForwardingClientCall<ReqT, RespT>() {
27+
@Override
28+
protected ClientCall<ReqT, RespT> delegate() {
29+
return clientCall;
30+
}
31+
@Override
32+
public void start(Listener<RespT> responseListener, Metadata headers) {
33+
System.out.println("拦截器1,在此可以对header参数进行修改");
34+
Metadata.Key<String> token = Metadata.Key.of("token",Metadata.ASCII_STRING_MARSHALLER);
35+
headers.put(token,"123456");
36+
//包装回调监听,使其也能拦截
37+
Listener<RespT> forwardListener = new ForwardingClientCallListener.
38+
SimpleForwardingClientCallListener<RespT>(responseListener) {
39+
@Override
40+
public void onHeaders(Metadata headers) {
41+
Metadata.Key<String> token = Metadata.Key.of("token",Metadata.ASCII_STRING_MARSHALLER);
42+
if (!"123456".equals(headers.get(token))){
43+
System.out.println("返回参数无token,关闭该链接");
44+
super.onClose(Status.DATA_LOSS,headers);
45+
}
46+
super.onHeaders(headers);
47+
}
48+
};
49+
super.start(forwardListener, headers);
50+
}
51+
};
52+
}
53+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package cn.mrdear.grpc.clientProcess;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import io.grpc.ManagedChannel;
6+
import io.grpc.ManagedChannelBuilder;
7+
import io.grpc.StatusRuntimeException;
8+
import io.grpc.examples.helloworld.GreeterGrpc;
9+
import io.grpc.examples.helloworld.HelloReply;
10+
import io.grpc.examples.helloworld.HelloRequest;
11+
12+
/**
13+
* 按照hello world来分析整个客户端执行过程
14+
* @author Niu Li
15+
* @date 2017/1/28
16+
*/
17+
public class HelloWorldClientProcess {
18+
19+
private final ManagedChannel channel; //一个gRPC信道
20+
private final GreeterGrpc.GreeterBlockingStub blockingStub;//阻塞/同步 存根
21+
22+
//初始化信道和存根
23+
public HelloWorldClientProcess(String host, int port){
24+
25+
this(ManagedChannelBuilder.forAddress(host, port)
26+
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
27+
// needing certificates.
28+
.usePlaintext(true));
29+
}
30+
31+
//创建channel的时候添加上转换器
32+
private HelloWorldClientProcess(ManagedChannelBuilder<?> channelBuilder) {
33+
channel = channelBuilder.intercept(new ClientInterruptImpl()).build();
34+
//使用gzip压缩
35+
blockingStub = GreeterGrpc.newBlockingStub(channel).withCompression("gzip");
36+
}
37+
38+
public void shutdown() throws InterruptedException {
39+
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
40+
}
41+
42+
//客户端方法
43+
public void greet(String name){
44+
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
45+
HelloReply response;
46+
try {
47+
response = blockingStub.sayHello(request);
48+
} catch (StatusRuntimeException e) {
49+
System.out.println("RPC调用失败:"+e.getMessage());
50+
return;
51+
}
52+
System.out.println("服务器返回信息:"+response.getMessage());
53+
}
54+
55+
public static void main(String[] args) throws InterruptedException {
56+
HelloWorldClientProcess client = new HelloWorldClientProcess("127.0.0.1",50051);
57+
try {
58+
for(int i=0;i<5;i++){
59+
client.greet("world:"+i);
60+
}
61+
}finally {
62+
client.shutdown();
63+
}
64+
}
65+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package cn.mrdear.grpc.clientProcess;
2+
3+
import java.io.IOException;
4+
5+
import io.grpc.Server;
6+
import io.grpc.ServerBuilder;
7+
import io.grpc.ServerInterceptors;
8+
import io.grpc.examples.helloworld.GreeterGrpc;
9+
import io.grpc.examples.helloworld.HelloReply;
10+
import io.grpc.examples.helloworld.HelloRequest;
11+
import io.grpc.stub.StreamObserver;
12+
13+
/**
14+
* 按照hello world来分析整个服务端执行过程
15+
* @author Niu Li
16+
* @date 2017/1/28
17+
*/
18+
public class HelloWorldServerProcess {
19+
20+
private int port = 50051;
21+
private Server server;
22+
23+
/**
24+
* 启动服务
25+
* @throws IOException
26+
*/
27+
private void start() throws IOException {
28+
System.out.println("service start...");
29+
30+
server = ServerBuilder.forPort(port)
31+
.addService(ServerInterceptors.intercept(new GreeterImpl(),new ServerInterruptImpl()))
32+
.build()
33+
.start();
34+
35+
System.out.println("service started");
36+
//程序正常退出,系统调用 System.exit方法或虚拟机被关闭时执行该调用
37+
Runtime.getRuntime().addShutdownHook(new Thread() {
38+
39+
@Override
40+
public void run() {
41+
System.err.println("*** shutting down gRPC server since JVM is shutting down");
42+
HelloWorldServerProcess.this.stop();
43+
System.err.println("*** server shut down");
44+
}
45+
});
46+
}
47+
48+
private void stop() {
49+
if (server != null) {
50+
server.shutdown();
51+
}
52+
}
53+
54+
// block 一直到退出程序
55+
private void blockUntilShutdown() throws InterruptedException {
56+
if (server != null) {
57+
server.awaitTermination();
58+
}
59+
}
60+
61+
62+
public static void main(String[] args) throws IOException, InterruptedException {
63+
final HelloWorldServerProcess server = new HelloWorldServerProcess();
64+
server.start();
65+
server.blockUntilShutdown();
66+
}
67+
68+
69+
// 实现 定义一个实现服务接口的类
70+
private class GreeterImpl extends GreeterGrpc.GreeterImplBase {
71+
72+
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
73+
//获取参数
74+
System.out.println("收到的信息:"+req.getName());
75+
76+
//这里可以放置具体业务处理代码 start
77+
78+
//这里可以放置具体业务处理代码 end
79+
80+
//构造返回
81+
HelloReply reply = HelloReply.newBuilder().setMessage(("Hello: " + req.getName())).build();
82+
responseObserver.onNext(reply);
83+
responseObserver.onCompleted();
84+
}
85+
}
86+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package cn.mrdear.grpc.clientProcess;
2+
3+
import io.grpc.ForwardingServerCall;
4+
import io.grpc.Metadata;
5+
import io.grpc.ServerCall;
6+
import io.grpc.ServerCallHandler;
7+
import io.grpc.ServerInterceptor;
8+
import io.grpc.Status;
9+
import io.netty.util.internal.StringUtil;
10+
11+
/**
12+
* 服务端拦截器
13+
* @author Niu Li
14+
* @date 2017/2/4
15+
*/
16+
public class ServerInterruptImpl implements ServerInterceptor{
17+
@Override
18+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
19+
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
20+
System.out.println("执行server拦截器1,获取token");
21+
//获取客户端参数
22+
Metadata.Key<String> token = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);
23+
String tokenStr = headers.get(token);
24+
if (StringUtil.isNullOrEmpty(tokenStr)){
25+
System.out.println("未收到客户端token,关闭此连接");
26+
call.close(Status.DATA_LOSS,headers);
27+
}
28+
//服务端写回参数
29+
ServerCall<ReqT, RespT> serverCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
30+
@Override
31+
public void sendHeaders(Metadata headers) {
32+
System.out.println("执行server拦截器2,写入token");
33+
headers.put(token,tokenStr);
34+
super.sendHeaders(headers);
35+
}
36+
};
37+
return next.startCall(serverCall,headers);
38+
}
39+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package cn.mrdear.grpc.clientProcess.forwardExample;
2+
3+
/**
4+
* 主调用接口
5+
* @author Niu Li
6+
* @date 2017/2/4
7+
*/
8+
public abstract class Client {
9+
public abstract void start(String say);
10+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package cn.mrdear.grpc.clientProcess.forwardExample;
2+
3+
/**
4+
* @author Niu Li
5+
* @date 2017/2/4
6+
*/
7+
public class ClientImp extends Client {
8+
@Override
9+
public void start(String say) {
10+
System.out.println(say);
11+
}
12+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cn.mrdear.grpc.clientProcess.forwardExample;
2+
3+
/**
4+
* 用于包装Client到另一个Client
5+
* @author Niu Li
6+
* @date 2017/2/4
7+
*/
8+
public abstract class ForwardingClient extends Client{
9+
//要包装的对象
10+
protected abstract Client delegate();
11+
12+
@Override
13+
public void start(String say) {
14+
delegate().start(say);
15+
}
16+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package cn.mrdear.grpc.clientProcess.forwardExample;
2+
3+
/**
4+
* 一个简单的包装示例,必须要传入要包装的对象
5+
* @author Niu Li
6+
* @date 2017/2/4
7+
*/
8+
public class ForwardingClientImpl extends ForwardingClient{
9+
10+
//被委托对象
11+
private final Client client;
12+
13+
public ForwardingClientImpl(Client client) {
14+
this.client = client;
15+
}
16+
17+
@Override
18+
protected Client delegate() {
19+
return client;
20+
}
21+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package cn.mrdear.grpc.clientProcess.forwardExample;
2+
3+
/**
4+
* 一个简单的委托模式.
5+
* @author Niu Li
6+
* @date 2017/2/4
7+
*/
8+
public class InterceptTest {
9+
public static void main(String[] args) {
10+
Client client = new ClientImp();//主要想执行的方法
11+
//构造第一个拦截器
12+
Client intercept1 = new ForwardingClientImpl(client){
13+
@Override
14+
public void start(String say) {
15+
System.out.println("拦截器1");
16+
super.start(say);
17+
}
18+
};
19+
//构造第二个拦截器
20+
Client intercept2 = new ForwardingClientImpl(intercept1){
21+
@Override
22+
public void start(String say) {
23+
System.out.println("拦截器2");
24+
super.start(say);
25+
}
26+
};
27+
//执行主方法
28+
intercept2.start("这是要执行的方法");
29+
}
30+
}

gRPC-Demo/src/main/java/cn/mrdear/grpc/hello/HelloWorldClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public HelloWorldClient(String host,int port){
2929
/** Construct client for accessing RouteGuide server using the existing channel. */
3030
private HelloWorldClient(ManagedChannelBuilder<?> channelBuilder) {
3131
channel = channelBuilder.build();
32-
blockingStub = GreeterGrpc.newBlockingStub(channel);
32+
//使用gzip压缩
33+
blockingStub = GreeterGrpc.newBlockingStub(channel).withCompression("gzip");
3334
}
3435

3536
public void shutdown() throws InterruptedException {

gRPC-Demo/src/main/java/cn/mrdear/grpc/hello/HelloWorldServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ private void start() throws IOException {
3030

3131
System.out.println("service start...");
3232

33+
//程序正常退出,系统调用 System.exit方法或虚拟机被关闭时执行该调用
3334
Runtime.getRuntime().addShutdownHook(new Thread() {
3435

3536
@Override

0 commit comments

Comments
 (0)