@@ -332,96 +332,196 @@ struct A {
332
332
333
333
### 一致性模型
334
334
335
- TODO: 加强这部分叙述的前后逻辑
336
-
337
335
并行执行的多个线程,从某种宏观层面上讨论,可以粗略的视为一种分布式系统。
338
- 其中每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。
339
- 理论上来说,一致性包含四种不同的类型:
336
+ 在分布式系统中,任何通信乃至本地操作都需要消耗一定时间,甚至出现不可靠的通信。
337
+
338
+ 如果我们强行将一个变量 `v` 在多个线程之间的操作设为原子操作,即任何一个线程在操作完 `v` 后,
339
+ 其他线程均能**同步**感知到 `v` 的变化,则对于变量 `v` 而言,表现为顺序执行的程序,它并没有由于引入多线程
340
+ 而得到任何效率上的收益。对此有什么办法能够适当的加速呢?答案便是削弱原子操作的在进程间的同步条件。
341
+
342
+ 从原理上看,每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。
343
+ 削弱进程间的同步条件,通常我们会考虑四种不同的一致性模型:
340
344
341
345
1. 线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。
342
346
343
347
```
344
- x.write (1) x.read ()
348
+ x.store (1) x.load ()
345
349
T1 ---------+----------------+------>
346
350
347
351
348
352
T2 -------------------+------------->
349
- x.write (2)
353
+ x.store (2)
350
354
```
351
355
352
- 在这种情况下线程 `T1`, `T2` 对 `x` 的两次写操作是原子的,且 `x.write (1)` 是严格的发生在 `x.write (2)` 之前,`x.write (2)` 严格的发生在 `x.read ()` 之前。
356
+ 在这种情况下线程 `T1`, `T2` 对 `x` 的两次写操作是原子的,且 `x.store (1)` 是严格的发生在 `x.store (2)` 之前,`x.store (2)` 严格的发生在 `x.load ()` 之前。
353
357
值得一提的是,线性一致性对全局时钟的要求是难以实现的,这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。
354
358
355
359
2. 顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的顺序一致。
356
360
357
361
```
358
- x.write (1) x.write (3) x.read ()
362
+ x.store (1) x.store (3) x.load ()
359
363
T1 ---------+-----------+----------+----->
360
364
361
365
362
366
T2 ---------------+---------------------->
363
- x.write (2)
367
+ x.store (2)
364
368
365
369
或者
366
370
367
- x.write (1) x.write (3) x.read ()
371
+ x.store (1) x.store (3) x.load ()
368
372
T1 ---------+-----------+----------+----->
369
373
370
374
371
375
T2 ------+------------------------------->
372
- x.write (2)
376
+ x.store (2)
373
377
```
374
378
375
- 在顺序一致性的要求下,`x.read ()` 必须读到最近一次写入的数据,因此 `x.write (2)` 与 `x.write (1)` 并无任何先后保障,即 只要 `T2` 的 `x.write (2)` 发生在 `x.write (3)` 之前即可。
379
+ 在顺序一致性的要求下,`x.load ()` 必须读到最近一次写入的数据,因此 `x.store (2)` 与 `x.store (1)` 并无任何先后保障,即 只要 `T2` 的 `x.store (2)` 发生在 `x.store (3)` 之前即可。
376
380
377
381
3. 因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作顺序则不做要求。
378
382
379
- 4. 最终一致性:TODO:
383
+ ```
384
+ a = 1 b = 2
385
+ T1 ----+-----------+---------------------------->
386
+
387
+
388
+ T2 ------+--------------------+--------+-------->
389
+ x.store(3) c = a + b y.load()
390
+
391
+ 或者
392
+
393
+ a = 1 b = 2
394
+ T1 ----+-----------+---------------------------->
395
+
396
+
397
+ T2 ------+--------------------+--------+-------->
398
+ x.store(3) y.load() c = a + b
399
+
400
+ 亦或者
401
+
402
+ b = 2 a = 1
403
+ T1 ----+-----------+---------------------------->
404
+
405
+
406
+ T2 ------+--------------------+--------+-------->
407
+ y.load() c = a + b x.store(3)
408
+ ```
409
+
410
+ 上面给出的三种例子都是属于因果一致的,因为整个过程中,只有 `c` 对 `a` 和 `b` 产生依赖,而 `x` 和 `y`
411
+ 在此例子中表现为没有关系(但实际情况中我们需要更详细的信息才能确定 `x` 与 `y` 确实无关)
412
+
413
+ 4. 最终一致性:是最弱的一致性要求,它只保障某个操作在未来的某个时间节点上会被观察到,但并未要求被观察到的时间。因此我们甚至可以对此条件稍作加强,例如规定某个操作被观察到的时间总是有界的。当然这已经不在我们的讨论范围之内了。
414
+
415
+ ```
416
+ x.store(3) x.store(4)
417
+ T1 ----+-----------+-------------------------------------------->
418
+
419
+
420
+ T2 ---------+------------+--------------------+--------+-------->
421
+ x.read() x.read() x.read() x.read()
422
+ ```
423
+
424
+ 在上面的情况中,如果我们假设 x 的初始值为 0,则 `T2` 中四次 `x.read()` 结果可能但不限于以下情况:
425
+
426
+ ```
427
+ 3 4 4 4 // x 的写操作被很快观察到
428
+ 0 3 3 4 // x 的写操作被观察到的时间存在一定延迟
429
+ 0 0 0 4 // 最后一次读操作读到了 x 的最终值,但此前的变化并未观察到
430
+ 0 0 0 0 // 在当前时间段内 x 的写操作均未被观察到,但未来某个时间点上一定能观察到 x 为 4 的情况
431
+ ```
380
432
381
433
### 内存顺序
382
434
383
- TODO:
435
+ 为了追求极致的性能,实现各种强度要求的一致性,C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order` 的选项,表达了四种多线程间的同步模型:
384
436
385
- C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order` 的选项,
386
- 表达了四种多线程间的同步模型:
437
+ 1. 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 `std::memory_order_relaxed` 指定。我们来看一个例子:
387
438
388
- - 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 `std::memory_order_relaxed` 指定。我们来看一个例子:
439
+ ```cpp
440
+ std::atomic<int> counter = {0};
441
+ std::vector<std::thread> vt;
442
+ for (int i = 0; i < 100; ++i) {
443
+ vt.emplace_back([](){
444
+ counter.fetch_add(1, std::memory_order_relaxed);
445
+ });
446
+ }
447
+
448
+ for (auto& t : vt) {
449
+ t.join();
450
+ }
451
+ std::cout << "current counter:" << counter << std::endl;
452
+ ```
453
+
454
+ 2. 释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程需要修改某个值,但另一个线程会对该值的某次操作产生依赖,即后者依赖前者。具体而言,线程 A 完成了三次对 `x` 的写操作,线程 `B` 仅依赖其中第三次 `x` 的写操作,与 `x` 的前两次写行为无关,则当 `A` 主动 `x.release()` 时候(即使用 `std::memory_order_release`),选项 `std::memory_order_consume` 能够确保 `B` 在调用 `x.load()` 时候观察到 `A` 中第三次对 `x` 的写操作。我们来看一个例子:
389
455
390
456
```cpp
391
- #include <atomic>
392
- #include <thread>
393
- #include <vector>
394
- #include <iostream>
457
+ std::atomic<int*> ptr;
458
+ int v;
459
+ std::thread producer([&]() {
460
+ int* p = new int(42);
461
+ v = 1024;
462
+ ptr.store(p, std::memory_order_release);
463
+ });
464
+ std::thread consumer([&]() {
465
+ int* p;
466
+ while(!(p = ptr.load(std::memory_order_consume)));
395
467
396
- std::atomic<int> counter = {0};
468
+ std::cout << "p: " << *p << std::endl;
469
+ std::cout << "v: " << v << std::endl;
470
+ });
471
+ producer.join();
472
+ consumer.join();
473
+ ```
397
474
398
- int main() {
399
- std::vector<std::thread> vt;
400
- for (int i = 0; i < 100; ++i) {
401
- vt.emplace_back([](){
402
- counter.fetch_add(1, std::memory_order_relaxed);
403
- });
404
- }
475
+ 3. 释放/获取模型:在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放 `std::memory_order_release` 和获取 `std::memory_order_acquire` 之间规定时序,即发生在释放操作之前的**所有**写操作,对其他线程的任何获取操作都是可见的,亦即发生顺序(happens-before)。
476
+
477
+ 可以看到,`std::memory_order_release` 确保了它之后的写行为不会发生在释放操作之前,是一个向后的屏障,而 `std::memory_order_acquire` 确保了它之后的前的写行为,不会发生在该获取操作之后,是一个向前的屏障,对于选项 `std::memory_order_acq_rel` 而言,则结合了这两者的特点,唯一确定了一个内存屏障,使得当前线程对内存的读写不会被重排到此操作的前后。
405
478
406
- for (auto& t : vt) {
407
- t.join();
479
+ 我们来看一个例子:
480
+
481
+ ```cpp
482
+ std::vector<int> v;
483
+ std::atomic<int> flag = {0};
484
+ std::thread release([&]() {
485
+ v.push_back(42);
486
+ flag.store(1, std::memory_order_release);
487
+ });
488
+ std::thread acqrel([&]() {
489
+ int expected = 1; // must before compare_exchange_strong
490
+ while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {
491
+ expected = 1; // must after compare_exchange_strong
408
492
}
409
- std::cout << "current counter:" << counter << std::endl;
410
- return 0 ;
411
- }
412
- ```
493
+ // flag has changed to 2
494
+ }) ;
495
+ std::thread acquire([&]() {
496
+ while(flag.load(std::memory_order_acquire) < 2);
413
497
414
- - 释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程依赖某个值,但另一个线程同时会对该值进行修改,即前者依赖后者。
498
+ std::cout << v.at(0) << std::endl; // must be 42
499
+ });
500
+ release.join();
501
+ acqrel.join();
502
+ acquire.join();
503
+ ```
415
504
416
- - 释放/获取模型:在此模型下,我们可以进一步限制不同线程间原子操作的顺序,在释放(release)和获取(acquire)之间规定时序,即发生在释放操作之前的写操作,对其他线程的任何获取操作都是可见的,即发生顺序(happens-before) 。
505
+ 在此例中我们使用了 `compare_exchange_strong`,它便是比较交换原语(Compare-and-swap primitive),它有一个更弱的版本,即 `compare_exchange_weak`,它允许即便交换成功,也仍然返回 `false` 失败。其原因是因为在某些平台上虚假故障导致的,具体而言,当 CPU 进行上下文切换时,另一线程加载同一地址产生的不一致。除此之外,`compare_exchange_strong` 的性能可能稍差于 `compare_exchange_weak`,但大部分情况下,`compare_exchange_strong` 应该被有限考虑 。
417
506
418
- `std::memory_order_consume`、`std::memory_order_acquire`、`std::memory_order_release`、`std::memory_order_acq_rel` 这四种选项均为这两个模型服务的 TODO: 未写完
507
+ 4. 顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 `std::memory_order_seq_cst` 进行指定。最后来看一个例子:
419
508
420
- - 顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 `std::memory_order_seq_cst` 进行指定。
509
+ ```cpp
510
+ std::atomic<int> counter = {0};
511
+ std::vector<std::thread> vt;
512
+ for (int i = 0; i < 100; ++i) {
513
+ vt.emplace_back([](){
514
+ counter.fetch_add(1, std::memory_order_seq_cst);
515
+ });
516
+ }
421
517
422
- ## 7.6 事务内存
518
+ for (auto& t : vt) {
519
+ t.join();
520
+ }
521
+ std::cout << "current counter:" << counter << std::endl;
522
+ ```
423
523
424
- TODO: C++20 放到第十章?
524
+ 这个例子与第一个宽松模型的例子本质上没有区别,仅仅只是将原子操作的内存顺序修改为了 `memory_order_seq_cst`,有兴趣的读者可以自行编写程序测量这两种不同内存顺序导致的性能差异。
425
525
426
526
## 总结
427
527
@@ -433,19 +533,19 @@ C++11 语言层提供了并发编程的相关支持,本节简单的介绍了 `
433
533
434
534
1. 请编写一个简单的线程池,提供如下功能:
435
535
436
- ```cpp
437
- ThreadPool p(4); // 指定四个工作线程
536
+ ```cpp
537
+ ThreadPool p(4); // 指定四个工作线程
438
538
439
- // 将任务在池中入队,并返回一个 std::future
440
- auto f = pool.enqueue([](int life) {
441
- return meaning;
442
- }, 42);
539
+ // 将任务在池中入队,并返回一个 std::future
540
+ auto f = pool.enqueue([](int life) {
541
+ return meaning;
542
+ }, 42);
443
543
444
- // 从 future 中获得执行结果
445
- std::cout << f.get() << std::endl;
446
- ```
544
+ // 从 future 中获得执行结果
545
+ std::cout << f.get() << std::endl;
546
+ ```
447
547
448
- 2 . 请实现一个无锁版本的 FIFO 队列,提供 ` enqueue ` 和 ` dequeue ` 两个方法 。
548
+ 2. 请使用 `std::atomic<book>` 实现一个互斥锁 。
449
549
450
550
[返回目录](./toc.md) | [上一章](./06-regex.md) | [下一章 文件系统](./08-filesystem.md)
451
551
0 commit comments