Skip to content

Commit 0d0bc6e

Browse files
author
quding
committed
增加线程池应用,对于客户端最好用上线程池
1 parent 8012c8c commit 0d0bc6e

File tree

6 files changed

+190
-0
lines changed

6 files changed

+190
-0
lines changed

gRPC-Demo/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
<artifactId>grpc-stub</artifactId>
3030
<version>${grpc.version}</version>
3131
</dependency>
32+
33+
<!--客户端连接池-->
34+
<dependency>
35+
<groupId>org.apache.commons</groupId>
36+
<artifactId>commons-pool2</artifactId>
37+
<version>2.4.2</version>
38+
</dependency>
3239
</dependencies>
3340

3441
<build>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package cn.mrdear.grpc.common;
2+
3+
/**
4+
* 给模板类使用的接口
5+
* @author Niu Li
6+
* @since 17-2-8
7+
*/
8+
public interface WorkCallBack<S> {
9+
10+
void callback(S s);
11+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package cn.mrdear.grpc.hellopool;
2+
3+
import org.apache.commons.pool2.impl.GenericObjectPool;
4+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
5+
6+
import cn.mrdear.grpc.common.WorkCallBack;
7+
8+
/**
9+
* @author Niu Li
10+
* @since 17-2-8
11+
*/
12+
public class HelloWorldClientPool {
13+
14+
private static GenericObjectPool<HelloWorldClientSingle> objectPool = null;
15+
16+
static {
17+
// 连接池的配置
18+
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
19+
// 池中的最大连接数
20+
poolConfig.setMaxTotal(8);
21+
// 最少的空闲连接数
22+
poolConfig.setMinIdle(0);
23+
// 最多的空闲连接数
24+
poolConfig.setMaxIdle(8);
25+
// 当连接池资源耗尽时,调用者最大阻塞的时间,超时时抛出异常 单位:毫秒数
26+
poolConfig.setMaxWaitMillis(-1);
27+
// 连接池存放池化对象方式,true放在空闲队列最前面,false放在空闲队列最后
28+
poolConfig.setLifo(true);
29+
// 连接空闲的最小时间,达到此值后空闲连接可能会被移除,默认即为30分钟
30+
poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);
31+
// 连接耗尽时是否阻塞,默认为true
32+
poolConfig.setBlockWhenExhausted(true);
33+
// 连接池创建
34+
objectPool = new GenericObjectPool<>(new HelloWorldFactory(), poolConfig);
35+
}
36+
37+
/**
38+
* 从连接池获取对象
39+
*/
40+
public static HelloWorldClientSingle borrowObject(){
41+
try {
42+
HelloWorldClientSingle clientSingle = objectPool.borrowObject();
43+
System.out.println("总创建线程数"+objectPool.getCreatedCount());
44+
return clientSingle;
45+
} catch (Exception e) {
46+
e.printStackTrace();
47+
}
48+
//连接池失败则主动创建
49+
return createClient();
50+
}
51+
52+
/**
53+
* 还送对象
54+
*/
55+
public static void returnObject(HelloWorldClientSingle clientSingle){
56+
objectPool.returnObject(clientSingle);
57+
}
58+
59+
/**
60+
* 当连接池异常,则主动创建对象
61+
*/
62+
private static HelloWorldClientSingle createClient(){
63+
return new HelloWorldClientSingle("127.0.0.1", 55001);
64+
}
65+
66+
/**
67+
* 执行器
68+
* @param workCallBack 主要服务内容
69+
*/
70+
public static Runnable execute(WorkCallBack<HelloWorldClientSingle> workCallBack){
71+
return () -> {
72+
HelloWorldClientSingle client = borrowObject();
73+
try {
74+
workCallBack.callback(client);
75+
} finally {
76+
/** 将连接对象返回给连接池 */
77+
returnObject(client);
78+
}
79+
};
80+
}
81+
82+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package cn.mrdear.grpc.hellopool;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import io.grpc.ManagedChannel;
6+
import io.grpc.ManagedChannelBuilder;
7+
import io.grpc.StatusRuntimeException;
8+
import io.grpc.examples.helloworld.GreeterGrpc;
9+
import io.grpc.examples.helloworld.HelloReply;
10+
import io.grpc.examples.helloworld.HelloRequest;
11+
12+
/**
13+
* @author Niu Li
14+
* @date 2017/1/28
15+
*/
16+
public class HelloWorldClientSingle {
17+
18+
private final ManagedChannel channel; //一个gRPC信道
19+
private GreeterGrpc.GreeterBlockingStub greeterBlockingStub;//阻塞/同步 存根
20+
21+
//初始化信道和存根
22+
public HelloWorldClientSingle(String host,int port){
23+
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
24+
}
25+
26+
public void shutdown() throws InterruptedException {
27+
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
28+
}
29+
30+
//客户端方法
31+
public void greet(String name){
32+
//需要用到存根时创建,不可复用
33+
greeterBlockingStub = GreeterGrpc.newBlockingStub(channel).withCompression("gzip");
34+
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
35+
HelloReply response;
36+
try {
37+
response = greeterBlockingStub.sayHello(request);
38+
} catch (StatusRuntimeException e) {
39+
System.out.println("RPC调用失败:"+e.getMessage());
40+
return;
41+
}
42+
System.out.println("服务器返回信息:"+response.getMessage());
43+
}
44+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package cn.mrdear.grpc.hellopool;
2+
3+
/**
4+
* @author Niu Li
5+
* @since 17-2-8
6+
*/
7+
public class HelloWorldClientTest {
8+
9+
public static void main(String[] args) {
10+
for (int i = 0; i < 100; i++) {
11+
new Thread( HelloWorldClientPool.execute(clientSingle -> {
12+
clientSingle.greet("world");
13+
})).start();
14+
}
15+
}
16+
17+
18+
19+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package cn.mrdear.grpc.hellopool;
2+
3+
import org.apache.commons.pool2.BasePooledObjectFactory;
4+
import org.apache.commons.pool2.PooledObject;
5+
import org.apache.commons.pool2.impl.DefaultPooledObject;
6+
7+
/**
8+
* 连接池所需工厂
9+
* @author Niu Li
10+
* @since 17-2-8
11+
*/
12+
public class HelloWorldFactory extends BasePooledObjectFactory<HelloWorldClientSingle> {
13+
14+
private String host = "127.0.0.1";
15+
16+
private int port = 50051;
17+
18+
@Override
19+
public HelloWorldClientSingle create() throws Exception {
20+
return new HelloWorldClientSingle(this.host,this.port);
21+
}
22+
23+
@Override
24+
public PooledObject<HelloWorldClientSingle> wrap(HelloWorldClientSingle helloWorldClientSingle) {
25+
return new DefaultPooledObject<>(helloWorldClientSingle);
26+
}
27+
}

0 commit comments

Comments
 (0)