Skip to content

Commit 42e819e

Browse files
committed
Add lesson 9
1 parent b9f1e37 commit 42e819e

File tree

11 files changed

+683
-1
lines changed

11 files changed

+683
-1
lines changed

spring-cloud/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ Spring Cloud 系列课程致力于以实战的方式覆盖 Spring Cloud 的功
128128

129129

130130

131-
### 第九节 Spring Cloud Hystrix 源码解读
131+
### [第九节 Spring Cloud Hystrix 源码解读](https://segmentfault.com/l/1500000011386273/play)[课件](https://github.com/mercyblitz/segmentfault-lessons/tree/master/spring-cloud/lesson-9)[[问答](https://segmentfault.com/l/1500000011386273/d/1560000012218757)]
132132

133133
* 受众群体:具备一定的`Java`服务端编程经验更佳
134134

spring-cloud/lesson-9/README.md

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
# Spring Cloud Hystrix 源码分析
2+
3+
4+
5+
6+
7+
## Spring Cloud Hystrix 源码解读
8+
9+
10+
11+
### `@EnableCircuitBreaker`
12+
13+
职责:
14+
15+
* 激活 Circuit Breaker
16+
17+
18+
19+
#### 初始化顺序
20+
21+
* `@EnableCircuitBreaker `
22+
* `EnableCircuitBreakerImportSelector`
23+
* `HystrixCircuitBreakerConfiguration`
24+
25+
26+
27+
### HystrixCircuitBreakerConfiguration
28+
29+
30+
31+
#### 初始化组件
32+
33+
* `HystrixCommandAspect`
34+
* `HystrixShutdownHook`
35+
* `HystrixStreamEndpoint` : Servlet
36+
* `HystrixMetricsPollerConfiguration`
37+
38+
39+
40+
## Netflix Hystrix 源码解读
41+
42+
### `HystrixCommandAspect`
43+
44+
#### 依赖组件
45+
46+
* `MetaHolderFactory`
47+
* `HystrixCommandFactory`: 生成`HystrixInvokable`
48+
* `HystrixInvokable`
49+
* `CommandCollapser`
50+
* `GenericObservableCommand`
51+
* `GenericCommand`
52+
53+
54+
55+
### Future 实现 服务熔断
56+
57+
```java
58+
package com.segumentfault.springcloudlesson9.future;
59+
60+
import java.util.Random;
61+
import java.util.concurrent.*;
62+
63+
/**
64+
* 通过 {@link Future} 实现 服务熔断
65+
*
66+
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
67+
*/
68+
public class FutureCircuitBreakerDemo {
69+
70+
public static void main(String[] args) throws InterruptedException, ExecutionException {
71+
72+
// 初始化线程池
73+
ExecutorService executorService = Executors.newSingleThreadExecutor();
74+
75+
RandomCommand command = new RandomCommand();
76+
77+
Future<String> future = executorService.submit(command::run);
78+
79+
String result = null;
80+
// 100 毫秒超时时间
81+
try {
82+
result = future.get(100, TimeUnit.MILLISECONDS);
83+
} catch (TimeoutException e) {
84+
// fallback 方法调用
85+
result = command.fallback();
86+
}
87+
88+
System.out.println(result);
89+
90+
executorService.shutdown();
91+
92+
}
93+
94+
/**
95+
* 随机对象
96+
*/
97+
private static final Random random = new Random();
98+
99+
/**
100+
* 随机事件执行命令
101+
*/
102+
public static class RandomCommand implements Command<String> {
103+
104+
105+
@Override
106+
public String run() throws InterruptedException {
107+
108+
long executeTime = random.nextInt(200);
109+
110+
// 通过休眠来模拟执行时间
111+
System.out.println("Execute Time : " + executeTime + " ms");
112+
113+
Thread.sleep(executeTime);
114+
115+
return "Hello,World";
116+
}
117+
118+
@Override
119+
public String fallback() {
120+
return "Fallback";
121+
}
122+
}
123+
124+
125+
public interface Command<T> {
126+
127+
/**
128+
* 正常执行,并且返回结果
129+
*
130+
* @return
131+
*/
132+
T run() throws Exception;
133+
134+
/**
135+
* 错误时,返回容错结果
136+
*
137+
* @return
138+
*/
139+
T fallback();
140+
141+
}
142+
}
143+
```
144+
145+
146+
147+
148+
149+
## RxJava 基础
150+
151+
152+
153+
### 单数据:Single
154+
155+
156+
157+
```java
158+
Single.just("Hello,World") // 仅能发布单个数据
159+
.subscribeOn(Schedulers.io()) // 在 I/O 线程执行
160+
.subscribe(RxJavaDemo::println) // 订阅并且消费数据
161+
;
162+
```
163+
164+
165+
166+
### 多数据:Observable
167+
168+
169+
170+
```java
171+
List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
172+
173+
Observable.from(values) //发布多个数据
174+
.subscribeOn(Schedulers.computation()) // 在 I/O 线程执行
175+
.subscribe(RxJavaDemo::println) // 订阅并且消费数据
176+
;
177+
178+
// 等待线程执行完毕
179+
Thread.sleep(100);
180+
```
181+
182+
183+
184+
### 使用标准 Reactive 模式
185+
186+
187+
188+
```java
189+
List<Integer> values = Arrays.asList(1, 2, 3);
190+
191+
Observable.from(values) //发布多个数据
192+
.subscribeOn(Schedulers.newThread()) // 在 newThread 线程执行
193+
.subscribe(value -> {
194+
195+
if (value > 2)
196+
throw new IllegalStateException("数据不应许大于 2");
197+
198+
//消费数据
199+
println("消费数据:" + value);
200+
201+
}, e -> {
202+
// 当异常情况,中断执行
203+
println("发生异常 , " + e.getMessage());
204+
}, () -> {
205+
// 当整体流程完成时
206+
println("流程执行完成");
207+
})
208+
209+
;
210+
211+
// 等待线程执行完毕
212+
Thread.sleep(100);
213+
```
214+
215+
216+
217+
218+
219+
## Java 9 Flow API
220+
221+
```java
222+
package concurrent.java9;
223+
224+
import java.util.concurrent.Flow;
225+
import java.util.concurrent.SubmissionPublisher;
226+
227+
/**
228+
* {@link SubmissionPublisher}
229+
*
230+
* @author mercyblitz
231+
**/
232+
public class SubmissionPublisherDemo {
233+
234+
public static void main(String[] args) throws InterruptedException {
235+
236+
try (SubmissionPublisher<Integer> publisher =
237+
new SubmissionPublisher<>()) {
238+
239+
//Publisher(100) => A -> B -> C => Done
240+
publisher.subscribe(new IntegerSubscriber("A"));
241+
publisher.subscribe(new IntegerSubscriber("B"));
242+
publisher.subscribe(new IntegerSubscriber("C"));
243+
244+
// 提交数据到各个订阅器
245+
publisher.submit(100);
246+
247+
}
248+
249+
250+
Thread.currentThread().join(1000L);
251+
252+
}
253+
254+
private static class IntegerSubscriber implements
255+
Flow.Subscriber<Integer> {
256+
257+
private final String name;
258+
259+
private Flow.Subscription subscription;
260+
261+
private IntegerSubscriber(String name) {
262+
this.name = name;
263+
}
264+
265+
@Override
266+
public void onSubscribe(Flow.Subscription subscription) {
267+
System.out.printf(
268+
"Thread[%s] Current Subscriber[%s] " +
269+
"subscribes subscription[%s]\n",
270+
Thread.currentThread().getName(),
271+
name,
272+
subscription);
273+
this.subscription = subscription;
274+
subscription.request(1);
275+
}
276+
277+
@Override
278+
public void onNext(Integer item) {
279+
System.out.printf(
280+
"Thread[%s] Current Subscriber[%s] " +
281+
"receives item[%d]\n",
282+
Thread.currentThread().getName(),
283+
name,
284+
item);
285+
subscription.request(1);
286+
}
287+
288+
@Override
289+
public void onError(Throwable throwable) {
290+
throwable.printStackTrace();
291+
}
292+
293+
@Override
294+
public void onComplete() {
295+
System.out.printf(
296+
"Thread[%s] Current Subscriber[%s] " +
297+
"is completed!\n",
298+
Thread.currentThread().getName(),
299+
name);
300+
}
301+
302+
}
303+
304+
}
305+
```
306+
Binary file not shown.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
target/
2+
!.mvn/wrapper/maven-wrapper.jar
3+
4+
### STS ###
5+
.apt_generated
6+
.classpath
7+
.factorypath
8+
.project
9+
.settings
10+
.springBeans
11+
12+
### IntelliJ IDEA ###
13+
.idea
14+
*.iws
15+
*.iml
16+
*.ipr
17+
18+
### NetBeans ###
19+
nbproject/private/
20+
build/
21+
nbbuild/
22+
dist/
23+
nbdist/
24+
.nb-gradle/

0 commit comments

Comments
 (0)