C++11 并发多线程

齐琴伊察 金字塔 -- 墨西哥

一、线程启动、结束,创建线程的多种方法、join,detach

  • thread
  • join() , detach() , joinable()
  • 用类,lambda 表达式创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <iostream>
#include <thread>

using namespace std;

void myPrint() {
cout << "我的线程开始执行...." << endl;
// .....
cout << "我的线程运行完毕...." << endl;
return;
}

class Ta{
public:
// 重载函数运算符
void operator()() {
cout << "我的线程开始运行..." << endl;
// ....
cout << "我的线程运行完毕..." << endl;
}
// 成员函数:
void getMsg(int input){};
void saveMsg(){};
};

int main() {
// (1) 创建线程,myPrint为线程入口
// 线程类参数是一个可调用对象
// 可以是函数,函数指针,lambda表达式,bind创建的对象或者重载函数调用运算符的类对象

// Ⅰ 以函数对象创建线程
thread myThread(myPrint);

// Ⅱ 以类对象创建线程
Ta ta;
thread myThread(ta);

// Ⅲ 以lambda表达式创建线程
auto lambdaThread = [] {
cout << "我的线程开始了..." << endl;
// ...
cout << "我的线程结束了..." << endl;
}
thread myThread(lambdaThread);

// Ⅳ 以类中成员函数
// 第一个 & 取地址,第二个 & 引用,相当于 std::ref(s)
// threadObj1(&Ta::saveMsg, ta) 传值也是可以的,但是会调用一次构造函数,两次拷贝构造
// threadObj1(&Ta::saveMsg, &ta) 传引用,会调用一次构造函数,一次拷贝构造
Ta ta;
int input = 0;
thread threadObj1(&Ta::saveMsg, &ta);
thread threadObj2(&Ta::getMsg, &ta, input);

// (2) 阻塞主线程并等待 myThread 线程执行完毕
myThread.join();

// (3) 不阻塞主线程,主线程往下执行,如果主线程退出,子线程还未结束
// 将会由后台托管执行。
myThread.detach();

// (4) join , detach 只能使用一个,可以使用 joinable 来进行辅助判断
// 注:上述只是举例写法,真实代码环境,join,detach只能使用一个
if (myThread.joinable()) {
cout << "可以调用join(),或者detach()" << endl;
} else {
cout << "不能调用join(), 或者detach()" << endl;
}

// 主线程打印语句
cout << "Hello World!" << endl;
return 0;
}

二、线程传参详解,detach()再探,类成员函数作为线程入口函数

  • 传递临时对象作为线程参数,要避免的陷阱
  • 临时对象作为线程参数,线程id,临时对象构造时机抓捕
  • 传递类对象、只能指针作为线程参数
  • 用成员函数指针做线程入口函数

2.1 可能的陷阱

传递临时对象作为线程参数 要避免的陷阱

陷阱一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <thread>
using namespace std;

void myPrint(const int& i, char* pmybuf) {
// 如果线程从主线程 detach 了
// i 不是 mvar 真正的引用,实际上值传递,即使主线程运行完毕了,
// 子线程用 i 仍然是安全的,但仍不推荐传递引用
// 推荐改为 const int i
cout << i << endl;
// pmybuf还是指向原来的字符串,所以这么写是不安全的
cout << pmybuf << endl;
}

int main()
{
int mvar = 1;
int& mvary = mvar;
char mybuf[] = "this is a test";
// 第一个参数是函数名,后两个参数是函数的参数
thread myThread(myPrint, mvar, mybuf);
myThread.join();
// myThread.detach();

cout << "Hello World!" << endl;
}

陷阱二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <thread>
#include <string>
using namespace std;

void myPrint(const int i, const string& pmybuf) {
cout << i << endl;
cout << pmybuf << endl;
}

int main() {
int mvar = 1;
int& mvary = mvar;
char mybuf[] = "this is a test";
// 如果 detach 了,这样仍然是不安全的
// 因为存在主线程运行完了,mybuf被回收了,系统采用 mybuf 隐式类型转换成 string
// 可能还没转化,主线程就已经结束了
thread myThread(myPrint, mvar, mybuf);

// 推荐先创建一个临时对象,就绝对安全了。。。。
thread myThread(myPrint, mvar, string(mybuf));

myThread.join();
//myThread.detach();

cout << "Hello World!" << endl;

总结

  • 如果传递简单类型,如内置类型,推荐使用值传递,不要用引用
  • 如果传递类对象避免使用隐式类型转换,全部都是创建线程这一行就创建出临时对象,然后在函数参数里,用引用来接,否则还会创建出一个对象
  • 建议不使用 detach

2.2 线程 id

线程 Id 通过如下获取:

1
std::this_thread::get_id();

2.3 传递 类对象智能指针 作为线程参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <thread>
using namespace std;

class A {
public:
mutable int m_i; // m_i即使是在 const 中也可以被修改
A(int i) :m_i(i) {}
};

void myPrint(const A& pmybuf) {
pmybuf.m_i = 199;
cout << "子线程myPrint的参数地址是" << &pmybuf << "thread = "
<< std::this_thread::get_id() << endl;
}

int main() {
A myObj(10);
// myPrint(const A& pmybuf)中引用不能去掉,如果去掉会多创建一个对象
// const也不能去掉,去掉会出错

// 即使是传递的const引用,但在子线程中还是会调用拷贝构造函数构造一个新的对象,
// 所以在子线程中修改m_i的值不会影响到主线程

// 如果希望子线程中修改m_i的值影响到主线程,
// 可以用thread myThread(myPrint, std::ref(myObj));
// 这样const就是真的引用了,myPrint定义中的const可以去掉了,类A定义中的mutable也可以去掉了
thread myThread(myPrint, myObj);
myThread.join();
//myThread.detach();

cout << "Hello World!" << endl;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>
#include <memory>
using namespace std;

void myPrint(unique_ptr<int> ptn) {
cout << "thread = " << std::this_thread::get_id() << endl;
}

int main() {
unique_ptr<int> up(new int(10));
//独占式指针只能通过std::move()才可以传递给另一个指针
//传递后 up 就指向空,新的 ptn 指向原来的内存
//所以这时就不能用 detach 了,因为如果主线程先执行完,ptn指向的对象就被释放了
thread myThread(myPrint, std::move(up));
myThread.join();
//myThread.detach();
return 0;
}

三、创建多个进程、数据共享问题分析

3.1 创建和等待多个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TextThread() {
cout << "我是线程" << this_thread::get_id() << endl;
/* … */
cout << "线程" << this_thread::get_id() << "执行结束" << endl;
}

//main函数里 vector threadagg;
vector<thread> threadagg;
for (int i = 0; i < 10; ++i) {
threadagg.push_back(thread(TextThread));
}
for (int i = 0; i < 10; ++i) {
threadagg[i].join();
}

3.2 数据共享问题分析

  • 只读数据是安全稳定的
  • 如果存在写操作,若不加处理就会出错

四、互斥量概念、用法、死锁

  • 互斥量(mutex)的基本概念
  • 互斥量的用法: lock() , unlock() , std::lock_guard 类模板
  • 死锁及一般解决方案, std::lock() 函数模板, std::lock_guardstd::adopt_lock 参数

4.1 互斥量的基本概念及用法

互斥量就是个类对象,可以理解为一把锁,多个线程尝试用 lock() 成员函数来加锁,只有一个线程能锁定成功,如果没有锁成功,那么流程将卡在 lock() 这里不断尝试去锁定。互斥量使用要小心,保护数据不多也不少,少了达不到效果,多了影响效率。

包含 #include<mutex> 头文件,成员函数有, lock()unlock() 。基本使用步骤:

  • lock()
  • 操作共享数据
  • unlock()

lock_guard类模板

1
2
3
lock_guard sbguard(myMutex);  // 取代lock() 和 unlock()
// lock_guard构造函数执行了mutex::lock();
// 在作用域结束时,调用析构函数,执行mutex::unlock()

4.2 死锁

两个以上互斥量使用,相互竞争,都在争锁,不释放已有锁。主要解决办法是保证多个互斥量上锁的顺序合理。

std::lock() 函数模板

  • std::lock(mutex1, mutex2,...); 一次性锁定多个互斥量,用于处理多个互斥量
  • 如果互斥量中一个没锁住,他就等着,等所有互斥量锁住,才能继续执行。如果有一个没锁住,就会把已经锁住的释放掉(要么互斥量都锁住,要么都没锁住,防止死锁)

**std::lock_guard ** 中 std::adopt_lock 参数

1
2
3
// 加入adopt_lock后,在调用lock_guard的构造函数时,不再进行lock();
std::lock_guard<std::mutex> my_guard(my_mutex, std::adopt_lock);
// adopt_guard为结构体对象,起一个标记作用,表示这个互斥量已经lock(),不需要在lock()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
using namespace std;

class A{
public:
void inMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
cout << "插入一个元素" << i << endl;
//lock_guard<mutex> sbguard(myMutex1, adopt_lock);
lock(myMutex1, myMutex2);
//myMutex2.lock();
//myMutex1.lock();
msgRecvQueue.push_back(i);
myMutex1.unlock();
myMutex2.unlock();
}
}
bool outMsgLULProc() {
myMutex1.lock();
myMutex2.lock();
if (!msgRecvQueue.empty()) {
cout << "删除元素" << msgRecvQueue.front() << endl;
msgRecvQueue.pop_front();
myMutex2.unlock();
myMutex1.unlock();
return true;
}
myMutex2.unlock();
myMutex1.unlock();
return false;
}

void outMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
if (outMsgLULProc()) {}
else {
cout << "数组为空" << endl;
}
}
}
private:
list<int> msgRecvQueue;
mutex myMutex1;
mutex myMutex2;
};

int main() {
A myobja;
mutex myMutex;
thread myOutMsgObj(&A::outMsgRecvQueue, &myobja);
thread myInMsgObj(&A::inMsgRecvQueue, &myobja);
myOutMsgObj.join();
myInMsgObj.join();
return 0;
}

五、unique_lock 模板类

  • unique_lock 取代 lock_guard (优点:灵活,缺点:效率降低)
  • unique_lock 的第二个参数: std::adopt_lock,std::try_to_lock,std::defer_lock
  • unique_lock 的成员函数: lock(),unlock(),try_lock(),release()
  • unique_lock 所有权的传递

5.1 unique_lock 的第二个参数:

std::adopt_lock

  • 表示这个互斥量已经被 lock() ,即不需要在构造函数中 lock 这个互斥量
  • 前提必须提前locklock_guard 也可以用这个参数

std::try_to_lock

  • 尝试用 mutex 的 lock() 去锁定这个 mutex,如果没有成功,会立即返回,不会阻塞。
  • 使用 try_to_lock() 的原因是:防止其他的线程锁定的 mutex 太长时间,导致本线程一直阻塞在 lock 这个地方
  • 前提不能提前 lock()
  • owns_lock() 方法判断是否拿到锁,如果拿到返回 true

std::defer_lock

  • 如果没有第二个参数就对 mutex 加锁,加上 defer_lock 是初始化一个没有加锁的mutex
  • 不给他加锁的目的是以后可以调用 unique_lock 的一些方法
  • 前提不能提前 lock()

unique_lock的成员函数(前三个与 std::defer_lock 联合使用)

lock() :加锁

1
2
unique_lock<mutex> myUniLock(myMutex, defer_lock); // 本质是所有权的转移
myUniLock.lock(); // 不用自己 unlock

unlock() :解锁

1
2
3
4
5
6
7
8
unique_lock<mutex> myUniLock(myMutex, defer_lock);
myUniLock.lock();
// .... 处理一些共享代码
myUniLock.unlock();
// .... 处理一些非共享代码
// .... 释放锁给其他线程,防止其他线程过长时间等待
myUniLock.lock();
// .... 处理一些共享代码

try_lock() :尝试给互斥量加锁,如果拿不到返回 false,否则返回 true。

release() :

1
2
unique_lock<mutex> myUniLock(myMutex); // 相当于把myMutex 和 myUniLock绑定在一起,release()就是接触绑定,返回它所管理的mutex对象的指针,并释放所有权
mutex* ptr = myUniLock.release(); // 由原来的mutex对象处理加锁状态,就需要ptr在以后进行解锁,例如 ptr->unlock();

注意:

  • lock 的代码段越少,执行越快,整个程序的运行效率越高
    • 锁住的代码少 -> 细粒度 -> 执行效率高
    • 锁住的代码多 -> 粗粒度 -> 执行效率低

unique_lock所有权的传递

  • 使用 move 转移
1
2
3
// 只能通过移动语句转移所有权,不能使用拷贝
unique_lock<mutex> myUniLock1(myMutex);
unique_lock<mutex> myUniLock2(std::move(myUniLock1));
  • return 一个临时变量,即可实现转移
1
2
3
4
5
unique_lock<mutex> aFuc() {
unique_lock<mutex> myUniLock(myMutex);
// 系统会生成临时的 unique_lock 对象调用它的移动构造函数。
return myUniLock;
}

六、单例设计模式共享数据问题分析、解决,call_once

  • 单例设计模式:项目中,有某个或者某些特殊的类,只能创建一个属于该类的对象。
  • 单例设计模式共享数据问题分析、解决
  • std::call_once()

6.1 单例模式共享数据问题分析、解决

面临问题:需要在自己创建的线程中来创建单例类的对象,这种线程可能不止一个,可能操作 GetInstance() 这种成员函数需要互斥。

技巧:可以在加锁前判断 m_instance 是否为空,否则每次调用 Singlton::getInstance() 都要进行加锁,十分影响效率。(双重锁定)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <iostream>	
#include <mutex>
using namespace std;

mutex myMutex;
// 懒汉模式,在初次使用的时候创建对象
class Singelton {
public:
static Singelton * getInstance() {
//双重锁定 提高效率
if (instance == NULL) {
lock_guard<mutex> myLockGua(myMutex);
if (instance == NULL) {
instance = new Singelton;
}
}
return instance;
}
private:
Singelton() {}
static Singelton *instance;
};
Singelton * Singelton::instance = NULL;

// 饿汉模式,在类外先创建对象
class Singelton2 {
public:
static Singelton2* getInstance() {
return instance;
}
private:
Singelton2() {}
static Singelton2 * instance;
};
Singelton2 * Singelton2::instance = new Singelton2;

int main(void)
{
Singelton * singer = Singelton::getInstance();
Singelton * singer2 = Singelton::getInstance();
if (singer == singer2)
cout << "二者是同一个实例" << endl;
else
cout << "二者不是同一个实例" << endl;
cout << "---------- 以下是 饿汉模式 ------------" << endl;
Singelton2 * singer3 = Singelton2::getInstance();
Singelton2 * singer4 = Singelton2::getInstance();
if (singer3 == singer4)
cout << "二者是同一个实例" << endl;
else
cout << "二者不是同一个实例" << endl;
return 0;
}

如果觉得在单例模式中 new 了一个对象,而没有自己delete ,不合理,可以增加一个类中类 CGarHuiShounew 一个单例类时创建一个静态的 CGarhuishou 对象,这样在程序结束时会调用 CGarhuishou 的析构函数,释放掉 new 出来的单例对象。(技巧)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Singelton {
public:
static Singelton * getInstance() {
if (instance == NULL) {
// 定义一个静态对象,该静态对象的生命周期随程序结束才中断
static CGarhuishou huishou;
instance = new Singelton;
}
return instance;
}
class CGarhuishou {
public:
// 定义析构函数
~CGarhuishou() {
// 如果析构的时候内存还未释放,主动释放
if (Singelton::instance)
{
delete Singelton::instance;
Singelton::instance = NULL;
}
}
};
private:
Singelton() {}
static Singelton *instance;
};

6.2 std::call_once()

函数模板,该函数的第一个参数为标记,第二个参数是一个函数名(如a())。
功能:能够保证函数a()只被调用一次。具备互斥量的能力,而且比互斥量消耗的资源更少,更高效。
call_once() 需要与一个标记结合使用,这个标记为 std::once_flag ;其实 once_flag 是一个结构, call_once() 就是通过标记来决定函数是否执行,调用成功后,就把标记设置为一种已调用状态。

多个线程同时执行时,一个线程会等待另一个线程先执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::once_flag g_flag;
class Singelton {
public:
static void CreateInstance() {
instance = new Singelton;
}
// 两个线程同时执行到这里,其中一个线程要等另外一个线程执行完毕
static Singelton * getInstance() {
// call_once 保证其只被调用一次
call_once(g_flag, CreateInstance);
return instance;
}
private:
Singelton() {}
static Singelton *instance;
};
// 懒汉模式
Singelton * Singelton::instance = NULL;

七、condition_variable、wait、notify_one、notify_all

  • 条件变量 std::condition_variablewait()notify_one()notify_all

std::condition_variable 实际上是一个类,是一个和条件相关的类,就是等待一个条件成立。

1
2
3
4
5
6
7
std::mutex mymutex1;
std::unique_lock<std::mutex> sbguard1(mymutex1);
std::condition_variable condition;
condition.wait(sbguard1,
[this](){ if (!msgRecvQueue.empty()) return true;
else return false; });
condition.wait(sbguard1);

wait() 用来等一个东西

  • 如果第二个参数的lambda表达式返回值是 falsewait() 将解锁互斥量并阻塞到本行
  • 如果第二个参数的lambda表达式返回值是 truewait() 直接返回并继续执行

**阻塞到什么时候为止呢?**阻塞到其他某个线程调用 notify_one() 成员函数为止;

如果没有第二个参数,那么效果跟第二个参数lambda表达式返回 false 效果一样 wait() 将解锁互斥量,并阻塞到本行,阻塞到其他某个线程调用 notify_one() 成员函数为止。

当其他线程用 notify_one() 将本线程 wait() 唤醒后,这个 **wait()**恢复后
1、 wait() 不断尝试获取互斥量锁,如果获取不到,那么流程就卡在 wait() 这里等待获取;如果获取到了,那么 wait() 就继续执行,获取到了锁。

2、如果 wait() 有第二个参数就判断这个 lambda 表达式。

a) 如果表达式为 false那 wait 又对互斥量解锁,然后又休眠,等待再次被**notify_one()唤醒
b) 如果lambda表达式为 true,则 wait() 返回,流程可以继续执行(
此时互斥量已被锁住**),如果wait没有第二个参数,则 **wait()**返回,流程走下去。

流程只要走到了 wait() 下面则互斥量一定被锁住了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <thread>
#include <iostream>
#include <list>
#include <mutex>
using namespace std;

class A {
public:
void inMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
cout << "inMsgRecvQueue插入一个元素" << i << endl;
std::unique_lock<std::mutex> sbguard1(mymutex1);
msgRecvQueue.push_back(i);
// 尝试把 wait()线程唤醒,执行完这行,
// 那么 outMsgRecvQueue()里的 wait()就会被唤醒
// 只有当另外一个线程正在执行wait()时 notify_one() 才会起效,
// 否则没有作用
condition.notify_one();
}
}
void outMsgRecvQueue() {
int command = 0;
while (true) {
std::unique_lock<std::mutex> sbguard2(mymutex1);
// wait()用来等一个东西
// 如果第二个参数的lambda表达式返回值是false,
// 那么wait()将解锁互斥量,并阻塞到本行
// 阻塞到什么时候为止呢?
// 阻塞到其他某个线程调用notify_one()成员函数为止;
// 当 wait() 被 notify_one()激活时,
// 会先执行它的条件判断表达式是否为 true,
// 如果为true才会继续往下执行
condition.wait(sbguard2, [this] {
if (!msgRecvQueue.empty())
return true;
return false;});
command = msgRecvQueue.front();
msgRecvQueue.pop_front();
// 因为unique_lock的灵活性,我们可以随时unlock,以免锁住太长时间
sbguard2.unlock();
cout << "outMsgRecvQueue()执行,取出第一个元素" << endl;
}
}

private:
std::list<int> msgRecvQueue;
std::mutex mymutex1;
std::condition_variable condition;
};

int main() {
A myobja;
std::thread myoutobj(&A::outMsgRecvQueue, &myobja);
std::thread myinobj(&A::inMsgRecvQueue, &myobja);
myinobj.join();
myoutobj.join();
}

深入思考

上面的代码可能导致出现一种情况:
因为 outMsgRecvQueue()inMsgRecvQueue() 并不是一对一执行的,所以当程序循环执行很多次以后,可能在 msgRecvQueue 中已经有了很多消息,但是 outMsgRecvQueue 还是被唤醒一次只处理一条数据。

这时可以考虑把 outMsgRecvQueue 多执行几次,或者对 inMsgRecvQueue 进行限流。

notify_one() :通知一个线程的 wait()

notify_all() :通知所有线程的 wait()

八、async、future、packaged_task、promise

  • std::asyncstd::future 创建后台任务并返回值
  • std::packaged_task
  • std::promise

8.1 std::asyncstd::future 创建后台任务并返回值

需要包含头文件 #include <future>

std::async 是一个函数模板,用来启动一个**异步任务,启动起来一个异步任务之后,它返回一个 std::future 对象,这个对象是个类模板**。

什么叫“启动一个异步任务”?就是自动创建一个线程,并开始执行对应的线程入口函数,返回一个 std::future 对象,这个 std::future 对象中就含有线程入口函数所返回的结果,我们可以通过调用 std::future 对象的成员函数 **get()**来获取结果。

“future”将来的意思,也有人称呼 std::future 提供了一种访问异步操作结果的机制,就是说这个结果你可能没办法马上拿到,但在不久的将来,这个线程执行完毕时,你就能够拿到结果,所以可以理解为:future中保存着一个值,这个值是在将来的某个时刻能够拿到。

**std::future**对象的 **get()成员函数会等待线程执行结束**并返回结果,拿不到结果它就会一直等待,感觉有点像 join() 但是,它是可以获取结果的。

**std::future**对象的 wait() 成员函数,用于等待线程返回,本身并不返回结果,这个效果和 std::threadjoin() 更像。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <iostream>
#include <future>
using namespace std;
class A {
public:
int mythread(int mypar) {
cout << mypar << endl;
return mypar;
}
};

int mythread() {
cout << "mythread() start threadid = "
<< std::this_thread::get_id() << endl;
// 定义睡眠时间 5S
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura); // 睡眠
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
// 普通线程函数
std::future<int> result1 = std::async(mythread);
cout << "continue........" << endl;

// 卡在这里等待mythread()执行完毕,拿到结果
cout << result1.get() << endl;

// 类成员函数
// 第二个参数是对象引用才能保证线程里执行的是同一个对象
A a;
int tmp = 12;
std::future<int> result2 = std::async(&A::mythread, &a, tmp);
cout << result2.get() << endl;
// 或者 result2.wait(); -> 没有返回值
cout << "good luck" << endl;
return 0;
}

通过向 std::async() 传递一个参数,改参数是 std::launch 类型(枚举类型),来达到一些特殊的目的:

  • std::lunch::deferred (defer推迟,延期)

表示线程入口函数的调用会被延迟,一直到 std::futurewait() 或者 get() 函数被调用时(由主线程调用)才会执行;如果 wait() 或者 get() 没有被调用,则不会执行。

实际上根本就没有创建新线程std::lunch::deferred 意思时延迟调用,并没有创建新线程,是在主线程中调用的线程入口函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <future>
using namespace std;

int mythread() {
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
std::future<int> result1 = std::async(std::launch::deferred ,mythread);
cout << "continue........" << endl;
//卡在这里等待mythread()执行完毕,拿到结果
cout << result1.get() << endl;
cout << "good luck" << endl;
return 0;
}
  • std::launch::async ,在调用 async 函数的时候就开始创建新线程
1
2
3
4
5
6
7
8
int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
std::future<int> result1 = std::async(std::launch::async ,mythread);
cout << "continue........" << endl;
cout << result1.get() << endl;
cout << "good luck" << endl;
return 0;
}

8.2 std::async 续谈

std::async 参数详述,async 用来创建一个异步任务

  • std::launch::deferred延迟调用,(主线程调用)

  • std::launch::async强制创建一个线程

std::async() 一般不叫创建线程(他能够创建线程),一般叫它创建一个异步任务。

std::asyncstd::thread 最明显的**不同,就是 async 有时候并不创建新线程。**

① 如果用std::launch::deferred来调用 async?

延迟到调用 get() 或者 wait() 时执行,如果不调用就不会执行

② 如果用std::launch::async来调用 async?

强制这个异步任务在新线程上执行,这意味着,系统必须要创建出新线程来运行入口函数。

③ 如果同时用 std::launch::async | std::launch::deferred

这里这个或者关系意味着 async 的行为可能是 std::launch::async 创建新线程立即执行, 也可能是 std::launch::deferred 没有创建新线程并且延迟到调用 **get()**执行,由系统根据实际情况来决定采取哪种方案。

④ 不带额外参数 std::async(mythread) ,只给 async 一个入口函数名

此时的系统给的默认值是 std::launch::async | std::launch::deferred 和 ③ 一样,有系统自行决定异步还是同步运行。

std::asyncstd::thread()区别:

std::thread() 如果系统资源紧张可能出现创建线程失败的情况,如果创建线程失败那么程序就可能崩溃,而且不容易拿到函数返回值(不是拿不到)
std::async() 创建异步任务。可能创建线程也可能不创建线程,并且容易拿到线程入口函数的返回值;

由于系统资源限制:
① 如果用 std::thread 创建的线程太多,则可能创建失败,系统报告异常,崩溃。

② 如果用 std::async ,一般就不会报异常,因为如果系统资源紧张,无法创建新线程的时候,async 不加额外参数的调用方式就不会创建新线程。而是在后续调用 get() 请求结果时执行在这个调用 get() 的线程上。

如果你强制 async 一定要创建新线程就要使用 std::launch::async 标记。承受的代价是,系统资源紧张时可能崩溃。

③ 根据经验,一个程序中线程数量 不宜超过100~200 。

async 不确定性问题的解决
不加额外参数的 async 调用时让系统自行决定,是否创建新线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::future result = std::async(mythread);
// 问题焦点在于这个写法,任务到底有没有被推迟执行。
// 可以通过 wait_for 返回状态来判断:

// 如果资源不紧张的情况下:
// - 如果线程运行需要5秒,主线程在这要等待 6 秒的话,此时 status = ready
// - 如果线程运行需要5秒,主线程在这里要等待 1 秒的话,此时 status = timeout

// 如果资源紧张的情况下:
// 如果出现了 status = deferred,则说明系统选择了 std::launch::deferred,延迟调用
// 一旦使用了deferred,就不会创建线程,都是在get()才会调用线程入口函数,不管wait_for多长时间

std::future_status status = result.wait_for(std::chrono::seconds(6));
//std::future_status status = result.wait_for(6s);
if (status == std::future_status::timeout) {
// 超时:表示线程还没有执行完
cout << "超时了,线程还没有执行完" << endl;
}
else if (status == std::future_status::ready) {
// 表示线程成功返回
cout << "线程执行成功,返回" << endl;
cout << result.get() << endl;
}
else if (status == std::future_status::deferred) {
cout << "线程延迟执行" << endl;
cout << result.get() << endl;
}

8.3 std::packaged_task:打包任务,把任务包装起来

类模板,它的模板参数是各种可调用对象,通过 packaged_task各种可调用对象包装起来,方便将来作为线程入口函数来调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <thread>
#include <iostream>
#include <future>
using namespace std;

int mythread(int mypar) {
cout << mypar << endl;
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
// 我们把函数mythread通过packaged_task包装起来
// 返回值类型是int,参数是一个int: int mythread(int)
std::packaged_task<int(int)> mypt1(mythread);

// 也可以封装 lambda 函数
std::packaged_task<int(int)> mypt2([](int mypar) {
cout << mypar << endl;
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
});

// 创建线程 t1,使用packaged_task封装的线程入口函数
std::thread t1(std::ref(mypt1), 1);
t1.join();
std::future<int> result = mypt1.get_future();
// std::future对象里包含有线程入口函数的返回结果,通过 get_future()函数
// 这里 result 保存 mythread 返回的结果。
cout << result.get() << endl;

// packaged_task 包装起来的可调用对象还可以直接调用,从这个角度来讲,、、
// packaged_task 对象也是一个可调用对象,lambda可以直接调用
mypt2(1);
std::future<int> result = mypt2.get_future();
cout << result.get() << endl;
// 上段程序是在主线程中进行的
return 0;
}

8.4 std::promise

模板类,能够在某个线程中给它赋值,并可以在其他线程中,把这个值取出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <thread>
#include <iostream>
#include <future>
using namespace std;

void mythread(std::promise<int> &tmp, int clac) {
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
int result = clac;
tmp.set_value(result); // 结果保存到了tmp这个对象中,使用 set_val() 函数
return;
}

vector<std::packaged_task<int(int)>> task_vec;

int main() {
std::promise<int> myprom;
std::thread t1(mythread, std::ref(myprom), 180);
t1.join(); //在这里线程已经执行完了
// promise 和 future绑定,用于获取线程返回值
std::future<int> fu1 = myprom.get_future();
auto result = fu1.get();
cout << "result = " << result << endl;
}

通过 promise 保存一个值,在将来某个时刻我们通过把一个**future**绑定到这个 promise 上,来得到绑定的值。注意,使用 thread 时,必须 **join()**或者 detach(), 否则程序会报异常。

九、future 其他成员函数、shared_future、atomic

  • std::future 其他成员函数
  • std::shared_future
  • std::atomic

9.1 std::future 的成员函数

1
std::future_status status = result.wait_for(std::chrono::seconds(5));

卡住当前流程,等待 std::async() 的异步任务运行一段时间,然后返回其状态**std::future_status** 。如果**std::async()的参数是 std::launch::deferred (延迟执行),则不会卡住主流程**,没有创建新线程,线程入口函数仍然在遇到 get() 的时候调用,只是主线程中的普通函数调用。
std::future_status 是枚举类型,表示异步任务的执行状态。类型的取值有

  • std::future_status::timeout
  • std::future_status::ready
  • std::future_status::deferred
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <future>
using namespace std;

int mythread() {
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
std::future<int> result = std::async(mythread);
cout << "continue........" << endl;
//卡在这里等待mythread()执行完毕,拿到结果
//cout << result1.get() << endl;
// 等待1秒
std::future_status status=result.wait_for(std::chrono::seconds(1));
if (status == std::future_status::timeout) {
// 超时:表示线程还没有执行完
cout << "超时了,线程还没有执行完" << endl;
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
#include <future>
using namespace std;

int mythread() {
cout << "mythread() start" << "threadid = " << std::this_thread::get_id() << endl;
//std::chrono::milliseconds dura(5000);
//std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
std::future<int> result = std::async(std::launch::deferred, mythread);
//std::future<int> result = std::async(mythread);
cout << "continue........" << endl;
//卡在这里等待mythread()执行完毕,拿到结果
//cout << result1.get() << endl;
std::future_status status=result.wait_for(std::chrono::seconds(6));
if (status == std::future_status::timeout) {
// 超时:表示线程还没有执行完
cout << "超时了,线程还没有执行完" << endl;
}
else if (status == std::future_status::ready) {
// 表示线程成功结束并返回
cout << "线程执行成功,返回" << endl;
cout << result.get() << endl;
}
else if (status == std::future_status::deferred) {
// 如果设置 std::future<int> result =
// std::async(std::launch::deferred, mythread);,则本条件成立
cout << "线程延迟执行" << endl;
cout << result.get() << endl;
}

cout << "good luck" << endl;
return 0;
}

9.2 std::shared_future

std::shared_future:也是个类模板

  • **std::future**的 get() 成员函数是转移数据

  • std::shared_futureget() 成员函数是复制数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <thread>
#include <iostream>
#include <future>
using namespace std;

int mythread() {
cout << "mythread() start" << "threadid = "
<< std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "mythread() end" << "threadid = "
<< std::this_thread::get_id() << endl;
return 5;
}

int main() {
cout << "mainThreadid = " << std::this_thread::get_id() << endl;
std::packaged_task<int()> mypt(mythread);
std::thread t1(std::ref(mypt));
std::future<int> result = mypt.get_future();

bool ifcanget = result.valid(); //判断future 中的值是不是一个有效值
// 执行完毕后result_s里有值,而result里空了
std::shared_future<int> result_s(result.share());
// 使用移动构造函数来创建 shared_future对象
std::shared_future<int> result_s(std::move(result));
// 通过get_future返回值直接构造一个shared_future对象
std::shared_future<int> result_s(mypt.get_future());

t1.join();

auto myresult1 = result_s.get();
auto myresult2 = result_s.get();

cout << "good luck" << endl;
return 0;
}

9.3 std::atomic 原子操作

有两个线程,对一个变量进行操作,一个线程读这个变量的值,一个线程往这个变量中写值。即使是一个简单变量的读取和写入操作,如果不加锁,也有可能会导致读写值混乱(一条C语句会被拆成3、4条汇编语句来执行,所以仍然有可能混乱);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>
using namespace std;
int g_count = 0;

void mythread1() {
for (int i = 0; i < 1000000; i++) {
// 该语句会被拆成多条汇编语句,所以可能在线程切换的过程中出现混乱
g_count++;
}
}
int main() {
std::thread t1(mythread1);
std::thread t2(mythread1);
t1.join();
t2.join();
cout << "正常情况下结果应该是200 0000次,实际是" << g_count << endl;
}
// !!! 注意:输出结果并不一定是 2000000 !!!

互斥量:多线程编程中用于保护共享数据:先锁住, 操作共享数据, 解锁。所以通过 mutex 可以解决,但是效率不高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

int g_count = 0;
std::mutex mymutex;

void mythread1() {
for (int i = 0; i < 1000000; i++) {
std::unique_lock<std::mutex> u1(mymutex);
// std::lock_guard<std::mutex> u1(mymutex);
g_count++;
}
}
int main() {
std::thread t1(mythread1);
std::thread t2(mythread1);
t1.join();
t2.join();
cout << "正常情况下结果应该是200 0000次,实际是" << g_count << endl;
}

9.4 std::atomic 用法

原子操作:在多线程中不会被打断的程序执行片段。原子操作可以理解成一种:不需要用到互斥量加锁(无锁)技术的多线程并发编程方式。

  • 优势:从效率上来说,原子操作要比互斥量的方式效率要高

  • 区别:互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量

原子操作,一般都是指“不可分割的操作”;也就是说这种操作状态要么是完成的,要么是没完成的,不可能出现半完成状态。

std::atomic 来代表原子操作,是个类模板,用来封装某个类型的值的。

需要添加头文件 #include <atomic>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <atomic>

using namespace std;
std::atomic<int> g_count = 0; // 封装了一个类型为int的 对象(值)

void mythread1() {
for (int i = 0; i < 1000000; i++) {
g_count++;
// 另外需要注意的是:一般atomic原子操作
// 针对 ++,–,+=,-=,&=,|=,^= 是支持的,其他操作不一定支持。
g_count += 1; // 支持
g_count = g_count + 1; // 不支持
}
}

int main() {
std::thread t1(mythread1);
std::thread t2(mythread1);
t1.join();
t2.join();
cout << "正常情况下结果应该是200 0000次,实际是" << g_count << endl;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <atomic>

using namespace std;

std::atomic<bool> g_ifEnd = false; // 封装了一个类型为bool的 对象(值)

void mythread() {
std::chrono::milliseconds dura(1000);
// 每 1 秒中判断一次 g_ifEnd
while (g_ifEnd == false) {
cout << "th_id = " << std::this_thread::get_id() << endl;
std::this_thread::sleep_for(dura);
}
cout << "th id = " << std::this_thread::get_id() << "over" << endl;
}

int main() {
std::thread t1(mythread);
std::thread t2(mythread);
// 主线程 睡眠 5s
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
g_ifEnd = true;
cout << "程序执行完毕" << endl;
t1.join();
t2.join();
}

参考资料:

https://www.bilibili.com/video/BV1Yb411L7ak

https://blog.csdn.net/qq_38231713/category_10001159.html



----------- 本文结束 -----------




0%