Skip to content

Commit 133e33e

Browse files
author
孙健
committed
up
1 parent cd4f52b commit 133e33e

File tree

3 files changed

+32
-21
lines changed

3 files changed

+32
-21
lines changed

src/main/java/org/nlpcn/jcoder/server/rpc/client/RpcContext.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
public class RpcContext {
99

10-
public static final String Json = "Json";
10+
public static final int OBJ = 0, JSON = 1, FILE = 2;
1111

1212
private ChannelHandlerContext chContext;
1313

@@ -17,7 +17,7 @@ public class RpcContext {
1717

1818
private Map<Object, Object> map = null;
1919

20-
private String returnType ;
20+
private int type ;
2121

2222
public RpcContext(ChannelHandlerContext ctx) {
2323
this.chContext = ctx;
@@ -61,12 +61,14 @@ public void setRep(RpcResponse rep) {
6161
this.rep = rep;
6262
}
6363

64-
public String getReturnType() {
65-
return returnType;
64+
public int getType() {
65+
return type;
6666
}
6767

68-
public void setReturnType(String returnType) {
69-
this.returnType = returnType;
68+
public void setType(int type) {
69+
this.type = type;
7070
}
71+
72+
7173

7274
}

src/main/java/org/nlpcn/jcoder/server/rpc/client/RpcDecoder.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package org.nlpcn.jcoder.server.rpc.client;
22

33
import java.nio.charset.Charset;
4-
import java.nio.charset.CharsetDecoder;
54
import java.util.List;
65

76
import com.alibaba.fastjson.JSON;
@@ -34,22 +33,26 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB
3433
if (byteBuf.readableBytes() < dataLength) {
3534
byteBuf.resetReaderIndex();
3635
}
37-
byte[] data = new byte[dataLength];
36+
37+
int type = byteBuf.readByte();
38+
39+
byte[] data = new byte[dataLength - 1];
3840
byteBuf.readBytes(data);
3941

4042
Object obj = null;
4143

42-
if (data[0] == 'J' && data[1] == 's' && data[2] == 'o' && data[3] == 'n') {
44+
if (type == RpcContext.OBJ) {
45+
obj = SerializationUtil.deserialize(data, genericClass);
46+
Rpcs.getContext().setType(RpcContext.OBJ);
47+
} else if (type == RpcContext.JSON) {
4348
obj = JSONObject.toJavaObject((JSON) JSONObject.parse(data, 4, data.length - 4, CHARSET.newDecoder()), genericClass);
44-
Rpcs.getContext().setReturnType(RpcContext.Json);
45-
} else if (data[0] == 'F' && data[1] == 'i' && data[2] == 'l' && data[3] == 'e') {
49+
Rpcs.getContext().setType(RpcContext.JSON);
50+
} else if (type == RpcContext.OBJ) {
4651
RpcRequest rpcRequest = new RpcRequest();
4752
rpcRequest.setClassName(VFile.VFILE_LOCAL);
4853
rpcRequest.setMethodName(VFile.VFILE_LOCAL);
4954
rpcRequest.setArguments(new Object[] { data });
5055
obj = rpcRequest;
51-
} else {
52-
obj = SerializationUtil.deserialize(data, genericClass);
5356
}
5457

5558
list.add(obj);

src/main/java/org/nlpcn/jcoder/server/rpc/server/RpcServer.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.nio.channels.spi.SelectorProvider;
44
import java.util.concurrent.ThreadFactory;
55

6+
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
67
import org.nlpcn.jcoder.server.rpc.client.RpcDecoder;
78
import org.nlpcn.jcoder.server.rpc.client.RpcEncoder;
89
import org.nlpcn.jcoder.server.rpc.client.RpcRequest;
@@ -21,6 +22,9 @@
2122
import io.netty.channel.socket.nio.NioServerSocketChannel;
2223
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2324
import io.netty.handler.codec.LengthFieldPrepender;
25+
import io.netty.handler.codec.http.HttpObjectAggregator;
26+
import io.netty.handler.codec.http.HttpServerCodec;
27+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
2428

2529
/**
2630
* rpc server
@@ -29,8 +33,8 @@
2933
*
3034
*/
3135
public class RpcServer {
32-
33-
private static final Logger LOG = LoggerFactory.getLogger(RpcServer.class) ;
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
3438

3539
private static ServerBootstrap bootstrap = null;
3640

@@ -42,14 +46,16 @@ public class RpcServer {
4246

4347
private static final int MESSAGE_LENGTH = 4;
4448

49+
private static final String WEBSOCKET_PATH = "/";
50+
4551
/**
4652
*
4753
* @throws Exception
4854
*/
4955
public static void startServer(int port) throws Exception {
50-
56+
5157
LOG.info("to start rpc server ");
52-
58+
5359
ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
5460

5561
boss = new NioEventLoopGroup(PARALLEL);
@@ -61,8 +67,8 @@ public static void startServer(int port) throws Exception {
6167
@Override
6268
protected void initChannel(SocketChannel socketChannel) throws Exception {
6369
ChannelPipeline pipeline = socketChannel.pipeline();
64-
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MESSAGE_LENGTH, 0, MESSAGE_LENGTH));
65-
pipeline.addLast(new LengthFieldPrepender(MESSAGE_LENGTH));
70+
pipeline.addLast(new HttpServerCodec());
71+
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
6672
pipeline.addLast(new RpcDecoder(RpcRequest.class));
6773
pipeline.addLast(new RpcEncoder(RpcResponse.class));
6874
pipeline.addLast(new ContextHandler());
@@ -73,11 +79,11 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
7379
ChannelFuture future = bootstrap.bind(port).sync();
7480

7581
future.channel().closeFuture();
76-
82+
7783
LOG.info("start rpc server ok");
7884

7985
} finally {
80-
86+
8187
}
8288
}
8389

0 commit comments

Comments
 (0)