Skip to content

Commit aa8cf9b

Browse files
committed
添加访Rocket心跳机制
1 parent b1748fb commit aa8cf9b

File tree

8 files changed

+405
-0
lines changed

8 files changed

+405
-0
lines changed

Spring-Netty/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@
3838
<version>${netty-all.version}</version>
3939
</dependency>
4040

41+
<dependency>
42+
<groupId>com.alibaba</groupId>
43+
<artifactId>fastjson</artifactId>
44+
<version>1.2.76</version>
45+
</dependency>
46+
4147
</dependencies>
4248

4349
<build>
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.*;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.channel.socket.nio.NioSocketChannel;
8+
import io.netty.util.concurrent.DefaultEventExecutorGroup;
9+
10+
import java.net.InetSocketAddress;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.ScheduledExecutorService;
13+
import java.util.concurrent.ThreadFactory;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
/**
18+
* @author lhy
19+
* @date 2021/8/19
20+
*/
21+
public class ClientTest {
22+
23+
public static final EventLoopGroup myEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() {
24+
25+
private AtomicInteger threadIndex = new AtomicInteger(0);
26+
27+
@Override
28+
public Thread newThread(Runnable r) {
29+
return new Thread(r, String.format("MyNettyClientSelector_%d", this.threadIndex.incrementAndGet()));
30+
}
31+
});
32+
33+
public static final DefaultEventExecutorGroup nettyHandlerExecutorGroup = new DefaultEventExecutorGroup(1,
34+
new ThreadFactory() {
35+
private AtomicInteger threadIndex = new AtomicInteger(0);
36+
@Override
37+
public Thread newThread(Runnable r) {
38+
return new Thread(r, "nettyHandlerThread_" + this.threadIndex.incrementAndGet());
39+
}
40+
});
41+
42+
public static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
43+
@Override
44+
public Thread newThread(Runnable r) {
45+
Thread thread = new Thread(r, "scheduledThread_");
46+
thread.setDaemon(false);
47+
return thread;
48+
}
49+
});
50+
51+
public static void main(String[] args) {
52+
53+
Bootstrap bootstrap = new Bootstrap()
54+
.group(myEventLoopGroup)
55+
.channel(NioSocketChannel.class)
56+
.option(ChannelOption.TCP_NODELAY, true)
57+
.option(ChannelOption.SO_KEEPALIVE, false)
58+
.option(ChannelOption.SO_SNDBUF, 65535)
59+
.option(ChannelOption.SO_RCVBUF, 65535)
60+
.handler(new ChannelInitializer<SocketChannel>() {
61+
@Override
62+
protected void initChannel(SocketChannel ch) throws Exception {
63+
ChannelPipeline pipeline = ch.pipeline();
64+
pipeline.addLast(nettyHandlerExecutorGroup,
65+
new NettyEncoder(),
66+
new NettyDecoder(),
67+
new ConnectResponseHandler());
68+
}
69+
});
70+
71+
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 9090);
72+
73+
final ChannelFuture channelFuture = bootstrap.connect(inetSocketAddress);
74+
75+
if (channelFuture.awaitUninterruptibly(2, TimeUnit.MINUTES)) {
76+
// heartBeat(channelFuture.channel());
77+
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
78+
@Override
79+
public void run() {
80+
heartBeat(channelFuture.channel());
81+
}
82+
}, 1000, 30 * 1000, TimeUnit.MILLISECONDS);
83+
}
84+
}
85+
86+
public static void heartBeat(Channel channel) {
87+
String request = "客户端发起了心跳请求";
88+
RemotingCommand command= new RemotingCommand();
89+
command.setBody(request.getBytes());
90+
command.setCode(1);
91+
channel.writeAndFlush(command);
92+
}
93+
94+
public static class ConnectResponseHandler extends SimpleChannelInboundHandler<RemotingCommand> {
95+
@Override
96+
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
97+
System.out.println("服务端返回消息了:" + new String(msg.getBody()));
98+
}
99+
}
100+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
6+
7+
import java.nio.ByteBuffer;
8+
9+
/**
10+
* @author lhy
11+
* @date 2021/8/19
12+
*/
13+
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
14+
15+
public NettyDecoder() {
16+
super(16777216, 0, 4, 0, 4);
17+
}
18+
19+
@Override
20+
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
21+
ByteBuf frame = null;
22+
23+
try {
24+
frame = (ByteBuf) super.decode(ctx, in);
25+
if (null == frame) {
26+
return null;
27+
}
28+
ByteBuffer byteBuffer = frame.nioBuffer();
29+
return RemotingCommand.decode(byteBuffer);
30+
} catch (Exception e) {
31+
e.printStackTrace();
32+
} finally {
33+
if (null != frame) {
34+
frame.release();
35+
}
36+
}
37+
38+
return null;
39+
}
40+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandler;
7+
import io.netty.channel.ChannelHandlerContext;
8+
import io.netty.handler.codec.MessageToByteEncoder;
9+
10+
import java.nio.ByteBuffer;
11+
12+
/**
13+
* @author lhy
14+
* @date 2021/8/19
15+
*/
16+
@ChannelHandler.Sharable
17+
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
18+
19+
@Override
20+
protected void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception {
21+
try {
22+
ByteBuffer header = remotingCommand.encodeHeader();
23+
out.writeBytes(header);
24+
byte[] body = remotingCommand.getBody();
25+
if (null != body) {
26+
out.writeBytes(body);
27+
}
28+
// out.writeBytes(remotingCommand.getBody());
29+
} catch (Exception e) {
30+
e.printStackTrace();
31+
ctx.channel().close().addListener(new ChannelFutureListener() {
32+
@Override
33+
public void operationComplete(ChannelFuture future) throws Exception {
34+
// 关闭channel成功
35+
}
36+
});
37+
}
38+
}
39+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import java.nio.ByteBuffer;
4+
5+
/**
6+
* @author lhy
7+
* @date 2021/8/19
8+
*/
9+
public class RemotingCommand {
10+
11+
private Integer code; // 请求码
12+
13+
private byte[] body; // 请求内容
14+
15+
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
16+
int limit = byteBuffer.limit();
17+
18+
int oriHeaderLen = byteBuffer.getInt();
19+
int headerLength = getHeaderLength(oriHeaderLen);
20+
21+
byte[] headerData = new byte[headerLength];
22+
byteBuffer.get(headerData);
23+
24+
int bodyLength = limit - 4 - headerLength;
25+
26+
byte[] body = new byte[bodyLength];
27+
byteBuffer.get(body);
28+
RemotingCommand remotingCommand = new RemotingCommand();
29+
remotingCommand.setBody(body);
30+
return remotingCommand;
31+
}
32+
33+
public ByteBuffer encodeHeader() {
34+
return encodeHeader(this.body.length);
35+
}
36+
37+
public ByteBuffer encodeHeader(final int bodyLength) {
38+
int length = 4;
39+
40+
byte[] headerData;
41+
headerData = this.headerEncode();
42+
length += headerData.length; // 头
43+
length += bodyLength; // 请求/响应体
44+
45+
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 分配header
46+
result.putInt(length);
47+
result.put(markProtocolType(headerData.length, SerializeType.JSON));
48+
result.put(headerData); // 添加头
49+
result.flip();
50+
51+
return result;
52+
}
53+
54+
public static byte[] markProtocolType(int source, SerializeType type) {
55+
byte[] result = new byte[4];
56+
57+
result[0] = type.getCode();
58+
result[1] = (byte) ((source >> 16) & 0xFF);
59+
result[2] = (byte) ((source >> 8) & 0xFF);
60+
result[3] = (byte) (source & 0xFF);
61+
return result;
62+
}
63+
64+
private byte[] headerEncode() {
65+
return RemotingSerializable.encode(this);
66+
}
67+
68+
public static int getHeaderLength(int length) {
69+
return length & 0xFFFFFF;
70+
}
71+
72+
public Integer getCode() {
73+
return code;
74+
}
75+
76+
public void setCode(Integer code) {
77+
this.code = code;
78+
}
79+
80+
public byte[] getBody() {
81+
return body;
82+
}
83+
84+
public void setBody(byte[] body) {
85+
this.body = body;
86+
}
87+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import com.alibaba.fastjson.JSON;
4+
5+
import java.nio.charset.Charset;
6+
7+
/**
8+
* @author lhy
9+
* @date 2021/8/19
10+
*/
11+
public abstract class RemotingSerializable {
12+
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
13+
14+
public static byte[] encode(final Object obj) {
15+
final String json = toJson(obj, false);
16+
if (json != null) {
17+
return json.getBytes(CHARSET_UTF8);
18+
}
19+
return null;
20+
}
21+
22+
public static String toJson(final Object obj, boolean prettyFormat) {
23+
return JSON.toJSONString(obj, prettyFormat);
24+
}
25+
26+
public static <T> T decode(final byte[] data, Class<T> classOfT) {
27+
final String json = new String(data, CHARSET_UTF8);
28+
return fromJson(json, classOfT);
29+
}
30+
31+
public static <T> T fromJson(String json, Class<T> classOfT) {
32+
return JSON.parseObject(json, classOfT);
33+
}
34+
35+
public byte[] encode() {
36+
final String json = this.toJson();
37+
if (json != null) {
38+
return json.getBytes(CHARSET_UTF8);
39+
}
40+
return null;
41+
}
42+
43+
public String toJson() {
44+
return toJson(false);
45+
}
46+
47+
public String toJson(final boolean prettyFormat) {
48+
return toJson(this, prettyFormat);
49+
}
50+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
/**
4+
* @author lhy
5+
* @date 2021/8/20
6+
*/
7+
public enum SerializeType {
8+
JSON((byte) 0);
9+
10+
private byte code;
11+
12+
SerializeType(byte code) {
13+
this.code = code;
14+
}
15+
16+
public static SerializeType valueOf(byte code) {
17+
for (SerializeType serializeType : SerializeType.values()) {
18+
if (serializeType.getCode() == code) {
19+
return serializeType;
20+
}
21+
}
22+
return null;
23+
}
24+
25+
public byte getCode() {
26+
return code;
27+
}
28+
}

0 commit comments

Comments
 (0)