logo

一文搞懂C++多线程 🔥

作者
Modified on
Reading time
11 分钟阅读:..评论:..

这周学习了多线程并发的相关知识,写一个读书笔记以作记录。包括进程、线程、并发、mutex等等。

一、进程、线程、并发

  • 进程:简而言之就是一个运行的程序,如点开一个exe文件就打开了一个进程。
  • 线程:进程中的不同执行路径,即在一个进程中运行了多个功能。每一个进程都至少包含有一个线程,即主线程,主线程与进程的关系是相互依存的。
  • 并发:分为多线程和多进程,多进程同时运行和多线程同时运行的情况。例如,同时打开两个QQ客户端是多进程,VStudio中多个窗口线程是多线程并发。

二、thread

以前,由于系统环境的不同,如Windows和Linux,需要选择不同的线程库进行代码编写,可移植性不高。C11之后有了标准的线程库:std::thread。除此之外,C还提供了另外三个库来支持多线程编程,分别是 <mutex>, <condition_variable><future>

thread

thread类是thread库多线程实现的基础。以下是其构造函数及示例代码:

构造函数

  • thread():默认构造函数,创建一个空的 std::thread 执行对象(在线程池的实现中需要提前创建一定数量的线程对象)。
  • thread(Fn&& fn, Args&&...):初始化构造函数,创建一个 std::thread 对象,该对象可被 joinable,新产生的线程会调用 fn 函数(即可调用对象),该函数的参数由 args 给出。
  • thread(const thread&) = delete:拷贝构造函数(被禁用),意味着 std::thread 对象不可拷贝构造,线程在同一时刻仅能由一个线程对象运行。
  • thread(thread&& x):转移/移动构造函数,调用成功之后 x 不再代表任何 std::thread 执行对象,相当于将“线程”的所有权转给了另外的线程对象。

例子

以下示例展示了如何创建线程:

#include <iostream> // std::cout #include <thread> // std::thread void func1() { for (int i = 0; i != 10; ++i) { std::cout << "thread 1 print " << i << std::endl; } } void func2(int n) { std::cout << "thread 2 print " << n << std::endl; } int main() { std::thread t1(func1); std::thread t2(func2, 123); std::cout << "main, foo and bar now execute concurrently...\n"; t1.join(); // 阻塞直到第一个线程完成 t2.join(); // 阻塞直到第二个线程完成 std::cout << "thread 1 and thread 2 completed.\n"; system("pause"); return 0; }

结果

thread 1 print 0 thread 1 print 1 ... thread 2 print 123 ... main, foo and bar now execute concurrently... thread 1 and thread 2 completed.

重要函数

  • join():阻塞主线程直到子线程执行完毕。
  • detach():将线程与主线程分离,线程被运行时库接管。若在Linux环境下运行,即使Ctrl+C退出主线程,子线程依旧运行。
  • get_id():获取线程 ID,返回一个类型为 std::thread::id 的对象。
  • joinable():检查线程是否可被 join。
  • swap():交换两个线程对象所代表的底层句柄资源。

例子

以下示例展示了 swap 函数的使用:

#include <iostream> #include <thread> #include <chrono> void foo() { std::this_thread::sleep_for(std::chrono::seconds(1)); } void bar() { std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::thread t1(foo); std::thread t2(bar); std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; std::swap(t1, t2); std::cout << "after std::swap(t1, t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.swap(t2); std::cout << "after t1.swap(t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.join(); t2.join(); }

结果

thread 1 id: ... thread 2 id: ... after std::swap(t1, t2): thread 1 id: ... thread 2 id: ... after t1.swap(t2): thread 1 id: ... thread 2 id: ...

其他函数

  • yield():当前线程放弃执行,操作系统调度另一线程继续执行。
  • sleep_until:线程休眠至某个指定的时刻,被重新唤醒。
  • sleep_for:线程休眠某个指定的时间片,详见例子。
#include <iostream> #include <chrono> #include <thread> int main() { std::cout << "Hello waiter" << std::endl; std::chrono::milliseconds dura(2000); std::this_thread::sleep_for(dura); std::cout << "Waited 2000 ms\n"; }

三、mutex

互斥

互斥是指散布在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务不能运行其余程序片段,需等待该任务运行完毕。

mutex

std::mutex 是最基本的互斥量类,不支持递归地对 std::mutex 对象上锁,而 std::recursive_mutex 则可以。

常用函数

  • lock():锁住互斥量,可能发生死锁。
  • unlock():解锁,释放对互斥量的所有权。
  • try_lock():尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程不会被阻塞。可能发生死锁。
  • std::recursive_mutex:允许同一个线程多次上锁解锁。

例子

#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::lock_guard #include <stdexcept> // std::logic_error std::mutex mtx; void print_even(int x) { if (x % 2 == 0) std::cout << x << " is even\n"; else throw (std::logic_error("not even")); } void print_thread_id(int id) { try { std::lock_guard<std::mutex> lck(mtx); print_even(id); } catch (std::logic_error&) { std::cout << "[exception caught]\n"; } } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_thread_id, i + 1); for (auto& th : threads) th.join(); return 0; }

输出

[exception caught] 2 is even ... 10 is even

unique_lock

unique_lock 对象以独占所有权的方式管理 mutex 对象的上锁和解锁操作。

四、条件变量 condition_variable

条件变量 (Condition Variable) 在多线程程序中用于实现“等待->唤醒”逻辑。

例子

#include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; // 全局互斥锁 std::condition_variable cv; // 全局条件变量 bool ready = false; // 全局标志位 void do_print_id(int id) { std::unique_lockstd::mutex lck(mtx); while (!ready) // 如果标志位不为 true,则等待 cv.wait(lck); // 当前线程被阻塞,直到全局标志位变为 true 后被唤醒 std::cout << "thread " << id << '\n'; // 打印线程编号 } void go() { std::unique_lockstd::mutex lck(mtx); ready = true; // 设置全局标志位为 true cv.notify_all(); // 唤醒所有线程 } int main() { std::thread threads[10]; // 创建10个线程 for (int i = 0; i < 10; ++i) threads[i] = std::thread(do_print_id, i); std::cout << "10 threads ready to race...\n"; go(); // 唤醒所有等待线程 for (auto& th : threads) th.join(); // 主线程等待所有子线程完成 return 0; }

. 输出:

10 threads ready to race... thread 0 thread 1 thread 2 thread 3 thread 4 thread 5 thread 6 thread 7 thread 8 thread 9

解释

在上述代码中,当调用 cv.wait() 时,如果 ready 变量不为 true,当前线程将被阻塞。当 go() 函数调用 cv.notify_all() 时,所有被阻塞的线程将被唤醒并继续执行。

std::condition_variable 提供的其他函数

  • wait_for:等待一段指定的时间。
  • wait_until:等待直到某个指定的时刻。

输出

thread 0 thread 1 thread 2 ... thread 9

五、异步调用 future

std::async

std::async 提供了一种启动异步任务的机制,可以通过 std::future 对象获取异步操作的结果。

例子

#include <iostream> #include <future> #include <chrono> bool is_prime(int x) { for (int i = 2; i < x; ++i) { if (x % i == 0) return false; } return true; } int main() { // 异步启动任务 std::future<bool> fut = std::async(is_prime, 700020007); std::cout << "please wait"; std::chrono::milliseconds span(100); while (fut.wait_for(span) != std::future_status::ready) std::cout << "."; bool ret = fut.get(); std::cout << "\nfinal result: " << std::boolalpha << ret << std::endl; return 0; }

输出

please wait........ final result: true

std::promise

std::promise 是一个用于在一个线程中设置值,并允许另一个线程通过 std::future 获取该值的类。

例子

#include <iostream> #include <thread> #include <future> void set_value(std::promise<int>& prom) { prom.set_value(10); } int main() { std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread t(set_value, std::ref(prom)); std::cout << "value: " << fut.get() << std::endl; t.join(); return 0; }

输出

value: 10

六、原子操作 atomic

std::atomic 是 C++11 提供的用于实现原子操作的类型,避免数据竞争。

std::atomic_flag

std::atomic_flag 是一个简单的原子布尔类型,可以用于实现互斥锁。

#include <iostream> #include <atomic> #include <thread> #include <vector> #include <sstream> std::atomic_flag lock = ATOMIC_FLAG_INIT; std::stringstream stream; void append_number(int x) { while (lock.test_and_set()) {} // 自旋锁 stream << "thread#" << x << "\n"; lock.clear(); } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append_number, i)); for (auto& th : threads) th.join(); std::cout << stream.str(); return 0; }

输出

thread#0 thread#1 ... thread#9

例子:std::atomic

#include <iostream> #include <atomic> #include <thread> #include <vector> std::atomic<bool> ready(false); std::atomic_flag winner = ATOMIC_FLAG_INIT; void count1m(int id) { while (!ready) {} for (int i = 0; i < 1000000; ++i) {} if (!winner.test_and_set()) { std::cout << "winner: " << id << std::endl; } } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(count1m, i)); ready = true; for (auto& th : threads) th.join(); return 0; }

输出

winner: 0

七、锁的详解

std::lock_guard

std::lock_guard 是 C++11 中引入的一个 RAII 风格的锁管理器,用于自动管理锁的生命周期。

特点

  • 简单易用std::lock_guard 在构造时获取锁,在析构时释放锁。
  • 异常安全:即使在异常情况下,std::lock_guard 也能保证锁的释放。

例子

#include <iostream> #include <thread> #include <mutex> std::mutex mtx; // 全局互斥锁 void print_even(int x) { if (x % 2 == 0) std::cout << x << " is even\n"; else throw std::logic_error("not even"); } void print_thread_id(int id) { try { std::lock_guard<std::mutex> lck(mtx); // 自动获取并释放锁 print_even(id); } catch (std::logic_error&) { std::cout << "[exception caught]\n"; } } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_thread_id, i + 1); for (auto& th : threads) th.join(); return 0; }

输出

[exception caught] 2 is even [exception caught] 4 is even ... 10 is even

std::unique_lock

std::unique_lock 提供了比 std::lock_guard 更加灵活的锁管理。

特点

  • 灵活性std::unique_lock 可以延迟锁定、提前解锁及再次锁定。
  • 条件变量:在使用条件变量时,必须使用 std::unique_lock

例子

#include <iostream> #include <thread> #include <mutex> #include <condition_variable> std::mutex mtx; std::condition_variable cv; bool ready = false; void print_id(int id) { std::unique_lock<std::mutex> lck(mtx); cv.wait(lck, [] { return ready; }); // 等待 ready 为 true std::cout << "thread " << id << '\n'; } void go() { std::unique_lock<std::mutex> lck(mtx); ready = true; cv.notify_all(); // 唤醒所有等待的线程 } int main() { std::thread threads[10]; for (int i = 0; i < 10; ++i) threads[i] = std::thread(print_id, i); std::this_thread::sleep_for(std::chrono::seconds(1)); go(); for (auto& th : threads) th.join(); return 0; }

输出

thread 0 thread 1 thread 2 ... thread 9

std::shared_lock

std::shared_lock 是 C++14 引入的用于共享所有权的锁管理器,适用于读多写少的场景。

特点

  • 共享模式:允许多个线程共享读取权限,但写操作必须独占。
  • 提升并发性:适用于读多写少的场景,提升并发性能。

例子

#include <iostream> #include <shared_mutex> #include <thread> #include <vector> std::shared_mutex shared_mtx; void reader(int id) { std::shared_lock<std::shared_mutex> lock(shared_mtx); std::cout << "Reader " << id << " is reading.\n"; std::this_thread::sleep_for(std::chrono::milliseconds(50)); } void writer(int id) { std::unique_lock<std::shared_mutex> lock(shared_mtx); std::cout << "Writer " << id << " is writing.\n"; std::this_thread::sleep_for(std::chrono::milliseconds(50)); } int main() { std::vector<std::thread> threads; for (int i = 0; i < 5; ++i) { threads.emplace_back(reader, i); } for (int i = 0; i < 2; ++i) { threads.emplace_back(writer, i); } for (auto& th : threads) { th.join(); } return 0; }

输出

Reader 0 is reading. Reader 1 is reading. Reader 2 is reading. Writer 0 is writing. Reader 3 is reading. Reader 4 is reading. Writer 1 is writing.

八、线程池

介绍

线程池是一种常用的多线程设计模式,它通过复用一组线程来执行任务,从而避免频繁创建和销毁线程的开销。线程池在需要频繁创建和销毁线程的场景中尤其有用,例如服务器处理请求、并行计算等。

实现

以下是一个简单的线程池实现:

#include <iostream> #include <vector> #include <thread> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <future> class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) worker.join(); }

使用线程池

int main() { ThreadPool pool(4); auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 1, 2); auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 3, 4); std::cout << "1 + 2 = " << result1.get() << std::endl; std::cout << "3 * 4 = " << result2.get() << std::endl; return 0; }

输出

1 + 2 = 3 3 * 4 = 12

优点

  • 复用线程:避免频繁创建销毁线程的开销。
  • 任务并行:提高任务处理的并行度。
  • 管理方便:统一管理线程和任务。

参考资料