一文搞懂C++多线程 🔥
- 作者
- Name
- 青玉白露
- Github
- @white0dew
- Modified on
- Reading time
- 11 分钟
阅读:.. 评论:..
这周学习了多线程并发的相关知识,写一个读书笔记以作记录。包括进程、线程、并发、mutex等等。
一、进程、线程、并发
- 进程:简而言之就是一个运行的程序,如点开一个exe文件就打开了一个进程。
- 线程:进程中的不同执行路径,即在一个进程中运行了多个功能。每一个进程都至少包含有一个线程,即主线程,主线程与进程的关系是相互依存的。
- 并发:分为多线程和多进程,多进程同时运行和多线程同时运行的情况。例如,同时打开两个QQ客户端是多进程,VStudio中多个窗口线程是多线程并发。
thread
库
二、以前,由于系统环境的不同,如Windows和Linux,需要选择不同的线程库进行代码编写,可移植性不高。C11之后有了标准的线程库:std::thread
。除此之外,C还提供了另外三个库来支持多线程编程,分别是 <mutex>
, <condition_variable>
和 <future>
。
thread
类
thread
类是thread
库多线程实现的基础。以下是其构造函数及示例代码:
构造函数
thread()
:默认构造函数,创建一个空的std::thread
执行对象(在线程池的实现中需要提前创建一定数量的线程对象)。thread(Fn&& fn, Args&&...)
:初始化构造函数,创建一个std::thread
对象,该对象可被 joinable,新产生的线程会调用fn
函数(即可调用对象),该函数的参数由args
给出。thread(const thread&) = delete
:拷贝构造函数(被禁用),意味着std::thread
对象不可拷贝构造,线程在同一时刻仅能由一个线程对象运行。thread(thread&& x)
:转移/移动构造函数,调用成功之后x
不再代表任何std::thread
执行对象,相当于将“线程”的所有权转给了另外的线程对象。
例子
以下示例展示了如何创建线程:
#include <iostream> // std::cout #include <thread> // std::thread void func1() { for (int i = 0; i != 10; ++i) { std::cout << "thread 1 print " << i << std::endl; } } void func2(int n) { std::cout << "thread 2 print " << n << std::endl; } int main() { std::thread t1(func1); std::thread t2(func2, 123); std::cout << "main, foo and bar now execute concurrently...\n"; t1.join(); // 阻塞直到第一个线程完成 t2.join(); // 阻塞直到第二个线程完成 std::cout << "thread 1 and thread 2 completed.\n"; system("pause"); return 0; }
结果:
thread 1 print 0 thread 1 print 1 ... thread 2 print 123 ... main, foo and bar now execute concurrently... thread 1 and thread 2 completed.
重要函数
join()
:阻塞主线程直到子线程执行完毕。detach()
:将线程与主线程分离,线程被运行时库接管。若在Linux环境下运行,即使Ctrl+C退出主线程,子线程依旧运行。get_id()
:获取线程 ID,返回一个类型为std::thread::id
的对象。joinable()
:检查线程是否可被 join。swap()
:交换两个线程对象所代表的底层句柄资源。
例子
以下示例展示了 swap
函数的使用:
#include <iostream> #include <thread> #include <chrono> void foo() { std::this_thread::sleep_for(std::chrono::seconds(1)); } void bar() { std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::thread t1(foo); std::thread t2(bar); std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; std::swap(t1, t2); std::cout << "after std::swap(t1, t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.swap(t2); std::cout << "after t1.swap(t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.join(); t2.join(); }
结果:
thread 1 id: ... thread 2 id: ... after std::swap(t1, t2): thread 1 id: ... thread 2 id: ... after t1.swap(t2): thread 1 id: ... thread 2 id: ...
其他函数
yield()
:当前线程放弃执行,操作系统调度另一线程继续执行。sleep_until
:线程休眠至某个指定的时刻,被重新唤醒。sleep_for
:线程休眠某个指定的时间片,详见例子。
#include <iostream> #include <chrono> #include <thread> int main() { std::cout << "Hello waiter" << std::endl; std::chrono::milliseconds dura(2000); std::this_thread::sleep_for(dura); std::cout << "Waited 2000 ms\n"; }
mutex
库
三、互斥
互斥是指散布在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务不能运行其余程序片段,需等待该任务运行完毕。
mutex
类
std::mutex
是最基本的互斥量类,不支持递归地对 std::mutex
对象上锁,而 std::recursive_mutex
则可以。
常用函数
lock()
:锁住互斥量,可能发生死锁。unlock()
:解锁,释放对互斥量的所有权。try_lock()
:尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程不会被阻塞。可能发生死锁。std::recursive_mutex
:允许同一个线程多次上锁解锁。
例子
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::lock_guard #include <stdexcept> // std::logic_error std::mutex mtx; void print_even(int x) { if (x % 2 == 0) std::cout << x << " is even\n"; else throw (std::logic_error("not even")); } void print_thread_id(int id) { try { std::lock_guard<std::mutex> lck(mtx); print_even(id); } catch (std::logic_error&) { std::cout << "[exception caught]\n"; } } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_thread_id, i + 1); for (auto& th : threads) th.join(); return 0; }
输出:
[exception caught] 2 is even ... 10 is even
unique_lock
unique_lock
对象以独占所有权的方式管理 mutex
对象的上锁和解锁操作。
condition_variable
四、条件变量 条件变量 (Condition Variable) 在多线程程序中用于实现“等待->唤醒”逻辑。
例子
#include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; // 全局互斥锁 std::condition_variable cv; // 全局条件变量 bool ready = false; // 全局标志位 void do_print_id(int id) { std::unique_lockstd::mutex lck(mtx); while (!ready) // 如果标志位不为 true,则等待 cv.wait(lck); // 当前线程被阻塞,直到全局标志位变为 true 后被唤醒 std::cout << "thread " << id << '\n'; // 打印线程编号 } void go() { std::unique_lockstd::mutex lck(mtx); ready = true; // 设置全局标志位为 true cv.notify_all(); // 唤醒所有线程 } int main() { std::thread threads[10]; // 创建10个线程 for (int i = 0; i < 10; ++i) threads[i] = std::thread(do_print_id, i); std::cout << "10 threads ready to race...\n"; go(); // 唤醒所有等待线程 for (auto& th : threads) th.join(); // 主线程等待所有子线程完成 return 0; }
. 输出:
10 threads ready to race... thread 0 thread 1 thread 2 thread 3 thread 4 thread 5 thread 6 thread 7 thread 8 thread 9
解释
在上述代码中,当调用 cv.wait()
时,如果 ready
变量不为 true
,当前线程将被阻塞。当 go()
函数调用 cv.notify_all()
时,所有被阻塞的线程将被唤醒并继续执行。
std::condition_variable
提供的其他函数
wait_for
:等待一段指定的时间。wait_until
:等待直到某个指定的时刻。
输出:
thread 0 thread 1 thread 2 ... thread 9
future
五、异步调用 std::async
std::async
提供了一种启动异步任务的机制,可以通过 std::future
对象获取异步操作的结果。
例子
#include <iostream> #include <future> #include <chrono> bool is_prime(int x) { for (int i = 2; i < x; ++i) { if (x % i == 0) return false; } return true; } int main() { // 异步启动任务 std::future<bool> fut = std::async(is_prime, 700020007); std::cout << "please wait"; std::chrono::milliseconds span(100); while (fut.wait_for(span) != std::future_status::ready) std::cout << "."; bool ret = fut.get(); std::cout << "\nfinal result: " << std::boolalpha << ret << std::endl; return 0; }
输出:
please wait........ final result: true
std::promise
std::promise
是一个用于在一个线程中设置值,并允许另一个线程通过 std::future
获取该值的类。
例子
#include <iostream> #include <thread> #include <future> void set_value(std::promise<int>& prom) { prom.set_value(10); } int main() { std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread t(set_value, std::ref(prom)); std::cout << "value: " << fut.get() << std::endl; t.join(); return 0; }
输出:
value: 10
atomic
六、原子操作 std::atomic
是 C++11 提供的用于实现原子操作的类型,避免数据竞争。
std::atomic_flag
std::atomic_flag
是一个简单的原子布尔类型,可以用于实现互斥锁。
#include <iostream> #include <atomic> #include <thread> #include <vector> #include <sstream> std::atomic_flag lock = ATOMIC_FLAG_INIT; std::stringstream stream; void append_number(int x) { while (lock.test_and_set()) {} // 自旋锁 stream << "thread#" << x << "\n"; lock.clear(); } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append_number, i)); for (auto& th : threads) th.join(); std::cout << stream.str(); return 0; }
输出:
thread#0 thread#1 ... thread#9
std::atomic
例子:#include <iostream> #include <atomic> #include <thread> #include <vector> std::atomic<bool> ready(false); std::atomic_flag winner = ATOMIC_FLAG_INIT; void count1m(int id) { while (!ready) {} for (int i = 0; i < 1000000; ++i) {} if (!winner.test_and_set()) { std::cout << "winner: " << id << std::endl; } } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(count1m, i)); ready = true; for (auto& th : threads) th.join(); return 0; }
输出:
winner: 0
七、锁的详解
std::lock_guard
std::lock_guard
是 C++11 中引入的一个 RAII 风格的锁管理器,用于自动管理锁的生命周期。
特点
- 简单易用:
std::lock_guard
在构造时获取锁,在析构时释放锁。 - 异常安全:即使在异常情况下,
std::lock_guard
也能保证锁的释放。
例子
#include <iostream> #include <thread> #include <mutex> std::mutex mtx; // 全局互斥锁 void print_even(int x) { if (x % 2 == 0) std::cout << x << " is even\n"; else throw std::logic_error("not even"); } void print_thread_id(int id) { try { std::lock_guard<std::mutex> lck(mtx); // 自动获取并释放锁 print_even(id); } catch (std::logic_error&) { std::cout << "[exception caught]\n"; } } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_thread_id, i + 1); for (auto& th : threads) th.join(); return 0; }
输出:
[exception caught] 2 is even [exception caught] 4 is even ... 10 is even
std::unique_lock
std::unique_lock
提供了比 std::lock_guard
更加灵活的锁管理。
特点
- 灵活性:
std::unique_lock
可以延迟锁定、提前解锁及再次锁定。 - 条件变量:在使用条件变量时,必须使用
std::unique_lock
。
例子
#include <iostream> #include <thread> #include <mutex> #include <condition_variable> std::mutex mtx; std::condition_variable cv; bool ready = false; void print_id(int id) { std::unique_lock<std::mutex> lck(mtx); cv.wait(lck, [] { return ready; }); // 等待 ready 为 true std::cout << "thread " << id << '\n'; } void go() { std::unique_lock<std::mutex> lck(mtx); ready = true; cv.notify_all(); // 唤醒所有等待的线程 } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_id, i); std::this_thread::sleep_for(std::chrono::seconds(1)); go(); for (auto& th : threads) th.join(); return 0; }
输出:
thread 0 thread 1 thread 2 ... thread 9
std::shared_lock
std::shared_lock
是 C++14 引入的用于共享所有权的锁管理器,适用于读多写少的场景。
特点
- 共享模式:允许多个线程共享读取权限,但写操作必须独占。
- 提升并发性:适用于读多写少的场景,提升并发性能。
例子
#include <iostream> #include <shared_mutex> #include <thread> #include <vector> std::shared_mutex shared_mtx; void reader(int id) { std::shared_lock<std::shared_mutex> lock(shared_mtx); std::cout << "Reader " << id << " is reading.\n"; std::this_thread::sleep_for(std::chrono::milliseconds(50)); } void writer(int id) { std::unique_lock<std::shared_mutex> lock(shared_mtx); std::cout << "Writer " << id << " is writing.\n"; std::this_thread::sleep_for(std::chrono::milliseconds(50)); } int main() { std::vector<std::thread> threads; for (int i = 0; i < 5; ++i) { threads.emplace_back(reader, i); } for (int i = 0; i < 2; ++i) { threads.emplace_back(writer, i); } for (auto& th : threads) { th.join(); } return 0; }
输出:
Reader 0 is reading. Reader 1 is reading. Reader 2 is reading. Writer 0 is writing. Reader 3 is reading. Reader 4 is reading. Writer 1 is writing.
八、线程池
介绍
线程池是一种常用的多线程设计模式,它通过复用一组线程来执行任务,从而避免频繁创建和销毁线程的开销。线程池在需要频繁创建和销毁线程的场景中尤其有用,例如服务器处理请求、并行计算等。
实现
以下是一个简单的线程池实现:
#include <iostream> #include <vector> #include <thread> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <future> class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) worker.join(); }
使用线程池
int main() { ThreadPool pool(4); auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 1, 2); auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 3, 4); std::cout << "1 + 2 = " << result1.get() << std::endl; std::cout << "3 * 4 = " << result2.get() << std::endl; return 0; }
输出:
1 + 2 = 3 3 * 4 = 12
优点
- 复用线程:避免频繁创建销毁线程的开销。
- 任务并行:提高任务处理的并行度。
- 管理方便:统一管理线程和任务。