Skip to content

Latest commit

 

History

History

Threads

Table of Contents generated with DocToc

并发支持库

  • 支持多线程的内存模型。
  • 三种粒度和层级的并发编程支持:
    • 无锁编程memory_order atomic atomic_thread_fence等。
    • 基于线程的并发支持:thread condition_variable mutex
    • 基于任务的并发支持:future promise packaged_task async

内存模型

C++引入了内存模型使我们不必考虑诸如下列问题:

  • 多线程中多个全局变量位于同一个内存位置(同一个最小CPU存取单元中,比如一个字)。
  • 多线程中多级缓存数据不一致的影响。
  • 现代CPU中的指令重排对多线程的影响。
  • 但多线程中的数据竞争仍需要显式的同步或锁处理。

内存顺序:

定义于<atomic>

// C++11
typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;
// C++20
enum class memory_order : /*unspecified*/ {
    relaxed, consume, acquire, release, acq_rel, seq_cst
};
inline constexpr memory_order memory_order_relaxed = memory_order::relaxed;
inline constexpr memory_order memory_order_consume = memory_order::consume;
inline constexpr memory_order memory_order_acquire = memory_order::acquire;
inline constexpr memory_order memory_order_release = memory_order::release;
inline constexpr memory_order memory_order_acq_rel = memory_order::acq_rel;
inline constexpr memory_order memory_order_seq_cst = memory_order::seq_cst;
  • 主要用于原子数据类型的操作和内存栅栏(内存屏障)等操作中。
  • 更多细节和含义我也还不懂。

无锁编程

首先:

  • 无锁编程是真正的专家领域,非专家不要轻易尝试使用原子类型作为单纯的跨线程计数器之外的无锁编程。
  • 无锁编程通常是和硬件高度相关、不可跨平台移植的,除非是深入理解硬件的专家,否则不要轻易尝试。
  • 就算是专家都需要大量的时间、大量测试才能够保证无锁程序的可用性,无锁程序通常都会是bug制造机、不可预测的噩梦源泉。
  • 无锁编程即是指多个线程之间共享数据,但是依赖数据的原子操作来保证并发正确性与线程安全,不使用任何形式的锁、信号量等阻塞性的同步工具。任何时刻都不会有任何程序因为同步问题处于阻塞状态。
  • 无锁编程通常用于极端性能敏感的情况,现实中这种情况少之又少,总之不要轻易尝试通用的无锁编程。
  • 但是在多个线程间用一个原子计数器之类比较简单的场景还是比较容易使用和预测的。

无锁编程相关设施基本都定义于<atomic>

// Defined in header <atomic>
template< class T >
struct atomic; // (1)	(since C++11)
template< class U >
struct atomic<U*> // (2)	(since C++11) // Defined in header <memory>
template< class U >
struct atomic<std::shared_ptr<U>> // (3)	(since C++20)
template< class U >
struct atomic<std::weak_ptr<U>>; // (4)	(since C++20 // Defined in header <stdatomic.h>
#define _Atomic(T) /* see below */ // (5)	(since C++23)
  • std::atomic对基本整数类型和指针做了特化,并且通过_Atomic宏对应于C标准的相应名称,每个特化都直接对应的C标准的一个类型,C标准的东西不赘述,这里只讨论C++的东西。

std::atomic操作:

  • 赋值。
  • 是否无锁:is_lock_free
  • 存取load store
  • 交换exchange
  • 比较并交换:compare_exchange_weak compare_exchange_strong
  • 对于整数:原子+ - & | ^ ++ -- += -= &= |= ^=
  • 对于指针:原子+ - ++ -- += -=
  • 每个操作都对应一个C标准中的一个函数。

原子标志std::atomic_flag

  • 原子布尔类型,不同于所有std::atomic的特化,保证无锁(std::atomic是否有锁由实现与平台决定,不过一般来说内置类型都是无锁的)。
  • 不同于std::atomic<bool>,不提供加载存储操作。
  • 可移植的原子默认初始化:std::atomic_flag fl {ATOMIC_FLAG_INIT};
  • 测试并设置(并且返回旧值):test_and_set
  • 清除:clear
  • C++20提供操作:
    • 测试(原子地返回当前值)test
    • 等待(阻塞线程直到值更改)wait
    • 通知其他线程值已更改:notify_one notify_all
  • 可在用户空间实现自选互斥。

内存栅栏/内存屏障:

  • std::atomic_thread_fence(order)
  • std::atomic_signal_handler(order)

volatile

  • volatile(易挥发性存储)在内存模型中没有任何含义,也不用于任何形式的线程同步机制。
  • volatile只是指出变量可能被线程之外的其他东西(基本上说的就是硬件)更改,必须执行每一次该变量的访存操作,而不能将其中的某些访存操作合并优化掉。
  • 除了直接操作硬件的程序(比如驱动程序、嵌入式程序)中,不应该在任何其他地方使用volatile
  • 某些语言中比如java用volatile这个关键字来表示某种线程同步机制,但C++中并非如此(而很多人却以为如此拿来这么用),这个关键字被广泛误解也是因为这个原因。
  • 线程间数据同步请使用atomic condition_variable mutex

线程

线程类,定义于<thread>

class thread;
  • 类型:
    • id:轻量可平凡赋值线程id类型,可用作关联容器关键字。
    • native_handle_type:实现定义的操作系统底层线程句柄类型。
  • 构造:
    • 默认构造一个没有任务的线程。
    • 不可拷贝,只能移动构造。
    • thread t(f, args...),创建可调用对象调用特定参数作为任务的线程。
  • 观察器:
    • jionable,检查对象是否标识一个活跃的执行线程,具体来说就是get_id() != std::thread::id()返回true。默认构造的线程不可结合。结束执行代码,但仍未结合的线程可被当做活跃的执行线程,从而可以结合。
    • get_id:获取id。
    • native_handle:返回实现定义的底层线程句柄。
    • hardware_concurrency:静态方法,返回实现支持的并发线程数,应该只当做提示。比如四核八线的intel x86处理器则返回8。
  • 操作:
    • join:阻塞当前线程直至*this标识线程执行结束,即等待某个线程运行到结束。运行结束后jionable()返回false。不可对当前线程、不可结合的线程调用。
    • detach:从thread对象分离执行线程,允许执行独立地持续,一旦该线程退出,则释放任何分配资源。调用后*this不再占有该线程。不可结合则会抛出错误,调用后即不可结合。
    • swap:交换两个线程的底层句柄。std::swap特化。

说明:

  • 身份id
    • 每个线程的身份标识是thread::id,每个有任务的线程唯一,默认没有任务(默认构造)的时候、运行结束(join)之后、被移动后、分离(detach)之后就是id{},通过get_id获取。也可以在线程内部通过std::this_thread::get_id()获取。
    • 每个thread都有一个id,但是系统线程可能没有id但是可以继续运行,比如在detach之后。
    • std::thread::id可以比较、输出、作为哈希表的键。
  • 构造:
    • 构造的时候线程就立即开始运行,没有一个显式的线程运行操作。
    • 传递给thread的第一个参数后的函数参数会通过一个类似于bind的机制绑定到函数对象上,也就是会拷贝过去(事实上如果将函数对象参数声明为引用会直接编译失败)。要传递引用需要使用lambda的引用捕获或者std::ref std::cref
    • 将一个线程移动到另一个上不会影响其执行。
  • 析构:
    • std::thread对象析构的时候,如果对象还是可结合的(也就是还没有执行完),那么会直接调用std::terminate
    • std::thread不会在析构的时候自动join
    • C++20引入了std::jthread则会在结束时自动结合,并且还能够取消/停止。
  • 结合join
    • std::thread::join会告诉当前线程等待特定线程直到其运行结束。
    • 如果析构时没有join则会调用std::terminate
    • 考虑使用RAII类来管理将std::thread作为资源管理,或者使用析构时会自动join的std::jthread
    • 可以这样定义这个RAII类:
    struct guarded_thread : std::thread
    {
        using std::thread::thread;
        ~guarded_thread()
        {
            if (this->joinable())
            {
                this->join();
            }
        }
    };
    • 当然如果C++20可用,那么最好使用std::jthread
    • 那为什么不让线程在析构时自动结合呢?因为长期存在让线程自己决定要长期运行还是什么时候结束的传统。比如定时器程序或者系统中的守护进程(daemons)。
  • 分离detach
    • 在不分离线程的情况下尝试使线程执行到超出其析构函数被视为严重的错误。
    • 如果真的需要一个底层系统线程比它对应的std::thread存活更长时间,那么需要使用detach,分离后get_id得到id{},不再可结合。
    • detach会让程序变得难以调试和容易出错,但某些时候确实是有用的。
    • std::thread可以移动,所以可以通过返回值传递出局部作用域,这在某种程度上可以作为detach的替代。
    • 如果要detach一个线程,确保这个线程不要引用局部作用域中的变量(典型例子是lambda不要引用捕获局部作用域的东西)。这种情况很可能会导致往栈上写无效数据,导致程序宕掉。这种数据共享必须非常谨慎小心地处理,稍有不慎就可能出问题。
  • 命名空间this_thread
    • get_id():获取当前运行的线程的id。
    • yield():通知/建议调度器让出当前线程的时间片,让其他线程能够得到运行。
    • sleep_until(tp):当前线程睡眠到time_point tp
    • sleeep_for(d):当前线程睡眠一段时间(duration d)。
    • yield在等到原子变量改变状态或者多线程之间合作非常有用。
    • 但通常来说使用sleep_for睡眠一段时间可能比仅仅使用yield更有用,yield可以看做是非常罕见和特定的场景下的优化措施。
    • 大多数实现中线程实现为抢占式的,由实现确保每个线程得到合理数量的时间片。
    • 通常来说,程序不应该依赖于系统时钟,如果系统时钟被修改,可能会影响timed_mutexwait_until接口,但是wait_for不会受到影响。在编写多线程程序时,通常来说需要考虑系统时钟被修改的影响。
  • 杀死线程:
    • 某些时候,杀死一个线程是很有用的,这表示对这个线程的任务已经不感兴趣,直接让该线程释放用到的资源并结束就行。
    • 在不同的系统中,这个操作也叫做杀死(kill)、取消(cancel)、中断(interrupt)一个线程。
    • 由于各种各样的历史原因和技术原因,std::thread没有提供这个操作。
    • 不过可以在用户层去自己实现这个东西,比如说许多线程实现时都涉及到一个请求循环,接受不同的请求,然后依次进行处理,那么可以将杀死线程作为一个请求发送给该线程,然后该线程释放资源,结束运行。
    • 如果线程中没有一个请求循环这种东西,那么可以通过定期检验一个是否需要杀死线程的原子变量来实现,由主线程或者其他线程进行设置。
    • 通用的跨平台的取消操作可能难以实现,但是在特定的场景下一个特定的取消操作还是很简单的。
  • thread_local数据:
    • thread_local用于静态生命周期对象时,就变成了线程存储期对象,每个线程将会有一份。更具体来说用于静态对象(局部、类)、全局对象、命名空间作用域对象。
    • thread_local在每个线程中存在一份,在每个线程中会在第一次使用前初始化,相当于每个线程中调用初始化语句一次。如果这个初始化语句有状态,那么不同线程中这个变量的初值可能不同,比如std::atomic<int> var = 10; thread_local val = var++;
    • 不要误解为线程创建时才从当前变量创建一个分支,使用当前进程的值初始化,这明显是不合理的。
    • 对于每个线程需要一份,但是比较大不适合存储在栈里面的数据,最好方式就是使用thread_local
    • 就像static变量一样,thread_local变量默认初始化为0。
    • 对于静态局部的thread_local变量,第一次运行时才会初始化。多个thread_local变量的构造顺序是不确定的,最好不要有初始化依赖关系,有依赖的话同样可以使用静态局部变量实现的单例模式。
    • thread_local变量不会在多个线程间共享,也就不会有数据竞争。但是thread_local不是解决数据竞争的灵丹妙药,如果一个静生命周期对象在语义上就是用与多线程间共享数据的,那么使用thread_local实际上改变了语义,大概率是一个错误选项。简单的将共享的对象改成thread_local可能是有问题的,需要仔细斟酌。
    • 最后注意静态成员变量、局部静态变量都属于静态生命周期,同全局变量一样都会在多个线程中共享。

避免数据竞争

当多个线程需要对同一份数据进行读写时,就不可避免地会出现数据竞争:

  • 首先,避免数据竞争最好的方法就是不要共享数据。具体方式是:
    • 只使用局部变量、thread_local数据以及仅在线程内部使用的动态数据。
    • 并且不要将这些数据的指针引用传递到其他线程。
    • 如果其他线程一定需要访问当前线程的数据,则将数据按指针传递到其他线程,并且在该线程处理完成前当前线程不要使用、不要释放这些数据。
  • 上面的做法都是防止同时访问数据的手段,所以不需要任何锁来同步可以获得最高的性能。
  • 但不是所有时候上面的做法都能够达成目的,那么在一定需要共享数据的时候就需要某种形式的锁用于数据同步:
    • 互斥锁(mutexes):互斥锁是用来表示对某种资源的排他性访问的对象。如果资源通过互斥锁来同步,那么访问前需要先获取锁,然后访问,访问弯成后释放锁。
    • 条件变量(condition variables):条件变量是线程用来等待其他线程的一个事件发生或者一个定时器到期的变量。严格来说,条件变量不能避免数据竞争,但是使用它某些时候可以让我们避免使用可能引起数据竞争的共享数据。

互斥锁

互斥锁(mutex):

  • 互斥锁用于避免数据竞争以及同步多个线程中对共享数据的访问。
  • std::mutex:基本互斥锁,如果尝试获取已经被获取的mutex会阻塞当前线程。
  • std::timed_mutex:相比普通互斥锁还提供尝试在某个一段时间内获取锁的操作,如果这段时间内没有获取到,则直接返回。
  • std::recursive_mutex:可以被一个线程重复获取(递归获取)的互斥锁。
  • std::recursive_timed_mutex:递归地、提供定时获取操作的互斥锁。
  • std::lock_guard<>:互斥锁的RAII包装类。
  • std::unique_lock<>:互斥锁的可移动RAII包装类。
  • std::scoped_lock<>:同时获取多个锁、避免死锁的RAII包装类,和std::lock做同样的事情。具体来说就是同时尝试获取多个锁,如果其中有一个未获取到就释放所有已经获取到的锁,以避免死锁。
  • 其中std::mutex是最简单、最小、最快的互斥锁,递归的和提供计时功能的互斥锁都会有一些额外开销。
  • 任何时刻只有一个线程可以拥有(获取(acquire)后即为拥有,own)同一个互斥锁。
    • 如果一个锁没有被获取(acquire),那么获取它将获得他的所有权。如果一个锁已经被其他线程获取,那么获取它将会阻塞当前线程。
    • 释放(release)一个锁则代表放弃对其的所有权,释放锁后其他尝试获取该锁被阻塞的线程会获得该锁,从而解除阻塞,开始运行。
  • 互斥锁本身并不做任何事情,我们使用互斥锁来表示操作某种资源(比如一个对象、一些数据、或者IO设备)的权利。
  • 为了最小化线程因为获取不到锁而被阻塞的可能,应该最小化地使用加锁解锁,仅在即将要使用时获取锁,并在不使用后的第一时间释放。
  • 使用锁保护的一段代码区域称为临界区(critical section)。
  • 标准库提供的互斥锁是排他性拥有语义的,也就是只有一个线程能够排他地拥有该锁。实践中除了这种锁,还有许多其他种类的锁,比如允许多个读线程一个写线程的互斥锁,标准库没有提供这种锁。如果要用,可以使用操作系统提供的或者自己写。

std::mutexstd::recursive_mutex

  • 类型:native_handle_type,实现定义的底层句柄类型。native_handle接口用于获取该句柄。
  • 加锁解锁:
    • lock:获取锁,未获取到会阻塞当前线程,获取到则继续执行。
    • try_lock:尝试获取锁,是否获取到都会直接返回,未获取到则返回false,获取成功则返回true
    • unlock:解锁。
  • 互斥锁不能被拷贝或者移动,将互斥锁看做一种资源,而不是一种资源的句柄。事实上,mutex通常被实现为操作系统资源的句柄,但因为这种操作系统资源不能被分享、拷贝、释放、移动,所以上层的互斥锁也不能这样做。
  • try_lock一般用于一个线程获取不到资源也有其他事情可做的情况。比如它有一个任务队列,其中一个任务没有获取到锁可以暂停执行然后执行下一个任务。
  • 当使用锁时,就需要考虑死锁问题(deadlock):一个线程在等待一个永远也不会被释放的锁。最简单的死锁场景就是递归获取std::mutex,获取已经被当前线程获取到的锁会永远阻塞当前线程,这种情况改用std::recursive_mutex就能避免,std::recursive_mutex允许一个线程多次获取和释放一个锁,获取多少次就需要释放多少次,第一次获取就会获取到该锁,直到最后一个释放才会真正释放该锁。

std::timed_mutexstd::recrusive_timed_mutex

  • 除了提供std::mutexstd::recursive_mutex的所有操作外,还提供两个接口:
    • try_lock_for(d):尝试获取一个锁一段时间,期间获取到返回true,到时候后获取不到返回false
    • try_lock_until(tp):尝试获取一个所到一个时间点。
    • 如果上述的d小于等于0,或者tp在当前时间点之前,那么就相当于朴素的try_lock
  • std::recursive_timed_mutex允许递归锁定。

互斥锁的RAII类:

  • 因为锁可以看做一个资源,锁定之后必须要被释放,所以可以使用RAII来管理,标准库提供了RAII类。
  • std::lock_guard:最简单、最小、最快的RAII类,构造时获取锁,析构时释放锁,没有任何其他功能。
    • 除了朴素的构造和析构还有一个构造:lock_guard( mutex_type& m, std::adopt_lock_t t );,传入第二个参数adopt_lock构造,此时会假定当前线程已经获取了锁,则构造时不会获取锁,只会在析构时释放,如果当前线程没有获取到该锁,那么是UB。
  • std::unique_lock:可以移动的RAII类,构造时获取锁,析构时释放锁,可以通过移动构造和移动赋值移动锁的锁资源的管理权。
    • 可以默认构造,不持有任何锁。
    • explicit unique_lock( mutex_type& m );:构造时获取锁。
    • unique_lock( mutex_type& m, std::defer_lock_t t ) noexcept;:持有但是不获取锁。
    • unique_lock( mutex_type& m, std::try_to_lock_t t );:构造时调用try_lock尝试获取锁,获取到则持有,未获取到则不持有。
    • unique_lock( mutex_type& m, std::adopt_lock_t t );:持有互斥锁,但是不在构造时获取,假定当前线程已经获取了锁。
    • unique_lock lck(m, tp):传入一个时间点,对互斥锁调用try_lock_until(tp),获取到才持有。
    • unique_lock lck(m, d):传入时间段,对互斥锁调用try_lock_for(d),获取到才持有。
    • 析构:如果持有锁,则释放它。
    • 移动构造、移动赋值、交换操作略。
    • 提供显式的lock try_lock try_lock_for try_lock_until unlock操作。
    • release在不释放锁的情况下释放其拥有权。
    • mutex:获取其中保存的互斥锁的指针。
    • owns_lock:测试是否持有一个锁。
    • operator bool:测试是否持有一个锁。
    • 只有持有timed_mutex recursive_timed_mutex时才允许调用时间相关操作。
  • std::scoped_lock:C++17引入,只有简单的构造与析构操作,目的是同时持有多个锁,有一个未获取到都释放所有获取到的锁,要么就全获取到,要么就全都不获取。
    • 有一个接受std::adopt_lock_t的构造:scoped_lock( std::adopt_lock_t, MutexTypes&... m );
  • 应该最小化锁的获取间隔(即临界区),所以引入RAII类之后,可能需要显式加入一个新的块来控制RAII对象的生命周期,而不是让其一直自然持续到大的块结束。
  • 任何可锁定对象(定义了lock unlock)都可以使用std::lock_guard管理,不仅仅是标准库中的几个互斥锁。

相关类型与常量:

  • 定义了几个类型在RAII类构造时用来标识持有锁的策略:
    • std::defer_lock_t:持有但不获取锁。
    • std::try_to_lock_t:调用try_lock尝试获取锁,获取成功才持有。
    • std::adopt_lock_t:假定锁已经被获取,构造时持有但不获取锁。
  • 每个类型都定义了一个常量,直接用这几个常量作为参数传递就行:
    • std::defer_lock std::try_to_lock std::adopt_lock

多个锁:

  • 很多时候需要同时获取多个锁,但是可能会由于执行顺序的不确定导致死锁,所以这种情况就需要一种commit or rollback操作,要么获取所有锁,要么一个都不获取。避免两个线程互相持有一个对方想获取的锁导致死锁。
  • std::try_lock(locks...):尝试获取多个锁,返回-1表示成功,否则返回一个基于0的获取失败的锁的索引。
  • std::lock(locks...):获取多个锁,获取不到则会释放已经获取到的,并且阻塞当前线程。std::scoped_lock是这个函数的RAII类。
  • 注意直接对互斥锁而不是std::unique_lock使用std::lock会需要程序员显式释放每个锁。
  • 可以使用std::defer_lock构造std::unique_lock,然后使用std::lock获取多个锁,然后依赖于std::unique_lock析构释放这些锁。

call_once

  • std::once_flag std::call_once提供了一种避免数据竞争的变量初始化机制,可以用于静态变量的初始化。编译器可能就是使用类似机制初始化局部静态变量的。
  • std::once_flag fl{};:默认构造,此时fl还没有被使用过。
  • std::call_once(fl, f, args):如果fl还没有被使用过则调用f(args),调用之后fl则变为使用过。
  • 例子:可以用这个机制实现其他静态变量:比如全局变量、类的静态变量的第一次使用前初始化,就像局部静态变量一样。从而得到确定的初始化控制、并且避免数据竞争(什么时候这种变量的初始化会出现数据竞争?)。
class X
{
private:
    static std::once_flag static_flag;
    static Y static_data_for_class_X;
    static void init()
    {
        // do initialization of static_data_for_class_X
    }
public:
    X()
    {
        call_once(static_flag, init);
    }
};
  • 这种初始化其实是先默认初始化一次之后,再在第一次构造X时调用初始化函数进行仅一次的初始化。

<shared_mutex>

  • C++14和C++17还引入了头文件<shared_mutex>,其中包括:
  • C++17引入了std::shared_mutex:提供两个层次的访问控制:
    • 同普通互斥锁一样的只能单个线程拥有该锁:lock try_lock unlock
    • 在此基础之上提供多个线程可以共同拥有该锁的机制:lock_shard try_lock_shared unlock_shared
  • C++14引入的std::shared_timed_mutexstd::shared_mutex基础上提供:try_lock_for try_lock_until try_lock_shared_for try_lock_shard_until
  • C++14引入的std::shared_lock作为上面的共享互斥锁的RAII类,提供和std::unique_lock完全相同的接口与功能,只可移动不可拷贝。

条件变量

条件变量:

  • 条件变量被用来管理线程之间的通信。
  • 一个线程可以(阻塞起来)在一个条件变量上等待某一事件的到来,比如到达某一特定时间或者某个其他线程完成等,使用条件变量可以减少数据竞争。
  • 条件变量的工作模式:
    • 等待通知方:先获取到用来保护共享变量的锁,调用等待接口等待事件到来(在这之前最好先检查事件是否已经到来如果到来就可以不用等待了),等待期间会自动(由等待接口来)阻塞并释放锁拥有权,知道事件完成后(或者等待超时,或者假醒)自动重新获取锁,检查条件是否得到满足。
    • 通知方:获取锁,拥有锁的期间修改共享变量,调用notify_one notify_all通知所有在等待这个条件变量的线程。

首先状态类型std::cv_status

enum class cv_status {
    no_timeout,
    timeout  
};
  • 作为条件变量按时间等待的返回值,no_timeout表示按时间等待等到了通知,timeout表示超时了没有等到通知。

std::condition_variable

  • 首先:std::condition_variable仅与std::unique_lock配合使用。
  • 下列的lock只能是std::unique_lock<std::mutex>
  • 默认构造、析构比较简单,略,不可拷贝。
  • 同样有native_handle_type类型和native_handle接口。
  • 通知接口:
    • nofity_one:通知其中一个等待的线程(如果有)。
    • notify_all:通知所有正在等待的线程。
  • 等待接口:
    • wait(lock):自动释放lock,阻塞当前线程,添加当前线程到等待*this的列表中。当notify_one notify_all执行时当前线程会被唤醒,也可能被虚假地唤醒(也就是没有接到通知但是被唤醒了)。无论任何原因,唤醒之后都会重新获取锁lock
    • wait(lock, pred):等价于:
      while (!pred())
      {
          wait(lock);
      }
      • 也即是会确保谓词满足,不然就一直等待,可以过滤掉假醒(spurious awaken)的情况。
      • 注意这个锁在进入之前必须是锁住的,然后进入wait之后才会被锁住,也就是说这个锁可以在pred中用于保护相关共享变量的访问。
    • 如果传入的lock不拥有互斥锁或者没有当前线程获取,那么会违反前置条件,会调用std::terminate终止程序。
    • x = cv.wait_until(lock, tp)同理,不过只等待到某一时间点,如果超时返回std::cv_status::timeout,没有超时等到了事件则返回std::no_timeout
    • b = cv.wait_until(lock, tp, pred)返回布尔值,等价于:
    while (!pred())
    {
        if (wait_until(lock, tp) == std::cv_status::timeout)
        {
            return pred();
        }
    }
    return true;
    • x = cv.wait_for(lock, d):等价于x = cv.wait_until(lock, std::steady_lock::now() + d)
    • x = cv.wait_for(lock, d, pred):等价于x = cv.wait_until(lock, std::steady_lock::now() + d, std::move(pred))
  • 一个条件变量可能依赖也有可能不依赖操作系统资源,如果依赖且已经没有这种资源,那么构造会抛出异常。
  • 就像mutex一样,condition_variable不能被复制或者拷贝,所以最好将其视为一种资源,而不是一种资源句柄。
  • condition_variable析构时需要确保没有线程等待在该条件变量上,否则他们将永远等待下去。
  • wait是一个底层操作,因为实现原因,标准允许该接口出现假醒以简化实现复杂度,也就是在没有得到通知的情况下返回。要使用朴素的wait的话应该在循环中使用,比如:
while (queue.empty())
{
    wait(queue_lck);
}
  • 带谓词版本就是做这个事情,所以最好优先选择带谓词版本。

实践细节:

  • notify_one还是notify_all的选择,取决于应用程序。比如在一个生产者消费者模型中,如果已有未消费数据太多,则可以使用notify_all加大吞吐量,如果数据队列中数据保有量比较小,则可以选择notify_one避免频繁唤醒多个线程去消费一个接近空的数据队列。具体选什么需要根据场景来,如果要确定一个参数作为notify_one notify_all的划分依据,最好进行相关性能测试找一个最优值,而不是凭直觉。
  • 实践中(比如生产者消费者模型)通常会有在未等到条件变量的情况下也可以做一些其他事情,而不是单纯阻塞线程的情况。这时候可以使用等待一定时间的接口(带谓词版本),并在具体接口中返回bool表示是否等到了条件变量/是否取得数据等,根据此结果决定下一步是处理数据还是做其他事情。要这样做就要能够重新等待条件变量,通常要在一个循环中才有意义。

std::condition_variable_any

  • 相对于只能在std::unique_lock<std::mutex>上工作的std::condition_variable(对其有优化,所以能用std::condition_variable就用std::condition_variable)。std::condition_variable_any可以在任何基本可锁定对象上工作(也就是具有lock unlock接口的类型)。
  • 接口基本完全一致,一个用法。

std::notify_all_at_thread_exit(std::condition_variable& cond, std::unique_lock<std::mutex> lk)

  • 提供机制,在线程退出时通知等待某个条件变量的所有线程。

任务

基于任务的并发:

  • 前面提供的并发机制使我们专注于线程、避免数据竞争、线程同步等机制而非并发的任务本身。
  • 标准库提供更高抽象的并发机制,基于任务的并发机制:这里的任务是给定参数然后产生一个结果的任务。
  • 相关类型:packaged_task<F> promise<T> future<T> shared_future<T>
  • 相关函数:x = async(policy, f, args...) x = async(f, args...)
  • 基于任务的并发机制相比基于线程和锁的并发更简单易用,屏蔽了底层的细节,比如数据竞争、假醒、过多的等待等。将精力从复杂的并发机制下节省下来做其他事情。
  • 可以通过这样的机制执行并发任务:
auto handle = std::async(task, args);
// do something in main thread
res = handle.get(); // get result of task

std::futurestd::promise

  • 一个任务将值放进std::promise,通过set_value set_exception
  • 从任务中取出值则对其对应的std::future调用get
  • 其中所说的值就是线程之间的共享状态
  • std::future中保存了值或者异常,包含了两个线程之间用于安全交换信息的所有数据。
  • 其中的共享状态(shared state)至少包含了如下信息:
    • 一个恰当类型的值或者一个异常,如果是void类型的std::future,则没有任何值。
    • 一个位表示一个值或者异常是否已经准备好,可以从std::future中提取。
    • 当调用get时要执行的任务,如果这个std::future是通过std::async传入defered策略得到的话。
    • 一个引用计数,当引用计数归零时析构。
    • 一些互斥锁数据,用来唤醒任何在等待(比如等待条件变量的)的线程。
  • 一个实现可以对共享状态进行的操作:
    • 构造:可能会使用用户定义的分配器。
    • 使其准备好:设置ready位并且唤醒等待的线程。
    • 释放:递减引用计数,当引用计数归零时释放。
    • 抛弃:值或者异常不能被放入std::promise时,抛出std::future_error带错误状态broken_promise,并且置ready位。
  • promise/future作为一套线程间的异步通信设施,std::promise在任务线程中设置值,是PUSH端,std::future在发起这个异步任务的线程中获取值,是PULL端。
  • std::promise中的设置(set_value/set_exception/...)和其对应的std::furue::get获取值之间是有同步的,不需要担心数据竞争问题。但是多个std::shared_future::get之间要么全部是只读的,要么需要有外部同步设施。

std::promise

  • 提供存储一个值或者异常的实施让其他线程异步地通过std::promise对象提供的std::future对象来获取。
  • std::promise是共享状态的句柄,可以通过std::promise向共享状态中存入任务任务运行结果,然后通过std::future来获取。
  • 类型:
    • 主模板std::promise<T>
    • 特化std::promise<R&>
    • 特化std::promise<void>
  • 构造:
    • 默认构造一个未准备好的共享状态。
    • promise(allocator_arg, a),指定分配器默认构造。
    • 可移动构造,不可拷贝构造。
  • 析构:如果共享状态就绪,则释放,未就绪则会抛出std::future_error带错误状态broken_promise
  • 可移动赋值与交换(成员与非成员)。
  • 获取结果:
    • get_future:获取到关联的std::future。仅能调用一次,第二次调用会抛出异常,要分享这个future的话可以使用std::future::share
  • 设置结果:
    • set_value:原子的设置值(左值右值const左值)到共享状态,空参数版本为std::promise<void>特化仅仅使状态就绪。
    • set_value_at_thread_exit:原子的存储value到共享状态,但不立即就绪状态,等线程退出后,销毁所有线程局部对象后,再令状态就绪。
    • set_exception:存储异常指针到共享状态,并令状态就绪。
    • set_exception_at_thrad_exit:存储异常指针到共享状态,但不立即就绪状态。
    • 这四个函数之间有互斥锁用于同步。
    • 重复设置值或者异常会抛异常。
    • 在设置值和获取值时可以使用移动操作,减少拷贝开销。
  • 说明:
    • 直接使用std::promise的方法通常是,在当前线程中构造std::promise并获取std::future,创建std::thread并将std::promise移动到工作线程(作为参数,移动过去的话那么就要值传递。是否也可以引用传递,并要求当前线程后续不能使用这个std::promise?)。
    • 在当前线程中通过future对象的get/wait接口等待工作线程完成任务,并获取结果。

std::packaged_task

  • 异步任务的包装类,以便能够在另一个线程中异步调用任务,持有一个任务以及一个promise/future对。
  • 必须在另一个线程中异步调用才能异步,直接调用会在当前线程直接运行。
  • std::packaged_task以一个任务作为参数(函数以及传递给函数的参数),通过std::promiseset_value/set_exception/...等接口将任务的运行结果(函数的返回值或者抛出的异常)保存到其持有的std::promise中。
  • std::packaged_task类似于下列方式执行任务:
try
{
    pr.set_value(f(args));
}
catch(...)
{
    pr.set_exception(std::current_exception());
}
  • 类似于std::function,它的有效模板参数是函数类型std::packaged_task<R<Args...>>(仅对函数类型做偏特化)。
  • 构造:
    • std::packaged_task pt{}默认构造没有任务也不持有共享状态。
    • std::packaged_task pt(f):使用可调用对象f的拷贝作为其任务(可以有参数),构造其中的共享状态,使用默认分配器。需要Args...对函数f能够调用才是合法的。
    • std::packaged_task pt(std::allocator_arg, a, f):指定分配器构造。
    • 有移动构造,无拷贝构造。
  • 析构:释放共享状态、销毁存储的任务对象。其中的std::promise也会析构,也就是说析构时共享状态应该是就绪的,否则会抛出异常。
  • 有效性检验:valid()检查是否有共享状态。
  • 交换:swap交换共享状态以及任务。
  • 获取结果:
    • get_future,返回关联到其中的std::promisestd::future对象。同上,只能调用一次。没有共享状态或者多次调用会抛异常。
  • 执行:
    • operator()(args...):执行其中保存的任务,将返回值或者抛出的异常保存到共享状态中,使共享状态就绪,解除每个等待共享状态的线程的阻塞。如果已经被调用或者没有共享状态,则会抛出异常。
    • make_ready_at_thread_exit(args...):调用其中保存的任务,将返回值或者抛出的异常保存到共享状态中,但在线程退出时才使共享状态就绪。毫无疑问它是调用std::promiseset_value_at_thread_exit/set_exception_at_thread_exit完成任务的。
    • reset():重置共享状态,抛弃前一次运行的结果,重新创建新的共享状态。等价于*this = std::packaged_task(std::move(f))
  • 说明:
    • 抛弃共享状态(析构或者被移动)之前首先要让其就绪。
    • make_ready_at_thread_exit的优势在于某些时候运行结果需要thread_local变量析构完成之后结果才可用,那么就必须用这个接口。
    • std::promise对象的处理完全由std::packaged_task对象负责,外部对此无感知,也无法获取。所以没有一个get_promise对象。
    • std::packaged_task包装了任务,使我们感知不到std::thread的存在,但是它确实是异步执行的。我们可以用类似于普通函数调用的方式来使用它。
    • 可以将std::packaged_task用以保存或者移动,并在当前线程中合适地调度他们的运行顺序,就像普通函数那样执行即可,最终通过std::future来获取结果。甚至比普通函数使用起来更简单,因为异常处理已经被考虑,不需要再去考虑执行多个任务时怎么处理异常。
  • 标准用法:
std::packged_task<void(int)> task(f);
std::future fut = task.get_future();
std::jthread t(std::move(task), 0); // call it asynchronously explicitly!

std::future

  • std::future是用来从共享状态中拉取结果的PULL端。
  • 构造:
    • 默认构造:不关联都任何共享状态,valid() == false
    • 可移动构造,不可拷贝构造。
  • 析构:
    • 如果当前对象是持有共享状态的最后一个引用,那么共享状态被销毁。并且当前对象放弃对共享状态的引用。
    • 这些操作不会阻塞以等待共享状态就绪,除非以下条件全为真(也就是说以下情况则会阻塞):
      • 共享状态是std::async创建的,也就是std::futurestd::async的返回结果。
      • 共享状态没有就绪,并且这是共享状态的最后一个引用。
    • 实践中来说,只有使用std::asyncstd::launch::async启动策略调用(无论运行时选择这个策略或者显式传入策略)才会阻塞。
    • 更准确地说:
      • 对于std::async未推迟启动的最后一个引用到共享状态的std::future析构时,会隐式执行join
      • 其他未推迟任务的最后一个引用析构时则是隐式detach
      • std::async的推迟启动的最后一个引用到共享状态的std::future析构时,则是直接什么都不做,任务直接被抛弃了。
      • 非最后一个引用则仅仅是递减引用计数。
  • 可移动赋值不可拷贝赋值。
  • share:将对共享状态从*this转移到一个shared_future并返回。std::shared_future是多个对象引用同一个共享状态的类型,而std::future则不可。执行之后,valid() == false
  • 获取结果:
    • get():默认按值返回,对于std::future<T&>特化则是返回引用,对std::future<void>特化则是返回void
    • 阻塞并等待任务运行完成,得到结果。
    • 按值返回会默认移动,如果结果是一个异常,那么会将异常在这个接口中原样抛出。
    • 需要在有效状态下调用,执行之后valid()变为false
    • 会高效地调用wait以等待结果返回。
  • 状态:
    • valid():表示当前共享状态是否有效。没有共享状态(默认构造、被移动走了、被转移到了std::shared_state)或者已经通过get获取结果则返回false
    • wait():阻塞当前线程直到结果可用。
    • wait_for(d):阻塞当前线程等待一段时间,返回一个std::future_status
    • wait_until(tp):阻塞当前线程等待到某个时间点,返回一个std::future_status
  • std::future_status
    • 表示共享状态的当前状态:
    enum class future_status {
        ready,
        timeout,
        deferred
    };
    • ready表示已经就绪。
    • timeout表示等待超时,即还没有就绪。
    • deferred共享状态包含了一个延迟求值的函数,只有在显式调用std::future::get时才会求值,会在使用std::launch::deferred策略调用std::async时遇到。

std::shared_future

  • 可默认构造、可拷贝构造、可移动构造、可从std::future移动构造。
  • 其余接口完全同std::future
  • 如果想被多个读取方读取结果或者需要读取多次,可以用shared_future
  • 对于std::shared_future<T>::get返回的是const T&就不再是移动按值返回了,因为可能会多次读取。对于std::shared_future<T&>::get同样返回引用,同时读取则需要一些同步机制。

std::async

  • 相比std::packaged_task提供了稍微更高一点的任务抽象,使用它则将是为任务创建一个新线程、回收并利用旧线程、还是在当前线程执行的决策都交给了thread launcher(也就是std::async)。
  • fu = std::async(policy, f, args...)使用特定策略启动任务。
  • fu = std::async(f, args...)等价于fu = std::async(std::lanunch::async | std::launch::deferred, f, args...),这会让编译器。
  • 启动策略:
enum class launch : /* unspecified */ {
    async =    /* unspecified */,
    deferred = /* unspecified */,
    /* implementation-defined */
};
  • async表示一定异步启动,deferred则表示推迟启动。默认情况或者显式传入std::lanunch::async | std::launch::deferred则表示由编译器和运行时系统来选择。
  • 使用std::derferred将会串行执行,在调试、测试、判断问题是否是由并发引起的,这个策略很有用。通过启动策略的切换来测试是调试并行程序的一个有用方法。
  • 默认的启动策略可能会导致一些问题,如果你遇到了这样的问题,那么可以显式指定启动策略。

例子

  • 实现一个并行的find算法,见ParallelFind.cpp
  • 在某些任务里面,并行算法是有意义的,不过一般来说只在计算量很大,节省的计算开销足以覆盖线程创建开销的情况下。对于计算量比较小的场景,是没有必要的,线程创建是有开销的。